ploomber.executors.Parallel
- class ploomber.executors.Parallel(processes=None, print_progress=False, start_method=None)
Runs a DAG in parallel using multiprocessing.
- Parameters:
processes (int, default=None) – The number of processes to use. If None, uses os.cpu_count().
print_progress (bool, default=False) – Whether to print progress to stdout; otherwise, progress is only logged.
start_method (str, default=None) – The method used to start child processes: ‘fork’, ‘spawn’ or ‘forkserver’ (availability depends on the platform; Windows only supports ‘spawn’). If None or empty, the default start method of the multiprocessing module is used.
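The parameter descriptions above boil down to two defaults: a missing process count falls back to the CPU count, and a missing start method falls back to the multiprocessing default. The sketch below spells that out with the standard library only. The helper `resolve_parallel_options` is hypothetical and not part of ploomber; it assumes that a non-None start method must be one of `multiprocessing.get_all_start_methods()`.

```python
import multiprocessing
import os


def resolve_parallel_options(processes=None, start_method=None):
    """Hypothetical helper mirroring the documented defaults of Parallel.

    Not part of ploomber: it only illustrates how the parameters are described.
    """
    # processes=None falls back to the CPU count reported by the OS
    # (note: os.cpu_count() itself may return None on unusual platforms)
    n = processes if processes is not None else os.cpu_count()

    # None or an empty string means "use multiprocessing's default start method"
    if not start_method:
        method = multiprocessing.get_start_method()
    else:
        valid = multiprocessing.get_all_start_methods()
        if start_method not in valid:
            raise ValueError(
                f"start_method must be one of {valid}, got {start_method!r}"
            )
        method = start_method
    return n, method


print(resolve_parallel_options(processes=2, start_method="spawn"))
```

Validating against `get_all_start_methods()` rather than a hard-coded list keeps the check correct on platforms where ‘fork’ or ‘forkserver’ is unavailable.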
Examples
Spec API:
    # add at the top of your pipeline.yaml
    executor: parallel

    tasks:
      - source: script.py
        nb_product_key: [nb_ipynb, nb_html]
        product:
          nb_ipynb: nb.ipynb
          nb_html: report.html
Python API:
    >>> from ploomber import DAG
    >>> from ploomber.executors import Parallel
    >>> dag = DAG(executor='parallel')  # use with default values
    >>> dag = DAG(executor=Parallel(processes=2))  # customize
A DAG can exit gracefully from a function task (PythonCallable) that raises DAGBuildEarlyStop:
    >>> from ploomber.products import File
    >>> from ploomber.tasks import PythonCallable
    >>> from ploomber.exceptions import DAGBuildEarlyStop
    >>> # A PythonCallable function that raises DAGBuildEarlyStop
    >>> def early_stop_root(product):
    ...     raise DAGBuildEarlyStop('Ending gracefully')

    >>> # Since DAGBuildEarlyStop is raised, the DAG will exit gracefully.
    >>> dag = DAG(executor='parallel')
    >>> t = PythonCallable(early_stop_root, File('file.txt'), dag=dag)
    >>> dag.build()
A DAG can also exit gracefully from notebook tasks (NotebookRunner):
    >>> from pathlib import Path
    >>> from ploomber.tasks import NotebookRunner
    >>> from ploomber.products import File
    >>> def early_stop():
    ...     raise DAGBuildEarlyStop('Ending gracefully')

    >>> # Use the task-level hook "on_finish" to exit the DAG gracefully.
    >>> dag = DAG(executor='parallel')
    >>> t = NotebookRunner(Path('nb.ipynb'), File('report.html'), dag=dag)
    >>> t.on_finish = early_stop
    >>> dag.build()
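The two examples above differ only in where the early-stop exception is raised: inside the task body, or in an `on_finish` hook after the body has run. The toy model below illustrates that distinction without ploomber. `EarlyStop`, `ToyTask` and `build` are hypothetical stand-ins (for `DAGBuildEarlyStop`, a task, and `DAG.build`); the sketch runs tasks sequentially and reports a graceful stop as a status instead of an error, which is an assumption about the observable behavior, not ploomber's implementation.

```python
class EarlyStop(Exception):
    """Hypothetical stand-in for ploomber.exceptions.DAGBuildEarlyStop."""


class ToyTask:
    """Hypothetical task: a callable plus an optional on_finish hook."""

    def __init__(self, name, func, on_finish=None):
        self.name = name
        self.func = func
        self.on_finish = on_finish


def build(tasks):
    """Run tasks in order and return (status, names of tasks whose body ran).

    status is "ok", "early_stop" (graceful exit) or "failed".
    """
    executed = []
    for task in tasks:
        try:
            task.func()
            executed.append(task.name)
            # the hook only runs after the task body succeeded
            if task.on_finish is not None:
                task.on_finish()
        except EarlyStop:
            # graceful exit: stop scheduling further tasks without an error
            return "early_stop", executed
        except Exception:
            return "failed", executed
    return "ok", executed


def stop():
    raise EarlyStop("Ending gracefully")


# the hook on "a" stops the build after "a" has produced its output
print(build([ToyTask("a", lambda: None, on_finish=stop), ToyTask("b", lambda: None)]))
```

Raising from the hook (as in the notebook example) lets the task finish its own work first, whereas raising from the body (as in the PythonCallable example) stops before the task records any result.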
Notes
If any task crashes, execution of its downstream tasks is aborted; the build continues with the remaining tasks until no more tasks can be executed.
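The crash rule in the note above can be illustrated with a small scheduling sketch. It is not ploomber's code: `build_dag` is a hypothetical function that walks tasks in topological order with the standard library's `graphlib` (Python 3.9+) and runs them sequentially, whereas the Parallel executor dispatches ready tasks to worker processes. The point it shows is that a failure only aborts tasks that depend on the failed one, while independent branches still run.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def build_dag(tasks, upstream):
    """Hypothetical toy model of the crash semantics described in the Notes.

    tasks: dict mapping task name -> zero-argument callable
    upstream: dict mapping task name -> set of upstream task names
    Returns a dict mapping task name -> "executed" | "errored" | "aborted".
    """
    status = {}
    graph = {name: upstream.get(name, set()) for name in tasks}
    for name in TopologicalSorter(graph).static_order():
        # a task is aborted if any upstream task did not finish successfully
        if any(status[up] != "executed" for up in graph[name]):
            status[name] = "aborted"
            continue
        try:
            tasks[name]()
            status[name] = "executed"
        except Exception:
            status[name] = "errored"
    return status


def crash():
    raise RuntimeError("task crashed")


# "a" crashes, so "b" and "c" (downstream) are aborted; "d" is independent
print(build_dag(
    {"a": crash, "b": lambda: None, "c": lambda: None, "d": lambda: None},
    {"b": {"a"}, "c": {"b"}},
))
```

Because aborted tasks are marked rather than skipped silently, the same status propagates transitively: "c" is aborted even though only its grandparent crashed.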
New in version 0.20: Added the start_method argument.
See also
ploomber.executors.Serial
Serial executor
Methods
Attributes
multiprocessing_start_methods