Pipe an input through a series of type-safe routines and tasks to produce an output, or simply, run logic in a series of stages.
Pipelines support the following events:

| Event | Description |
| --- | --- |
| `onAfterRun` | Called after the pipeline executes work units. |
| `onBeforeRun` | Called before the pipeline executes work units. |
| `onRunWorkUnit` | Called before a single work unit is executed. |

Work units support the following events:

| Event | Description |
| --- | --- |
| `onCommand` | Called while a command is being executed. |
| `onFail` | Called when an execution fails. |
| `onPass` | Called when an execution succeeds. |
| `onRun` | Called before a work unit is executed. Can return `true` to skip execution. |
| `onSkip` | Called when an execution is skipped. |
A pipeline can be used to process an input, either in parallel or serial, through a series of actions known as work units, to produce an output. If you don't need an input, but merely need to process work in stages, the pipeline supports that as well. There are multiple types of work units and pipelines, so choose the best one for each use case.
To begin, instantiate a pipeline with a context, and an optional input value.
Once instantiated, we must register work units (either a task or a routine) that
will process the input value, either with `ParallelPipeline#add()` or `SerialPipeline#pipe()`. All
work units require a descriptive title, and are passed the context and current value when being
executed.
Finally, we can execute our pipeline to process each work unit and produce the final output value.
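The whole flow, instantiation, registration, and execution, can be sketched with a minimal stand-in. The `MiniPipeline` class below is illustrative only, not the library's real implementation; it simply shows the instantiate → register → run shape.

```typescript
// Illustrative stand-in for a pipeline context.
class Context {}

type Action<I, O> = (context: Context, value: I) => O | Promise<O>;

// Minimal pipeline sketch: register titled work units, then run them in order.
class MiniPipeline<I> {
  private work: Array<{ title: string; action: Action<any, any> }> = [];

  constructor(private context: Context, private value: I) {}

  add(title: string, action: Action<any, any>): this {
    this.work.push({ title, action });
    return this;
  }

  async run(): Promise<unknown> {
    let value: unknown = this.value;

    // Each work unit receives the context and the current value.
    for (const { action } of this.work) {
      value = await action(this.context, value);
    }

    return value;
  }
}

const pipeline = new MiniPipeline(new Context(), 'hello')
  .add('Uppercase the string', (_ctx, value: string) => value.toUpperCase())
  .add('Add an exclamation', (_ctx, value: string) => `${value}!`);
```

Running `await pipeline.run()` here would produce the final transformed string.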
Context is merely a plain class that provides contextual information to all work units, and is
passed as the 1st argument when executing. It's highly encouraged to create custom contexts with
typed properties, helper methods, and more.
A good example of context usage can be found in the Beemo project.
A unique feature of contexts is the ability to deep clone themselves using `Context#clone()`. This
method is extremely useful when a context of the same shape must be passed to another pipeline
without collisions or mutations occurring.
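A custom context with typed properties might look like the hand-rolled sketch below. Note that the `BuildContext` class and its fields are hypothetical, and the `clone()` implementation here is written by hand purely to illustrate the deep-clone behavior.

```typescript
// Hypothetical custom context with typed properties and a deep clone() method.
class BuildContext {
  constructor(
    public cwd: string = '/',
    public flags: string[] = [],
  ) {}

  clone(): BuildContext {
    // Deep copy mutable state so the clone can be passed to another
    // pipeline without mutations leaking back into the original.
    return new BuildContext(this.cwd, [...this.flags]);
  }
}

const original = new BuildContext('/app', ['--debug']);
const copy = original.clone();

// Mutating the clone leaves the original untouched.
copy.flags.push('--verbose');
```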
The input type is inferred from the 2nd constructor argument, while the output type defaults to the input type. If you need to customize either the input or output type manually, the pipeline generics can be customized upon instantiation.
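The inference and override behavior can be sketched with a toy generic class. `TypedPipeline` below is not the real class; it only demonstrates how an input type can be inferred from a constructor argument while the output type defaults to the input type unless overridden.

```typescript
// Toy pipeline: Input is inferred from the constructor argument,
// Output defaults to Input unless explicitly customized.
class TypedPipeline<Input, Output = Input> {
  constructor(public input: Input) {}

  run(transform: (value: Input) => Output): Output {
    return transform(this.input);
  }
}

// Input inferred as number; output defaults to number.
const doubled = new TypedPipeline(21).run((value) => value * 2);

// Output customized to string via explicit generics.
const stringified = new TypedPipeline<number, string>(42).run((value) =>
  value.toFixed(1),
);
```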
There are 2 types of work units that can be registered in a pipeline.
A task is simply a function/method (in any form) that accepts an input and returns an output. It can
be represented by a standard function or a `Task` instance.
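A task in function form is just a typed function that receives the context and current value. The `TaskContext` interface and `countWords` function below are hypothetical examples, not library APIs.

```typescript
// Hypothetical context shape for illustration.
interface TaskContext {
  verbose: boolean;
}

// A task is simply a function that receives the context and current
// value, and returns (or resolves) the next value.
function countWords(context: TaskContext, value: string): number {
  if (context.verbose) {
    console.log(`Counting words in: ${value}`);
  }

  return value.split(/\s+/).filter(Boolean).length;
}

const count = countWords({ verbose: false }, 'pipe work through stages');
```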
A `Routine` is a specialized work unit implemented with a class. It provides helper methods, the
ability to create nested hierarchical pipelines, and an implicit encapsulation of similar logic and
tasks.
To begin, import the `Routine` class and implement the `blueprint()` and `execute()`
methods. The class requires 3 generics to be defined, starting with an output type (defaults to
`unknown`), an input type (defaults to `unknown`), and an options interface (defaults to an empty
object).

The `Routine#blueprint()` method is inherited from `Contract`, and should
return an object that matches the structure of the generic options interface. The
`Routine#execute()` method should accept the input type and return the expected output type.
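A routine subclass might look like the sketch below. To keep the example self-contained, `SketchRoutine` is a stripped-down stand-in for the real `Routine`/`Contract` base classes (which also validate options, receive the context, and much more); `TrimRoutine` and `TrimOptions` are hypothetical.

```typescript
interface Blueprint {
  [key: string]: unknown;
}

// Stripped-down stand-in for the real Routine base class.
abstract class SketchRoutine<Output, Input, Options extends object> {
  options: Options;

  constructor(public key: string, public title: string, options?: Partial<Options>) {
    // The real implementation validates options against blueprint();
    // here we simply merge them over the blueprint's defaults.
    this.options = { ...this.blueprint(), ...options } as Options;
  }

  abstract blueprint(): Blueprint;

  abstract execute(value: Input): Promise<Output>;
}

interface TrimOptions {
  maxLength: number;
}

// Generics: Output = string, Input = string, Options = TrimOptions.
class TrimRoutine extends SketchRoutine<string, string, TrimOptions> {
  blueprint(): Blueprint {
    // Shape matches the TrimOptions interface.
    return { maxLength: 10 };
  }

  async execute(value: string): Promise<string> {
    return value.slice(0, this.options.maxLength);
  }
}

// A unique key and title are provided, with options as the 3rd argument.
const routine = new TrimRoutine('trim', 'Trimming the value', { maxLength: 5 });
```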
When instantiating a routine, a unique key and title must be provided, both of which are primarily used for streaming to a console. An options object can be passed as the 3rd argument.
The most prominent feature of `Routine` is the ability to create hierarchical pipelines that can be
nested or executed in any fashion. This can be achieved with the `Routine#createWaterfallPipeline()`
method and its sibling `create*Pipeline()` methods, all of which require a context and an initial
value.
The `Routine#depth` property denotes the current depth within the hierarchy tree, while
`Routine#index` is the current index amongst all work units at the same depth.
The `Routine#executeCommand()` method can be used to execute binaries and commands on the host
machine (it uses execa under the hood). This is extremely
useful for executing locally installed NPM/Yarn binaries.
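The effect can be approximated without execa using Node's built-in `child_process`. The `executeCommand` function below is a hypothetical, dependency-free stand-in, not the library's actual implementation; it runs the current Node binary itself so the example works anywhere Node does.

```typescript
import { spawnSync } from 'child_process';

// Hypothetical stand-in for command execution, using Node's built-in
// child_process instead of execa.
function executeCommand(command: string, args: string[]): string {
  const result = spawnSync(command, args, { encoding: 'utf8' });

  if (result.status !== 0) {
    throw new Error(result.stderr || `Command failed: ${command}`);
  }

  return result.stdout.trim();
}

// Run the current Node binary so the example is portable.
const output = executeCommand(process.execPath, ['-e', "console.log('ok')"]);
```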
There are 4 types of pipelines, grouped into parallel and serial based patterns.
Parallel pipelines register work units with `ParallelPipeline#add()`, and process the work units in
parallel when `run()` is executed.
Executes all work units in parallel, and returns a list of values once all resolve. If an error occurs, the pipeline will be interrupted.
This pipeline will run all work units at once. If there are too many work units, performance may degrade. In that case, use `PooledPipeline` instead.
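The fail-fast concurrent behavior described above maps onto `Promise.all`. The `runConcurrent` helper below is an illustrative sketch of the pattern, not the library's code.

```typescript
type Work<I, O> = (value: I) => Promise<O>;

// Sketch of concurrent execution: run every work unit at once and
// return a list of values once all resolve.
async function runConcurrent<I, O>(value: I, work: Work<I, O>[]): Promise<O[]> {
  // Promise.all rejects as soon as any work unit fails,
  // interrupting the pipeline.
  return Promise.all(work.map((unit) => unit(value)));
}

const results = runConcurrent(2, [
  async (value) => value + 1,
  async (value) => value * 10,
]);
```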
Executes all work units in parallel without interruption, and returns an object with a list of
results once all resolve. Like `ConcurrentPipeline`, all work units are run at once. For better
performance when dealing with a large number of work units, use `PooledPipeline`.
Executes a distinct set of work units in parallel without interruption, based on a max concurrency
limit, until all work units have run. Returns an object with a list of results once all resolve.
The following options can be passed as the 3rd constructor argument.

- `concurrency` (`number`) - How many work units to process in parallel. Defaults to the number of CPUs.
- `filo` (`boolean`) - Process with first-in-last-out (FILO) order, instead of first-in-first-out (FIFO). Defaults to `false`.
- `timeout` (`number`) - Timeout in milliseconds that each work unit may run, or `0` to avoid a timeout. Defaults to `0`.
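Pooled execution can be sketched with a simple slot-based queue. The `runPooled` function below is illustrative only, it ignores the `filo` and `timeout` options and only models the concurrency limit.

```typescript
// Sketch of pooled execution: at most `concurrency` work units run at
// any one time, until the queue drains. Not the library's implementation.
async function runPooled<I, O>(
  value: I,
  work: Array<(value: I) => Promise<O>>,
  concurrency: number,
): Promise<O[]> {
  const queue = work.map((unit, index) => ({ unit, index }));
  const results: O[] = [];

  // Each "slot" pulls the next work unit off the queue until it is empty.
  async function slot(): Promise<void> {
    while (queue.length > 0) {
      const { unit, index } = queue.shift()!;
      results[index] = await unit(value);
    }
  }

  await Promise.all(Array.from({ length: concurrency }, slot));

  return results;
}

const pooled = runPooled(1, [
  async (value) => value + 1,
  async (value) => value + 2,
  async (value) => value + 3,
], 2);
```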
Serial pipelines register work units in a sequence with `SerialPipeline#pipe()`, and process the
work units one by one when `run()` is executed.
Executes each work unit one by one, with the return value of the previous work unit being passed as a value argument to the next work unit. Returns the final value once all resolve.
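The waterfall pattern reduces to a simple loop that threads each return value into the next work unit. The `runWaterfall` helper below is a sketch of the pattern, not the library's code.

```typescript
// Sketch of waterfall execution: each work unit receives the previous
// unit's return value, and the final value is returned.
async function runWaterfall<T>(
  initial: T,
  work: Array<(value: T) => T | Promise<T>>,
): Promise<T> {
  let value = initial;

  for (const unit of work) {
    value = await unit(value);
  }

  return value;
}

const final = runWaterfall(1, [
  async (value) => value + 1,
  async (value) => value * 3,
]);
```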
Serial pipelines are designed using a linked list, with each call to
`SerialPipeline#pipe()` returning a new instance. It was designed this way so that input and output
types cascade correctly down the chain. However, this pattern causes issues when pipes are
called within conditionals, as the newly returned pipes are silently lost. For example, piping
within an `if` block without capturing the returned instance is invalid.
Re-assigning the pipeline variable on each `pipe()` call is technically valid, but will break down
as the input and output types change across the chain.
Instead, it's suggested to use separate pipelines within each conditional block. This approach requires a bit of duplication, but avoids all other issues.
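The pitfall can be demonstrated with a toy linked-list pipeline. `MiniSerial` below is a hypothetical sketch whose `pipe()` returns a new instance, mirroring the described design; it is not the real `SerialPipeline`.

```typescript
// Toy linked-list pipeline: pipe() returns a NEW instance rather than
// mutating, mirroring the described SerialPipeline design.
class MiniSerial<I, O> {
  constructor(
    private parent: MiniSerial<any, any> | null,
    private action: ((value: any) => any) | null,
  ) {}

  pipe<NewO>(action: (value: O) => NewO): MiniSerial<I, NewO> {
    return new MiniSerial(this, action);
  }

  run(value: I): unknown {
    const actions: Array<(value: any) => any> = [];

    // Walk the linked list back to the root to collect actions in order.
    for (let node: MiniSerial<any, any> | null = this; node; node = node.parent) {
      if (node.action) actions.unshift(node.action);
    }

    return actions.reduce((val, action) => action(val), value);
  }
}

const root = new MiniSerial<number, number>(null, null);

// Invalid: the instance returned by pipe() is discarded, so this work
// unit is silently lost.
root.pipe((value) => value + 100);

// Valid: capture the returned instance by chaining calls directly.
const chained = root.pipe((value) => value * 2).pipe((value) => value + 1);
```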