chain()
chain(), which is the default export of the stream-chain package, is a factory function that returns a stream based on Duplex. It chains its dependents in a single pipeline, optionally binding error events. It accepts an array of streams or functions and combines them efficiently. The result can be used as a regular stream.
Many details about this package can be discovered by looking at the test files located in tests/ and in the source code (index.js).
The function accepts the following arguments:
- fns is an array. Its items can be one of:
  - A function. Allowed: regular functions, asynchronous functions, generator functions, asynchronous generator functions.
  - Transform stream.
  - Duplex stream.
  - The very first stream can be Readable.
  - The very last stream can be Writable.
  - An array of functions, streams, or other arrays. Such arrays will be flattened and their elements are included verbatim.
  - All falsy values are simply ignored.
  - (Since 3.1.0) A web stream object. It is adapted to a corresponding Node stream:
    - ReadableStream ⇒ Readable (for the very first stream).
    - WritableStream ⇒ Writable (for the very last stream).
    - {readable, writable} pair ⇒ Duplex.
    - Notes:
      - As of 8/24/2024 Node's support for web streams is still mostly experimental.
      - The fromWeb() functions are used to adapt web streams to Node streams with the {objectMode: true} option. If you want to specify something different, you can use the fromWeb() functions directly.
  - Notes on how different values are handled can be found below.
- options is an optional object detailed in Node's documentation.
  - If options is not specified, or falsy, it is assumed to be an empty object.
  - If it doesn't specify writableObjectMode, it is assumed to be true.
  - If it doesn't specify readableObjectMode, it is assumed to be true.
  - Always make sure that writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.
    - Eventually both these modes can be deduced, but Node does not define a standard way to determine them, so currently it cannot be done reliably.
  - Additionally, the following custom properties are recognized:
    - noGrouping is an optional boolean flag. If it is falsy (the default), all subsequent functions are going to be grouped together using the chain.gen() function. The grouping of functions usually produces faster pipelines. Otherwise, every function will be wrapped as a separate stream.
    - skipEvents is an optional boolean flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no event forwarding is made. A user can always do so externally or in a constructor of derived classes.
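The rules for nested arrays and falsy values make it easy to assemble fns conditionally. Below is a minimal sketch of that normalization, assuming a hypothetical helper named normalize. It is not part of stream-chain, and the library's own implementation may differ.

```javascript
// Hypothetical helper for illustration only; not part of stream-chain.
// Flattens nested arrays to any depth and drops falsy entries.
const normalize = items => items.flat(Infinity).filter(Boolean);

const square = x => x * x;
const double = x => 2 * x;
const log = x => (console.log(x), x);

const verbose = false; // assumed flag that toggles the logging stage

// `verbose && log` evaluates to false here, so the logging stage is skipped.
const stages = normalize([square, verbose && log, [double, [null, square]]]);

console.log(stages.length); // 3: square, double, square
```

Because falsy items are ignored, an expression such as verbose && log can sit directly in the array without extra branching.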
A resulting instance can be used to attach handlers for stream events.
import chain from 'stream-chain';
// const {chain} = require('stream-chain');
const pipeline = chain([x => x * x, x => [x - 1, x, x + 1]]);
pipeline.on('error', error => console.error(error));
dataSource.pipe(pipeline);
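When skipEvents is truthy, error forwarding has to be wired up by hand. Below is a minimal sketch of external forwarding that uses only Node's stream module. The stages array and the facade stream are stand-ins invented for this example; with a real chain, the streams property described below would supply the individual streams.

```javascript
const {PassThrough} = require('node:stream');

// Stand-ins for illustration: two pipeline stages and the object users listen on.
const stages = [
  new PassThrough({objectMode: true}),
  new PassThrough({objectMode: true})
];
const facade = new PassThrough({objectMode: true});

const seen = [];
facade.on('error', error => seen.push(error.message));

// Forward every 'error' event from the individual stages to the facade.
for (const stage of stages) {
  stage.on('error', error => facade.emit('error', error));
}

// Simulate a failure in the second stage.
stages[1].emit('error', new Error('boom'));

console.log(seen); // [ 'boom' ]
```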
Streams are used as is.
(Since 3.1.0) A web stream object can be used as a regular Node stream. It will be adapted automatically to a corresponding Node stream with the {objectMode: true} option.
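If the automatic adaptation is not what you want, a web stream can be converted by hand before it is passed along. Below is a minimal sketch using Node's own Readable.fromWeb(). The sample source, the highWaterMark value, and the collecting loop are made up for this example, and the global ReadableStream assumes Node 18 or later.

```javascript
const {Readable} = require('node:stream');

// A small web stream that produces three values and then closes.
const webSource = new ReadableStream({
  start(controller) {
    for (const value of [1, 2, 3]) controller.enqueue(value);
    controller.close();
  }
});

// Adapt it explicitly, choosing the options ourselves.
const nodeSource = Readable.fromWeb(webSource, {objectMode: true, highWaterMark: 1});

// Collect everything the adapted stream produces.
const collected = (async () => {
  const items = [];
  for await (const item of nodeSource) items.push(item);
  return items;
})();

collected.then(items => console.log(items));
```

The resulting nodeSource is an ordinary Readable, so it can serve as the very first item of fns like any other Node stream.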
Functions can be grouped directly or indirectly (see options.noGrouping above) using gen(). This function produces an asynchronous generator function, which can consume values and produce multiple (or no) results.
chain() can extract grouped functions and re-group them for efficiency. Users can create custom pipelines and do not pay the price of the grouping.
Functions are called in the order they are passed to chain() with two arguments: chunk and encoding (see Node's documentation). The result of the function call is passed to the next function in the chain.
Values returned by a regular function can be interpreted differently. If a function throws an exception, it will be caught and passed to a callback generating a stream error.
chain.none means "no value was produced", which effectively terminates the processing of the current value. Nothing will be passed to the next function in the chain.
chain.stop means that nothing will be passed to the next function in the chain and the iterations will be stopped. It is usually used to terminate potentially infinite loops.
Important: chain.stop works only within function chain segments created by gen() or fun(). The native streams do not support this feature, treating it as chain.none.
Asynchronous functions can return a Promise or "thenable" (an object with a then() method). The processing will be delayed until the promise resolves or fails.
A returned value can be a generator or an object with a method named next(). It will be iterated according to the generator protocol.
This case covers generator functions.
next() can return a Promise according to the asynchronous generator protocol.
This case covers asynchronous generator functions.
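The Promise and generator cases above can be modeled with a short asynchronous generator. This is a simplified sketch, not the library's source: the run helper and the sample functions are hypothetical, and special values such as chain.none, chain.stop, chain.many() and chain.finalValue() are deliberately left out.

```javascript
// Hypothetical helper: feeds one value through fns, starting at position index.
async function* run(value, fns, index = 0) {
  // Promise or thenable: wait for it to settle.
  if (value && typeof value.then === 'function') value = await value;

  // Generator object (anything with next()): iterate it step by step.
  if (value && typeof value.next === 'function') {
    for (;;) {
      let step = value.next();
      // next() of an asynchronous generator returns a Promise.
      if (step && typeof step.then === 'function') step = await step;
      if (step.done) break;
      yield* run(step.value, fns, index);
    }
    return;
  }

  // No functions left: the value is a final result.
  if (index === fns.length) {
    yield value;
    return;
  }
  yield* run(fns[index](value), fns, index + 1);
}

const fns = [
  x => x * x,                                   // regular function
  function* (x) { yield x - 1; yield x + 1; },  // generator function
  async x => x * 10,                            // asynchronous function
  async function* (x) { yield x; yield -x; }    // asynchronous generator function
];

const collected = (async () => {
  const items = [];
  for await (const item of run(3, fns)) items.push(item);
  return items;
})();

collected.then(items => console.log(items)); // [ 80, -80, 100, -100 ]
```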
chain.many() is a wrapper for an array holding multiple values (0 or more). It cannot contain any special values. All values will be pushed to a stream sequentially.
chain.finalValue() is a wrapper for a single value. This value is not passed to the next function in the chain. Instead it will be used as a final value of the chain.
Important: chain.finalValue() works only within function chain segments created by gen() or fun(). The native streams do not support this feature, treating its payload as a regular value.
Any other value is passed to the next function in the chain unchanged.
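How a plain function becomes a stream stage, and how a thrown exception turns into a stream error, can be sketched with Node's Transform alone. The wrap and collect helpers below are hypothetical and much simpler than what the library does. They ignore all special values, and a null or undefined result produces no output only because that is how Transform's callback behaves.

```javascript
const {Transform} = require('node:stream');

// Hypothetical helper: wraps a function as an object-mode Transform stream.
const wrap = fn =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      let result;
      try {
        // The function receives the same two arguments described above.
        result = fn(chunk, encoding);
      } catch (error) {
        callback(error); // a thrown exception becomes a stream error
        return;
      }
      callback(null, result);
    }
  });

// Hypothetical helper: gathers all values produced by a stream.
const collect = stream =>
  new Promise((resolve, reject) => {
    const items = [];
    stream.on('data', item => items.push(item));
    stream.on('end', () => resolve(items));
    stream.on('error', reject);
  });

const squares = wrap(x => x * x);
const squared = collect(squares);
squares.write(2);
squares.write(3);
squares.end();

const failing = wrap(() => {
  throw new Error('negative input');
});
const failure = new Promise(resolve => failing.on('error', resolve));
failing.write(1);

squared.then(items => console.log(items));
failure.then(error => console.log(error.message));
```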
The following public properties are available: streams, input, output.
streams is an array of streams created by the constructor. Its values are either Transform streams that use corresponding functions from a constructor parameter, or user-provided streams. All streams are already piped sequentially starting from the beginning.
This array is provided mainly to attach event handlers to individual components.
input is the beginning of the pipeline. Effectively it is the first item of streams.
output is the end of the pipeline. Effectively it is the last item of streams.
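Generally, a Chain instance should be used to represent a chain:
import fs from 'node:fs';
import zlib from 'node:zlib';
import {Transform} from 'node:stream';
import chain from 'stream-chain';

// dataSource is assumed to be an existing Readable stream
const pipeline = chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  // converts every incoming value to a string
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);
dataSource
  .pipe(pipeline)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));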
But in some cases input and output provide better control over how a data processing pipeline should be organized:
pipeline.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(pipeline.input);
Please select what style you want to use, and never mix them together with the same object!
For convenience of users, the module defines static properties on chain imported from other modules:
- gen(...fns) from the gen module.
- asStream(fn) from the asStream module.
- Multiple imports from the defs module:
  - Symbols: none, stop, finalSymbol, manySymbol, flushSymbol, fListSymbol.
  - Errors: Stop.
  - Makers of return values: finalValue(value), many(values).
  - Testers: isFinalValue(value), isMany(value), isFlushable(value), isFunctionList(value).
  - Getters: getFinalValue(value), getManyValues(value), getFunctionList(value).
  - Setters: flushable(value, final), setFunctionList(value, fns).
See corresponding modules for details.
This is a helper function. It takes a function or an iterable object and returns a function:
- If it is a function, it returns the function.
- If it is an asynchronously iterable object, it returns a function that returns an asynchronous iterator.
- If it is an iterable object, it returns a function that returns an iterator.
The returned iterator function is bound to its object.
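The three cases above can be sketched in a few lines. The name toFunction is hypothetical, and throwing a TypeError for unsupported input is an assumption of this example rather than documented behavior.

```javascript
// Hypothetical re-implementation for illustration; not the library's code.
const toFunction = source => {
  // A function is returned as is.
  if (typeof source === 'function') return source;
  // An asynchronously iterable object: return its bound async iterator factory.
  if (source && typeof source[Symbol.asyncIterator] === 'function') {
    return source[Symbol.asyncIterator].bind(source);
  }
  // An iterable object: return its bound iterator factory.
  if (source && typeof source[Symbol.iterator] === 'function') {
    return source[Symbol.iterator].bind(source);
  }
  // Assumption: anything else is rejected.
  throw new TypeError('Expected a function or an iterable object');
};

const identity = x => x;
const sameFunction = toFunction(identity) === identity;

// Binding lets the returned function be called without its object.
const fromArray = toFunction([1, 2, 3]);
const values = [...fromArray()];

console.log(sameFunction, values); // true [ 1, 2, 3 ]
```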