Forking Module
HiDi’s forking module exposes functionality for concurrent pipelines. Forking is done with ordinary Transforms that take lists of pipelines upon initialization.
class hidi.forking.ThreadForkTransform(pipelines, progress=False)

Bases: hidi.forking.ExecutorFork

Fork a pipeline using concurrent.futures.ThreadPoolExecutor as a backend for execution. This is useful if you have several transforms that perform well when running in concurrent threads, such as IO-heavy tasks, or CPU-heavy tasks that execute outside the Python runtime.

The forked transform will return a list of Pipeline outputs, in the same order as the forked pipelines were given.

Parameters:
- pipelines (list[hidi.pipeline.Pipeline]) – A list of pipelines to fork execution to.
- progress (bool) – When True, progress of the forked pipelines will be logged.
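Under the hood, ThreadForkTransform is backed by concurrent.futures.ThreadPoolExecutor, which fans work out to threads and collects the results in submission order. A minimal sketch of that pattern on plain callables (the functions here are stand-ins for pipelines, not part of hidi):

```python
from concurrent.futures import ThreadPoolExecutor


# Stand-ins for IO-heavy pipelines; each "pipeline" is just a callable.
def double(x):
    return x * 2


def square(x):
    return x * x


def thread_fork(tasks, value):
    # Submit every task, then gather results in the order submitted --
    # the same ordering guarantee ThreadForkTransform documents.
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(task, value) for task in tasks]
        return [f.result() for f in futures]


print(thread_fork([double, square], 3))  # [6, 9]
```

Because `Future.result()` is called in submission order, the output list lines up with the input list even when the threads finish out of order.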
class hidi.forking.ProcessForkTransform(pipelines, progress=False)

Bases: hidi.forking.ExecutorFork

Fork a pipeline using concurrent.futures.ProcessPoolExecutor as a backend for execution. This is useful if you have several transforms that can be executed concurrently and are CPU intensive.

The forked transform will return a list of pipeline outputs, in the same order as the forked pipelines were given.

Special care must be taken, as each transform must be pickled to a new process.

Parameters:
- pipelines (list[hidi.pipeline.Pipeline]) – A list of pipelines to fork execution to.
- progress (bool) – When True, progress of the forked pipelines will be logged.
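The pickling caveat can be seen with the underlying concurrent.futures.ProcessPoolExecutor directly: only picklable callables (for example, module-level functions) can cross the process boundary, which is why a lambda would fail where a named function works. A sketch independent of hidi:

```python
import pickle
from concurrent.futures import ProcessPoolExecutor


def cube(x):
    # Module-level function, so it pickles cleanly for worker processes.
    return x ** 3


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Results come back in submission order, as with ProcessForkTransform.
        futures = [executor.submit(cube, n) for n in (2, 3)]
        print([f.result() for f in futures])  # [8, 27]

    # A lambda cannot be pickled, so it cannot be shipped to a worker process.
    try:
        pickle.dumps(lambda x: x)
    except (pickle.PicklingError, AttributeError, TypeError):
        print('lambdas cannot be pickled')
```

The `if __name__ == '__main__'` guard matters here: on platforms that spawn (rather than fork) worker processes, the main module is re-imported in each worker, and unguarded executor creation would recurse.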
class hidi.forking.TrivialForkTransform(pipelines, progress=False)

Bases: hidi.transform.Transform

Trivial fork transform using an ordinary loop.

Parameters:
- pipelines (list[hidi.pipeline.Pipeline]) – A list of pipelines to fork execution to.
- progress (bool) – When True, progress of the forked pipelines will be logged.
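A transform that forks with an ordinary loop amounts to running each pipeline in turn and collecting the outputs in order. A minimal sketch of that loop on plain callables (not hidi code):

```python
# Stand-ins for pipelines; each is just a callable taking one input.
def add_one(x):
    return x + 1


def negate(x):
    return -x


def trivial_fork(pipelines, value):
    # Run each "pipeline" one after another, preserving input order.
    return [run(value) for run in pipelines]


print(trivial_fork([add_one, negate], 5))  # [6, -5]
```

With no threads or processes involved, there is nothing to pickle and no concurrency to manage, which makes this variant useful as a debugging baseline for the two executor-backed transforms.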
Example

Here is an example of using a ProcessForkTransform:
import numpy as np

from hidi import pipeline, inout, matrix, forking


def to_float32(df, **kwargs):
    return df.astype(np.int32).astype(np.float32)


def create_pipeline(infiles):
    pl = pipeline.Pipeline([
        inout.ReadTransform(infiles),
        matrix.SparseTransform(),
        matrix.SimilarityTransform(),
        matrix.ApplyTransform(fn=to_float32),
        matrix.ScalarTransform(fn='log1p')
    ])

    left = pipeline.Pipeline([
        matrix.SNMFTransform(rank=32, max_iter=2),
        matrix.DenseTransform(),
        matrix.ItemsMatrixToDFTransform(),
        inout.WriteTransform('snmf-embeddings.csv')
    ])

    right = pipeline.Pipeline([
        matrix.SVDTransform(n_components=32, n_iter=2),
        matrix.ItemsMatrixToDFTransform(),
        inout.WriteTransform('svd-embeddings.csv')
    ])

    pl.add(forking.ProcessForkTransform([left, right], progress=False))

    return pl


def run_pipeline():
    pl = create_pipeline(['hidi/examples/data/user-item.csv'])
    return pl.run(progress=False)


if __name__ == '__main__':
    run_pipeline()