Skip to content
Eugene Lazutkin edited this page Jan 18, 2025 · 20 revisions

This is the home of the 3.x version. The home for 2.x can be found here.

Dashboard

Node.js CI NPM version

About

stream-chain bridges JavaScript functions with streams efficiently. It creates a chain of streams out of regular functions, asynchronous functions, generators, and existing streams, while properly handling backpressure. The resulting chain is represented as a Duplex stream, which can be combined with other streams in the usual way.

It is purpose is to simplify creating powerful object-processing pipelines. It eliminates a boilerplate helping to concentrate on the functionality without losing the performance.

stream-chain is a lightweight, no-dependencies micro-package. It is distributed under New BSD license.

Why?

Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real life pipelines filter objects out and/or produce more objects out of a few ones. On top of that, we have to deal with asynchronous operations, while processing or producing data: networking, databases, files, user responses, and so on. An unequal number of values per stage and unequal throughput of stages introduced problems like backpressure, which requires algorithms implemented by streams.

While many API improvements were made to make streams easy to use, in reality, a lot of boilerplate is required when creating a pipeline. stream-chain eliminates many of them helping to keep your programs uncluttered and beautiful.

Documentation

  • Intro by examples — see what it can do!
  • chain() — the heart of the package (imported as stream-chain).
  • Transducers:
    • gen() — creates an asynchronous generator from a list of functions.
    • fun() — creates an asynchronous function from a list of functions.
  • Adapters:
    • asStream() — converts a function into a transforming stream.
  • Various useful helpers:
    • Slicing utilities:
      • take() — takes only a specified number of items from the stream.
      • takeWhile() — takes items while a condition is true.
      • takeWithSkip() — skips, then takes a specified number of items.
      • skip() — skips a specified number of items.
      • skipWhile() — skips items while a condition is true.
    • Fold AKA reduce:
      • fold() — accumulates items using a supplied reducer function until its stream is finished, then produces a result.
      • reduce() — an alias for fold().
      • reduceStream() — accumulates items using a supplied reducer function. The current value of the accumulator is available as a property on the stream's instance.
      • scan() — updates an accumulator with each item and passes the current value of the accumulator. It is a companion to fold().
    • Stream adapters:
      • readableFrom() — converts an iterable/iterator into a readable stream.
    • Stream helpers:
      • fixUtf8Stream() — makes sure that chunks don't break multi-byte UTF-8 characters by repartitioning them.
      • lines() — emits lines from a stream of bytes line by line.
      • batch() — groups items into batches.
    • Typed streams — TypeScript helpers for working with typed streams.
Clone this wiki locally