Pipelines
Pipe an input through a series of type-safe routines and tasks to produce an output, or simply, run logic in a series of stages.
Installation
- Yarn: yarn add @boost/pipeline
- pnpm: pnpm add @boost/pipeline
- npm: npm install @boost/pipeline
Pipelines
A pipeline can be used to process an input, either in parallel or serial, through a series of actions known as work units, to produce an output. If you don't need an input, but merely need to process work in stages, the pipeline supports that as well. There are multiple types of work units and pipelines, so choose the best one for each use case.
To begin, instantiate a pipeline with a context, and an optional input value.
import { Context, ConcurrentPipeline, Task } from '@boost/pipeline';
import { referenceFunction } from './example';
const input = 123;
const pipeline = new ConcurrentPipeline(new Context(), input);
Once instantiated, we must register work units (either a task or routine) that will process the input value, either with ParallelPipeline#add() or SerialPipeline#pipe(). All work units require a descriptive title, and are passed the context and current value when being executed.
// Tasks
pipeline.add('Task using an anonymous function', (context, value) => value);
pipeline.add('Task using a function reference', referenceFunction);
pipeline.add(new Task('Task using a class instance', referenceFunction));
// Routines
pipeline.add(new ExampleRoutine('key', 'Explicit routine using a class instance'));
And to finish, we can execute our pipeline to process each work unit and produce the final output value.
const output = await pipeline.run();
Contexts
A Context is a plain class that provides contextual information to all work units, and is passed as the first argument when executing. It's highly encouraged to create custom contexts with typed properties, helper methods, and more.
import { Context } from '@boost/pipeline';
export default class ProcessContext extends Context {
  readonly cwd: string;
  readonly root: string;
  constructor(root: string, cwd?: string) {
    // A derived class must call super() before touching `this`.
    super();
    this.cwd = cwd || process.cwd();
    this.root = root;
  }
}
A unique feature of a context is the ability to deep clone itself using Context#clone(). This method is extremely useful when a context of the same shape must be passed to another pipeline without collisions or mutations occurring.
const newContext = context.clone();
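To illustrate why cloning avoids collisions, here is a minimal sketch that uses a stand-in class rather than the library's Context. SketchContext and its visited property are hypothetical names, and this clone() only copies a single array, so treat it as a picture of the idea and not of how Context#clone() is implemented.

```typescript
// Hypothetical stand-in for a custom context; it does not extend the library's Context.
class SketchContext {
  readonly root: string;
  visited: string[];

  constructor(root: string, visited: string[] = []) {
    this.root = root;
    this.visited = visited;
  }

  // Copies the nested array so the clone and the original never share a reference.
  clone(): SketchContext {
    return new SketchContext(this.root, [...this.visited]);
  }
}

function demoClone(): { original: string[]; copy: string[] } {
  const context = new SketchContext('/repo', ['a.ts']);
  const newContext = context.clone();

  // Mutating the clone leaves the original untouched.
  newContext.visited.push('b.ts');

  return { original: context.visited, copy: newContext.visited };
}
```

If clone() returned the same array instead of a copy, both contexts would see the pushed entry, which is the kind of collision the method is meant to prevent.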
Input & output types
The input type is inferred from the 2nd constructor argument, while the output type defaults to the input type. If you need to customize either the input or output type manually, the pipeline generics can be customized upon instantiation.
const pipeline = new ConcurrentPipeline<Context, number, string[]>(new Context(), 123);
Work types
There are 2 types of work units that can be registered in a pipeline.
Task
A task is simply a function/method (in any form) that accepts an input and returns an output. It can be represented by a standard function or a Task instance.
// Option 1: a standard function
import { Context } from '@boost/pipeline';
function task(context: Context, value: number): string {
  return value.toLocaleString();
}
parallelPipeline.add('A title for this task', task);
// Option 2 (a separate snippet): a Task instance
import { Context, Task } from '@boost/pipeline';
const task = new Task('A title for this task', (context: Context, value: number) =>
  value.toLocaleString(),
);
serialPipeline.pipe(task);
Routine
A Routine is a specialized work unit implemented with a class. It provides helper methods, the ability to create nested hierarchical pipelines, and an implicit encapsulation of similar logic and tasks.
To begin, import and extend the Routine class and implement the Routine#blueprint() and Routine#execute() methods. The class accepts 3 generics: an output type (defaults to unknown), an input type (defaults to unknown), and an options interface (defaults to an empty object).
The blueprint method is inherited from Contract, and should return an object that matches the structure of the generic options interface. The execute method should accept the input type and return the expected output type.
import { Blueprint, Schemas, Bind } from '@boost/common';
import { Context, Routine } from '@boost/pipeline';
interface ExampleOptions {
  limit?: number;
}
type Input = number;
type Output = string;
export default class ExampleRoutine extends Routine<Output, Input, ExampleOptions> {
  blueprint({ number }: Schemas): Blueprint<ExampleOptions> {
    return {
      limit: number(10),
    };
  }
  // Each piped method receives the return value of the previous one.
  async execute(context: Context, value: Input): Promise<Output> {
    return this.createWaterfallPipeline(context, value)
      .pipe('Rounding to cents', this.roundToCents)
      .pipe('Converting to readable format', this.makeReadable)
      .pipe('Adding currency', this.addCurrency)
      .run();
  }
  @Bind
  roundToCents(context: Context, value: number): number {
    return Number(value.toFixed(2));
  }
  @Bind
  makeReadable(context: Context, value: number): string {
    return value.toLocaleString();
  }
  @Bind
  addCurrency(context: Context, value: string): string {
    return `$${value}`;
  }
}
When instantiating a routine, a unique key and title must be provided, both of which are primarily used for streaming to a console. An options object can be passed as the 3rd argument.
new ExampleRoutine('key', 'Custom title here', { limit: 5 });
Creating hierarchical pipelines
The most prominent feature of Routine is the ability to create hierarchical pipelines that can be nested or executed in any fashion. This can be achieved with the Routine#createAggregatedPipeline(), #createConcurrentPipeline(), #createPooledPipeline(), and #createWaterfallPipeline() methods, all of which require a context and an initial value.
class ExampleRoutine extends Routine<Item[]> {
  async execute(context: Context, items: Item[]): Promise<Item[]> {
    return this.createConcurrentPipeline(context, [])
      .add('Load items from cache', this.loadItemsFromCache)
      .add('Fetch remote items', this.fetchItems)
      .add('Sort and enqueue items', () => {
        // Nested serial pipeline that runs as one unit of the parent pipeline.
        return this.createWaterfallPipeline(context, items)
          .pipe(new SortRoutine('sort', 'Sorting items'))
          .pipe(new QueueRoutine('queue', 'Enqueueing items'))
          .run();
      })
      .run();
  }
}
The Routine#depth property denotes the current depth within the hierarchy tree, while Routine#index is the current index amongst all work at the same depth.
Executing local binaries
The Routine#executeCommand() method can be used to execute binaries and commands on the host machine (it uses execa under the hood). This is extremely useful for executing locally installed npm/Yarn binaries.
class ExampleRoutine extends Routine<string> {
  async execute(context: Context): Promise<string> {
    return this.executeCommand('babel', ['./src', '--out-dir', './lib'], {
      preferLocal: true,
    }).then((result) => result.stdout);
  }
}
Pipeline types
There are 4 types of pipelines, grouped into parallel and serial patterns.
Parallel
Parallel pipelines register work units with ParallelPipeline#add(), and process the work units in parallel when executing ParallelPipeline#run().
ConcurrentPipeline
The ConcurrentPipeline executes all work units in parallel, and returns a list of values once all resolve. If an error occurs, the pipeline will be interrupted.
import { Context, ConcurrentPipeline } from '@boost/pipeline';
const pipeline = new ConcurrentPipeline(new Context(), initialValue)
  .add('First task', doSomething)
  .add('Second task', anotherSomething)
  .add('Final task', finalSomething);
const values = await pipeline.run();
This pipeline will run all work units at once. If there are far too many work units, it may degrade performance. In that case, use PooledPipeline instead.
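The "all at once, interrupted on error" behaviour can be pictured with plain promises. The sketch below is not the library's implementation: runConcurrent, Unit, and delayed are hypothetical helpers, and the context argument is left out for brevity.

```typescript
// Hypothetical work unit: a function that resolves with a value.
type Unit<T> = () => Promise<T>;

// Starts every unit immediately. Promise.all resolves with the values in
// registration order, and rejects as soon as any single unit rejects.
// Units that are already running are not cancelled by the rejection.
function runConcurrent<T>(units: Unit<T>[]): Promise<T[]> {
  return Promise.all(units.map((unit) => unit()));
}

// Builds a unit that resolves with `value` after `ms` milliseconds.
function delayed<T>(value: T, ms: number): Unit<T> {
  return () => new Promise<T>((resolve) => setTimeout(() => resolve(value), ms));
}

async function demoConcurrent(): Promise<string[]> {
  // The slowest unit is registered first, yet the output follows registration order.
  return runConcurrent([delayed('first', 20), delayed('second', 5), delayed('third', 1)]);
}
```

Because nothing limits how many units start together, the cost of this pattern grows with the number of registered units, which is the performance concern raised above.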
AggregatedPipeline
The AggregatedPipeline executes all work units in parallel without interruption, and returns an object with a list of errors and results once all resolve.
import { Context, AggregatedPipeline } from '@boost/pipeline';
const pipeline = new AggregatedPipeline(new Context(), initialValue)
  .add('First task', doSomething)
  .add('Second task', anotherSomething)
  .add('Final task', finalSomething);
const { errors, results } = await pipeline.run();
Like ConcurrentPipeline, all work units are run at once. For performance improvements, use PooledPipeline when dealing with a large number of work units.
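As a rough mental model of "without interruption", the following sketch catches each failure individually instead of letting it reject the whole run. It is written with plain promises under the assumption that errors and results are simple arrays; runAggregated and Aggregated are hypothetical names, not library exports.

```typescript
// Hypothetical shape of the aggregated output.
interface Aggregated<T> {
  errors: Error[];
  results: T[];
}

// Starts every unit at once and waits for all of them to settle.
// A failing unit is recorded in `errors` and never stops the others.
// In this sketch, entries are collected in completion order.
async function runAggregated<T>(units: Array<() => Promise<T>>): Promise<Aggregated<T>> {
  const errors: Error[] = [];
  const results: T[] = [];

  await Promise.all(
    units.map(async (unit) => {
      try {
        results.push(await unit());
      } catch (error) {
        errors.push(error instanceof Error ? error : new Error(String(error)));
      }
    }),
  );

  return { errors, results };
}
```

A caller can then decide what a partial failure means, for example by checking errors.length before using results.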
PooledPipeline
The PooledPipeline executes a distinct set of work units in parallel without interruption, based on a max concurrency limit, until all work units have run. Returns an object with a list of errors and results once all resolve.
import { Context, PooledPipeline } from '@boost/pipeline';
const pipeline = new PooledPipeline(new Context(), initialValue)
  .add('First task', doSomething)
  .add('Second task', anotherSomething)
  .add('Final task', finalSomething);
const { errors, results } = await pipeline.run();
The pipeline supports an optional PooledOptions object as its 3rd argument.
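The idea of a max concurrency limit can be sketched with a small worker pool built on plain promises. This is an illustration only: runPooled and its concurrency parameter are hypothetical, and the sketch makes no claim about which fields PooledOptions actually accepts.

```typescript
// Runs at most `concurrency` units at the same time until the queue is empty.
// Failures are collected and do not stop the remaining units.
async function runPooled<T>(
  units: Array<() => Promise<T>>,
  concurrency: number,
): Promise<{ errors: Error[]; results: T[] }> {
  const errors: Error[] = [];
  const results: T[] = [];
  const queue = [...units];

  // Each worker pulls the next unit from the shared queue once its current one settles.
  async function worker(): Promise<void> {
    let unit = queue.shift();
    while (unit) {
      try {
        results.push(await unit());
      } catch (error) {
        errors.push(error instanceof Error ? error : new Error(String(error)));
      }
      unit = queue.shift();
    }
  }

  // Never start more workers than there are units, and always start at least one.
  const workerCount = Math.max(1, Math.min(concurrency, units.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));

  return { errors, results };
}
```

With a limit of 2 and five registered units, no more than two units are ever in flight, which is the trade-off that makes a pooled run gentler on resources than starting everything at once.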
Serial
Serial pipelines register work units in a sequence with SerialPipeline#pipe(), and process the work units one by one when executing SerialPipeline#run().
WaterfallPipeline
The WaterfallPipeline executes each work unit one by one, with the return value of the previous work unit being passed as a value argument to the next work unit. Returns the final value once all resolve.
import { Context, WaterfallPipeline } from '@boost/pipeline';
const pipeline = new WaterfallPipeline(new Context(), 1000)
  .pipe('Multiply initial value', (ctx, value) => value * 3)
  .pipe('Convert to a readable string', (ctx, value) => value.toLocaleString())
  .pipe('Convert to an array for reasons unknown', (ctx, value) => [value]);
const finalValue = await pipeline.run(); // ['3,000']
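The value hand-off between steps can also be pictured without the library. The sketch below is a simplified stand-in that keeps one value type throughout and omits the context argument; runWaterfall and Step are hypothetical names.

```typescript
// Hypothetical work unit: receives the previous value and returns the next one.
type Step<T> = (value: T) => Promise<T> | T;

// Runs the steps strictly one after another, feeding each return value into the next step.
async function runWaterfall<T>(initial: T, steps: Step<T>[]): Promise<T> {
  let current = initial;

  for (const step of steps) {
    current = await step(current);
  }

  return current;
}

async function demoWaterfall(): Promise<number> {
  return runWaterfall(1000, [
    (value) => value * 3, // 1000 becomes 3000
    async (value) => value + 1, // 3000 becomes 3001
  ]);
}
```

Unlike this sketch, the real pipeline lets each step change the value type, which is why the example above can end with an array of strings.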
Caveats
Conditional serial pipelines
Serial pipelines are designed using a linked list, with each call to SerialPipeline#pipe() returning a new instance. It was designed this way so that input and output types would cascade correctly down the chain. However, this pattern causes issues when pipes are called within conditionals, because the returned instances are discarded and the new pipes are lost. For example, this is invalid.
const pipeline = new WaterfallPipeline(new Context());
if (condition) {
  pipeline.pipe('Do this', thisAction);
} else {
  pipeline.pipe('Do that', thatAction);
}
await pipeline
  .pipe('Then finish', finishAction)
  .run();
The following is technically valid (note the let and pipeline assignment), but it will break down if types conflict.
let pipeline = new WaterfallPipeline(new Context());
if (condition) {
  pipeline = pipeline.pipe('Do this', thisAction);
} else {
  pipeline = pipeline.pipe('Do that', thatAction);
}
await pipeline
  .pipe('Then finish', finishAction)
  .run();
Instead, it's suggested to use separate pipelines within each conditional block. This approach requires a bit of duplication, but avoids all other issues.
if (condition) {
  await new WaterfallPipeline(new Context())
    .pipe('Do this', thisAction)
    .pipe('Then finish', finishAction)
    .run();
} else {
  await new WaterfallPipeline(new Context())
    .pipe('Do that', thatAction)
    .pipe('Then finish', finishAction)
    .run();
}
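To see why a pipe added inside a conditional disappears, it can help to model the linked-list design in isolation. The Chain class below is a hypothetical, synchronous stand-in for a serial pipeline (no context, no titles), written only to show the effect of discarding the instance that pipe() returns.

```typescript
// Hypothetical immutable chain: every pipe() call returns a NEW link
// and never mutates the instance it was called on.
class Chain<Input, Output> {
  private readonly runAll: (input: Input) => Output;

  private constructor(runAll: (input: Input) => Output) {
    this.runAll = runAll;
  }

  static start<T>(): Chain<T, T> {
    return new Chain<T, T>((input) => input);
  }

  // The output type of the new link is whatever `action` returns.
  pipe<Next>(action: (value: Output) => Next): Chain<Input, Next> {
    return new Chain<Input, Next>((input) => action(this.runAll(input)));
  }

  run(input: Input): Output {
    return this.runAll(input);
  }
}

function demoChain(condition: boolean): { lost: string; kept: string } {
  const label = (value: number) => `value: ${value}`;

  // Invalid pattern: the link returned inside the conditional is discarded.
  const base = Chain.start<number>();
  if (condition) {
    base.pipe((value) => value * 2);
  }
  const lost = base.pipe(label).run(5);

  // Suggested pattern: build a complete chain inside each branch.
  const kept = condition
    ? Chain.start<number>().pipe((value) => value * 2).pipe(label).run(5)
    : Chain.start<number>().pipe(label).run(5);

  return { lost, kept };
}
```

Calling demoChain(true) yields a lost value of 'value: 5', because the doubling step was never attached to base, while kept is 'value: 10'. Reassigning a let variable would also fail to compile in this model whenever a branch changes the output type, since Chain<number, number> and Chain<number, string> are different types.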