Pipelines

Pipe an input through a series of type-safe routines and tasks to produce an output, or simply run logic in a series of stages.

Installation

yarn add @boost/pipeline

Pipelines

A pipeline can be used to process an input, either in parallel or serial, through a series of actions known as work units, to produce an output. If you don't need an input, but merely need to process work in stages, the pipeline supports that as well. There are multiple types of work units and pipelines, so choose the best one for each use case.

To begin, instantiate a pipeline with a context, and an optional input value.

import { Context, ConcurrentPipeline, Task } from '@boost/pipeline';
import { referenceFunction } from './example';

const input = 123;
const pipeline = new ConcurrentPipeline(new Context(), input);

Once instantiated, we must register work units (either a task or routine) that will process the input value, either with ParallelPipeline#add() or SerialPipeline#pipe(). All work units require a descriptive title, and are passed the context and current value when being executed.

// Tasks
pipeline.add('Task using an anonymous function', (context, value) => value);
pipeline.add('Task using a function reference', referenceFunction);
pipeline.add(new Task('Task using a class instance', referenceFunction));

// Routines
pipeline.add(new ExampleRoutine('key', 'Explicit routine using a class instance'));

And to finish, we can execute our pipeline to process each work unit and produce the final output value.

const output = await pipeline.run();

Contexts

A Context is merely a plain class that provides contextual information to all work units, and is passed as the 1st argument when executing. It's highly encouraged to create custom contexts with typed properties, helper methods, and more.

import { Context } from '@boost/pipeline';

export default class ProcessContext extends Context {
  readonly cwd: string;

  readonly root: string;

  constructor(root: string, cwd?: string) {
    // A derived class must call super() before accessing `this`
    super();

    this.cwd = cwd || process.cwd();
    this.root = root;
  }
}

A unique feature of contexts is their ability to deep clone themselves using Context#clone(). This method is extremely useful when a context of the same shape must be passed to another pipeline without collisions or mutations occurring.

const newContext = context.clone();
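
To illustrate why a deep clone matters, here is a minimal, self-contained sketch. The SketchContext class and deepCopy helper are hypothetical stand-ins written for this example; they are not the @boost/pipeline implementation, which may copy values differently.

```typescript
// Hypothetical helper: recursively copies arrays and plain objects.
function deepCopy<T>(value: T): T {
  if (Array.isArray(value)) {
    return value.map((item) => deepCopy(item)) as unknown as T;
  }

  if (value !== null && typeof value === 'object') {
    const out: Record<string, unknown> = {};

    for (const [key, inner] of Object.entries(value)) {
      out[key] = deepCopy(inner);
    }

    return out as T;
  }

  return value;
}

// Hypothetical stand-in for a context base class (NOT the library's Context).
class SketchContext {
  clone(): this {
    // Keep the same prototype so the copy is still an instance of the subclass.
    const copy: Record<string, unknown> = Object.create(Object.getPrototypeOf(this));

    for (const [key, value] of Object.entries(this)) {
      copy[key] = deepCopy(value);
    }

    return copy as unknown as this;
  }
}

class BuildContext extends SketchContext {
  files: string[] = [];

  flags = { minify: false };
}

const original = new BuildContext();
original.files.push('index.ts');

const cloned = original.clone();
cloned.files.push('extra.ts');
cloned.flags.minify = true;

// The original is untouched because nested values were copied, not shared.
console.log(original.files.length, original.flags.minify); // 1 false
console.log(cloned instanceof BuildContext); // true
```

With a shallow copy, both contexts would share the same files array and flags object, so a mutation in one pipeline would leak into the other.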

Input & output types

The input type is inferred from the 2nd constructor argument, while the output type defaults to the input type. To set either type explicitly, pass the pipeline generics upon instantiation.

const pipeline = new ConcurrentPipeline<Context, number, string[]>(new Context(), 123);

Work types

There are 2 types of work units that can be registered in a pipeline.

Task

A task is simply a function/method (in any form) that accepts an input and returns an output. It can be represented by a standard function or a Task instance.

import { Context } from '@boost/pipeline';

function task(context: Context, value: number): string {
  return value.toLocaleString();
}

parallelPipeline.add('A title for this task', task);

// Or, as a Task instance
import { Context, Task } from '@boost/pipeline';

const task = new Task('A title for this task', (context: Context, value: number) =>
  value.toLocaleString(),
);

serialPipeline.pipe(task);

Routine

A Routine is a specialized work unit implemented with a class. It provides helper methods, the ability to create nested hierarchical pipelines, and an implicit encapsulation of similar logic and tasks.

To begin, import and extend the Routine class and implement the Routine#blueprint() and Routine#execute() methods. The class accepts 3 generics: an output type (defaults to unknown), an input type (defaults to unknown), and an options interface (defaults to an empty object).

The blueprint method is inherited from Contract, and should return an object that matches the structure of the generic options interface. The execute method should accept the input type and return the expected output type.

import { Blueprint, Schemas, Bind } from '@boost/common';
import { Context, Routine } from '@boost/pipeline';

interface ExampleOptions {
  limit?: number;
}

type Input = number;
type Output = string;

export default class ExampleRoutine extends Routine<Output, Input, ExampleOptions> {
  blueprint({ number }: Schemas): Blueprint<ExampleOptions> {
    return {
      limit: number(10),
    };
  }

  async execute(context: Context, value: Input): Promise<Output> {
    return this.createWaterfallPipeline(context, value)
      .pipe('Rounding to cents', this.roundToCents)
      .pipe('Converting to readable format', this.makeReadable)
      .pipe('Adding currency', this.addCurrency)
      .run();
  }

  @Bind
  roundToCents(context: Context, value: number): number {
    return Number(value.toFixed(2));
  }

  @Bind
  makeReadable(context: Context, value: number): string {
    return value.toLocaleString();
  }

  @Bind
  addCurrency(context: Context, value: string): string {
    return `$${value}`;
  }
}

When instantiating a routine, a unique key and title must be provided, both of which are primarily used for streaming to a console. An options object can be passed as the 3rd argument.

new ExampleRoutine('key', 'Custom title here', { limit: 5 });

Creating hierarchical pipelines

The most prominent feature of Routine is the ability to create hierarchical pipelines that can be nested or executed in any fashion. This can be achieved with the Routine#createAggregatedPipeline(), #createConcurrentPipeline(), #createPooledPipeline(), and #createWaterfallPipeline() methods, all of which require a context and an initial value.

class ExampleRoutine extends Routine<Item[]> {
  async execute(context: Context, items: Item[]): Promise<Item[]> {
    return this.createConcurrentPipeline(context, [])
      .add('Load items from cache', this.loadItemsFromCache)
      .add('Fetch remote items', this.fetchItems)
      .add('Sort and enqueue items', () => {
        // Nested pipeline: runs serially inside the concurrent parent
        return this.createWaterfallPipeline(context, items)
          .pipe(new SortRoutine('sort', 'Sorting items'))
          .pipe(new QueueRoutine('queue', 'Enqueueing items'))
          .run();
      })
      .run();
  }
}

The Routine#depth property denotes the current depth within the hierarchy tree, while Routine#index is the current index amongst all work at the same depth.

Executing local binaries

The Routine#executeCommand() method can be used to execute binaries and commands on the host machine (it uses execa under the hood). This is extremely useful for executing locally installed npm/Yarn binaries.

class ExampleRoutine extends Routine<string> {
  async execute(context: Context): Promise<string> {
    return this.executeCommand('babel', ['./src', '--out-dir', './lib'], {
      preferLocal: true,
    }).then((result) => result.stdout);
  }
}

Pipeline types

There are 4 types of pipelines, grouped into parallel and serial based patterns.

Parallel

Parallel pipelines register work units with ParallelPipeline#add(), and process the work units in parallel when executing ParallelPipeline#run().

ConcurrentPipeline

The ConcurrentPipeline executes all work units in parallel, and returns a list of values once all resolve. If an error occurs, the pipeline will be interrupted.

import { Context, ConcurrentPipeline } from '@boost/pipeline';

const pipeline = new ConcurrentPipeline(new Context(), initialValue)
  .add('First task', doSomething)
  .add('Second task', anotherSomething)
  .add('Final task', finalSomething);

const values = await pipeline.run();

This pipeline will run all work units at once. If there are far too many work units, it may degrade performance. In that case, use PooledPipeline instead.

AggregatedPipeline

The AggregatedPipeline executes all work units in parallel without interruption, and returns an object with a list of errors and results once all resolve.

import { Context, AggregatedPipeline } from '@boost/pipeline';

const pipeline = new AggregatedPipeline(new Context(), initialValue)
  .add('First task', doSomething)
  .add('Second task', anotherSomething)
  .add('Final task', finalSomething);

const { errors, results } = await pipeline.run();

Like ConcurrentPipeline, all work units are run at once. For performance improvements, use PooledPipeline when dealing with a large number of work units.
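
The difference between interrupting on the first error and collecting every outcome can be sketched with plain promises. This is only an analogy, assuming the two parallel pipelines behave roughly like Promise.all and Promise.allSettled; runFailFast and runCollectAll are hypothetical helpers, not library APIs.

```typescript
type Work<T> = () => Promise<T>;

// Fail-fast: rejects as soon as any unit rejects
// (conceptually similar to ConcurrentPipeline).
async function runFailFast<T>(units: Work<T>[]): Promise<T[]> {
  return Promise.all(units.map((unit) => unit()));
}

// Collect-all: never rejects, and sorts outcomes into errors and results
// (conceptually similar to AggregatedPipeline).
async function runCollectAll<T>(
  units: Work<T>[],
): Promise<{ errors: Error[]; results: T[] }> {
  const settled = await Promise.allSettled(units.map((unit) => unit()));
  const errors: Error[] = [];
  const results: T[] = [];

  for (const outcome of settled) {
    if (outcome.status === 'fulfilled') {
      results.push(outcome.value);
    } else {
      const { reason } = outcome;
      errors.push(reason instanceof Error ? reason : new Error(String(reason)));
    }
  }

  return { errors, results };
}

const sampleUnits: Work<number>[] = [
  async () => 1,
  async () => {
    throw new Error('Second unit failed');
  },
  async () => 3,
];

const aggregatedRun = runCollectAll(sampleUnits);

aggregatedRun.then(({ errors, results }) => {
  console.log(results, errors.map((error) => error.message));
  // [ 1, 3 ] [ 'Second unit failed' ]
});
```

Calling runFailFast(sampleUnits) with the same units rejects with the second unit's error, so the successful values are never returned to the caller.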

PooledPipeline

The PooledPipeline executes a distinct set of work units in parallel without interruption, based on a max concurrency limit, until all work units have run. It returns an object with a list of errors and results once all resolve.

import { Context, PooledPipeline } from '@boost/pipeline';

const pipeline = new PooledPipeline(new Context(), initialValue)
  .add('First task', doSomething)
  .add('Second task', anotherSomething)
  .add('Final task', finalSomething);

const { errors, results } = await pipeline.run();

The pipeline supports an optional PooledOptions object as its 3rd argument.
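
To show what a max concurrency limit means in practice, here is a self-contained sketch of a worker pool. It is not the PooledPipeline implementation: runPooled is a hypothetical helper, and its limit parameter merely stands in for whatever concurrency setting PooledOptions exposes.

```typescript
type PoolWork<T> = () => Promise<T>;

// Hypothetical helper: runs units with at most `limit` in flight at any time.
async function runPooled<T>(
  units: PoolWork<T>[],
  limit: number,
): Promise<{ errors: Error[]; results: T[] }> {
  const errors: Error[] = [];
  const results: T[] = [];
  const queue = [...units];

  // Each worker pulls the next unit from the shared queue until it is empty.
  async function worker(): Promise<void> {
    let unit = queue.shift();

    while (unit) {
      try {
        results.push(await unit());
      } catch (error) {
        errors.push(error instanceof Error ? error : new Error(String(error)));
      }

      unit = queue.shift();
    }
  }

  const workerCount = Math.max(1, Math.min(limit, units.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));

  return { errors, results };
}

// Track how many units are running at once to verify the limit.
let inFlight = 0;
let peak = 0;

const trackedUnits: PoolWork<number>[] = [1, 2, 3, 4, 5].map((n) => async () => {
  inFlight += 1;
  peak = Math.max(peak, inFlight);
  await Promise.resolve(); // Yield once to simulate asynchronous work
  inFlight -= 1;

  if (n === 3) {
    throw new Error('Unit 3 failed');
  }

  return n;
});

const pooledRun = runPooled(trackedUnits, 2);

pooledRun.then(({ errors, results }) => {
  // Four units succeed, one fails, and no more than 2 ever ran at once.
  console.log(results.length, errors.length, peak);
});
```

A failing unit is recorded in errors and the worker moves on to the next unit, which mirrors the "without interruption" behavior described above.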

Serial

Serial pipelines register work units in a sequence with SerialPipeline#pipe(), and process the work units one by one when executing SerialPipeline#run().

WaterfallPipeline

The WaterfallPipeline executes each work unit one by one, with the return value of the previous work unit being passed as a value argument to the next work unit. Returns the final value once all resolve.

import { Context, WaterfallPipeline } from '@boost/pipeline';

const pipeline = new WaterfallPipeline(new Context(), 1000)
  .pipe('Multiply initial value', (ctx, value) => value * 3)
  .pipe('Convert to a readable string', (ctx, value) => value.toLocaleString())
  .pipe('Convert to an array for reasons unknown', (ctx, value) => [value]);

const finalValue = await pipeline.run(); // ['3,000']

Caveats

Conditional serial pipelines

Serial pipelines are designed using a linked list, with each call to SerialPipeline#pipe() returning a new instance. It was designed this way so that input and output types would cascade correctly down the chain. However, this pattern causes issues when pipes are called within conditionals, because the new instances are never captured and the piped work is lost. For example, this is invalid.

const pipeline = new WaterfallPipeline(new Context());

if (condition) {
  pipeline.pipe('Do this', thisAction);
} else {
  pipeline.pipe('Do that', thatAction);
}

await pipeline
  .pipe('Then finish', finishAction)
  .run();

The following is technically valid (note the let and the pipeline reassignment), but it will break down if the types conflict.

let pipeline = new WaterfallPipeline(new Context());

if (condition) {
  pipeline = pipeline.pipe('Do this', thisAction);
} else {
  pipeline = pipeline.pipe('Do that', thatAction);
}

await pipeline
  .pipe('Then finish', finishAction)
  .run();

Instead, it's suggested to use separate pipelines within each conditional block. This approach requires a bit of duplication, but avoids all other issues.

if (condition) {
  await new WaterfallPipeline(new Context())
    .pipe('Do this', thisAction)
    .pipe('Then finish', finishAction)
    .run();
} else {
  await new WaterfallPipeline(new Context())
    .pipe('Do that', thatAction)
    .pipe('Then finish', finishAction)
    .run();
}
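
The reason an uncaptured pipe is lost can be seen in a miniature version of the pattern. This sketch assumes only what is described above (each pipe call returns a new instance with a new output type); the Step class is hypothetical and much simpler than the real serial pipelines.

```typescript
// Hypothetical miniature of a serial pipeline (NOT the library's implementation).
class Step<In, Out> {
  constructor(private readonly transform: (input: In) => Out) {}

  // Returns a NEW step whose output type is the next action's return type.
  pipe<Next>(action: (value: Out) => Next): Step<In, Next> {
    return new Step<In, Next>((input) => action(this.transform(input)));
  }

  run(input: In): Out {
    return this.transform(input);
  }
}

const start = new Step<number, number>((value) => value);

// Calling pipe() without keeping the return value discards the new link.
start.pipe((value) => value * 3);
const lost = start.run(1000);

// Chaining keeps every link, and the output type changes at each step.
const chained = start
  .pipe((value) => value * 3) // Step<number, number>
  .pipe((value) => `Total: ${value}`) // Step<number, string>
  .run(1000);

console.log(lost); // 1000
console.log(chained); // Total: 3000
```

In this sketch, reassigning with let fails to compile as soon as a branch changes the output type, since a Step<number, string> is not assignable to a variable typed as Step<number, number>. That is the type conflict mentioned above.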