Forking Module

HiDi’s forking module exposes functionality for running pipelines concurrently. Forking is done with ordinary Transforms that take a list of pipelines upon initialization.

class hidi.forking.ThreadForkTransform(pipelines, progress=False)[source]

Bases: hidi.forking.ExecutorFork

Fork a pipeline using concurrent.futures.ThreadPoolExecutor as a backend for execution.

This is useful if you have several transforms that perform well in concurrent threads, such as IO-heavy tasks, or CPU-heavy tasks that execute outside the Python runtime.

The forked transform will return a list of Pipeline outputs, in the same order as the forked pipelines were given (see the sketch below).

Parameters:
  • pipelines (list[hidi.pipeline.Pipeline]) – An array of pipelines to fork execution to.
  • progress (bool) – When True, progress of the forked pipelines will be logged.

Return type: list[Any]
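
A minimal sketch of wiring one up, modeled on the full example at the end of this page. The output file names here are placeholders, and it assumes Pipeline.run returns the final transform's output:

from hidi import pipeline, inout, matrix, forking

# Upstream pipeline that produces a similarity matrix.
pl = pipeline.Pipeline([
    inout.ReadTransform(['hidi/examples/data/user-item.csv']),
    matrix.SparseTransform(),
    matrix.SimilarityTransform()
])

# Two downstream pipelines, each factorizing the matrix and writing
# its latent factors to a different CSV file.
left = pipeline.Pipeline([
    matrix.SVDTransform(n_components=32, n_iter=2),
    matrix.ItemsMatrixToDFTransform(),
    inout.WriteTransform('left-factors.csv')
])

right = pipeline.Pipeline([
    matrix.SVDTransform(n_components=64, n_iter=2),
    matrix.ItemsMatrixToDFTransform(),
    inout.WriteTransform('right-factors.csv')
])

# Run both downstream pipelines in concurrent threads; the result is a
# list of their outputs, left first, then right.
pl.add(forking.ThreadForkTransform([left, right], progress=True))
left_out, right_out = pl.run(progress=False)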

class hidi.forking.ProcessForkTransform(pipelines, progress=False)[source]

Bases: hidi.forking.ExecutorFork

Fork a pipeline using concurrent.futures.ProcessPoolExecutor as a backend for execution.

This is useful if you have several CPU-intensive transforms that can be executed concurrently.

The forked transform will return a list of Pipeline outputs, in the same order as the forked pipelines were given.

Special care must be taken, as each transform must be pickled and sent to a new process (see the sketch below).

Parameters:
  • pipelines (list[hidi.pipeline.Pipeline]) – An array of pipelines to fork execution to.
  • progress (bool) – When True, progress of the forked pipelines will be logged.

Return type: list[Any]
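
Because each forked pipeline is shipped to a worker process, everything it references has to be picklable. A minimal sketch of the constraint, not specific to HiDi (add_one is just a toy function):

import pickle

def add_one(x):
    # Module-level functions pickle by reference, so they are safe to
    # use in pipelines handed to a ProcessForkTransform.
    return x + 1

pickle.dumps(add_one)  # works

try:
    # Lambdas (and locally defined functions) cannot be pickled by
    # reference, so avoid them in forked pipelines.
    pickle.dumps(lambda x: x)
except (pickle.PicklingError, AttributeError):
    print('not picklable: use a named, module-level function instead')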

class hidi.forking.TrivialForkTransform(pipelines, progress=False)[source]

Bases: hidi.transform.Transform

Fork a pipeline trivially, running each forked pipeline one after another in an ordinary loop (see the sketch below).

Parameters:
  • pipelines (list[hidi.pipeline.Pipeline]) – An array of pipelines to fork execution to.
  • progress (bool) – When True, progress of the forked pipelines will be logged.

Return type: list[Any]
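
Because all three fork transforms take the same arguments, a TrivialForkTransform can be swapped in for a ProcessForkTransform or ThreadForkTransform when you want to rule out concurrency while debugging. A hypothetical sketch (add_trivial_fork is not part of HiDi; left and right are pipelines like those in the example below):

from hidi import forking

def add_trivial_fork(pl, left, right):
    # Same constructor as the executor-backed forks, but the forked
    # pipelines run one after another in an ordinary loop, and the
    # output is still a list of their outputs in the order given.
    pl.add(forking.TrivialForkTransform([left, right], progress=False))
    return pl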

Example

Here is an example of using a ProcessForkTransform:

import numpy as np

from hidi import pipeline, inout, matrix, forking


def to_float32(df, **kwargs):
    # Truncate to int32, then cast to float32.
    return df.astype(np.int32).astype(np.float32)


def create_pipeline(infiles):
    # Shared upstream pipeline: read the user-item data and build a
    # log-scaled similarity matrix.
    pl = pipeline.Pipeline([
        inout.ReadTransform(infiles),
        matrix.SparseTransform(),
        matrix.SimilarityTransform(),
        matrix.ApplyTransform(fn=to_float32),
        matrix.ScalarTransform(fn='log1p')
    ])

    # Left fork: SNMF latent factors written to CSV.
    left = pipeline.Pipeline([
        matrix.SNMFTransform(rank=32, max_iter=2),
        matrix.DenseTransform(),
        matrix.ItemsMatrixToDFTransform(),
        inout.WriteTransform('snmf-latent-factors.csv')
    ])

    # Right fork: truncated SVD latent factors written to CSV.
    right = pipeline.Pipeline([
        matrix.SVDTransform(n_components=32, n_iter=2),
        matrix.ItemsMatrixToDFTransform(),
        inout.WriteTransform('svd-latent-factors.csv')
    ])

    # Run both forks, each in its own process.
    pl.add(forking.ProcessForkTransform([left, right], progress=False))

    return pl


def run_pipeline():
    pl = create_pipeline(['hidi/examples/data/user-item.csv'])

    return pl.run(progress=False)


if __name__ == '__main__':
    run_pipeline()
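
Because the forked outputs come back in the order the pipelines were given, the result of run_pipeline() can be unpacked positionally (assuming, as above, that Pipeline.run returns the final transform's output):

snmf_output, svd_output = run_pipeline()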