ploomber.DAG
- class ploomber.DAG(name=None, clients=None, executor='serial')
A collection of tasks with dependencies
- Parameters
name (str, optional) – A name to identify this DAG
clients (dict, optional) – A dictionary with classes as keys and clients as values. It can later be modified using dag.clients[cls] = client, where cls is the (task or product) class that should use the client
differ (CodeDiffer) – An object to determine whether two pieces of code are the same and to output a diff. Defaults to CodeDiffer() with default parameters
executor (str or ploomber.executors instance, optional) – The executor to use (ploomber.executors.Serial or ploomber.executors.Parallel). If a string is passed (‘serial’ or ‘parallel’), the corresponding executor is initialized with default parameters
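The executor parameter accepts either a string or an already-configured executor instance. The sketch below shows that dispatch in plain Python. Serial and Parallel here are hypothetical stand-in classes, not the real ploomber.executors classes, and raising ValueError for an unknown string is a choice made for this sketch, not documented library behavior.

```python
# Conceptual sketch of resolving the `executor` argument.
# Serial and Parallel are hypothetical stand-ins, NOT ploomber.executors.


class Serial:
    """Stand-in for a serial executor created with default parameters."""


class Parallel:
    """Stand-in for a parallel executor created with default parameters."""


_EXECUTORS = {"serial": Serial, "parallel": Parallel}


def resolve_executor(executor="serial"):
    """Return an executor instance from a string, or pass an instance through."""
    if isinstance(executor, str):
        try:
            # Strings map to a class that is initialized with default parameters
            return _EXECUTORS[executor]()
        except KeyError:
            # Assumption: this sketch rejects unknown names with ValueError
            raise ValueError(
                f"executor must be 'serial' or 'parallel', got {executor!r}"
            ) from None
    # Anything else is treated as an already-configured executor instance
    return executor
```

Passing an instance is how you would customize an executor (for example, the Examples section below builds a DAG with Serial(build_in_subprocess=False)); the string form is a shortcut for the defaults.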
- name
A name to identify the DAG
- Type
str
- clients
A class to client mapping
- Type
dict
- executor
Executor object to run tasks
- Type
ploomber.Executor
- on_render
Function to execute upon rendering. Can request a “dag” parameter.
- Type
callable
- on_finish
Function to execute once the build finishes. Can request a “dag” parameter and/or “report”, which contains the report object returned by the build function.
- Type
callable
- on_failure
Function to execute upon failure. Can request a “dag” parameter and/or “traceback”, which contains a dictionary whose possible keys are “build” (the build error traceback) and “on_finish” (the on_finish hook traceback, if any).
- Type
callable
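The hooks above "can request" parameters, meaning a hook only receives the values its signature asks for. A minimal sketch of that idea using inspect.signature follows; call_hook is a hypothetical helper written for illustration, not ploomber's internal code, and the example hooks simply return strings so their behavior is easy to check.

```python
import inspect


def call_hook(hook, **available):
    """Call `hook`, passing only the keyword arguments its signature declares.

    Hypothetical helper illustrating the "can request a parameter" behavior.
    """
    params = inspect.signature(hook).parameters
    kwargs = {name: value for name, value in available.items() if name in params}
    return hook(**kwargs)


# Example hooks, each requesting a different subset of parameters
def on_render(dag):
    return f"rendered {dag}"


def on_finish(dag, report):
    return f"{dag} finished: {report}"


def on_failure(traceback):
    # `traceback` is a dict; possible keys are "build" and "on_finish"
    return sorted(traceback)


print(call_hook(on_render, dag="my-dag", report=None))  # only `dag` is passed
```

With a real DAG, functions like these would be assigned to the documented attributes, e.g. dag.on_finish = on_finish.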
- serializer
Function to serialize products from PythonCallable tasks. Used if the task has no serializer. See the ploomber.tasks.PythonCallable documentation for details.
- Type
callable
- unserializer
Function to unserialize products from PythonCallable tasks. Used if the task has no unserializer. See the ploomber.tasks.PythonCallable documentation for details.
- Type
callable
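A serializer writes the object returned by a PythonCallable task to its product, and an unserializer reads it back for downstream tasks. The sketch below is a JSON-based pair; the signatures serializer(obj, product) and unserializer(product) are an assumption here, so confirm the exact contract in the ploomber.tasks.PythonCallable documentation.

```python
import json
from pathlib import Path


def json_serializer(obj, product):
    """Write a task's return value to its product path as JSON.

    Assumed signature (obj, product); str(product) is used as the path.
    """
    Path(str(product)).write_text(json.dumps(obj))


def json_unserializer(product):
    """Load a product previously written by json_serializer."""
    return json.loads(Path(str(product)).read_text())
```

Under that assumption, the pair would be set DAG-wide with dag.serializer = json_serializer and dag.unserializer = json_unserializer, applying to any PythonCallable task that does not define its own.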
Examples
Spec API:
pip install ploomber
ploomber examples -n guides/first-pipeline -o example
cd example
pip install -r requirements.txt
ploomber build
Python API:
>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import ShellScript, PythonCallable
>>> from ploomber.products import File
>>> from ploomber.executors import Serial
>>> code = ("echo hi > {{product['first']}}; "
...         "echo bye > {{product['second']}}")
>>> _ = Path('script.sh').write_text(code)
>>> dag = DAG(executor=Serial(build_in_subprocess=False))
>>> product = {'first': File('first.txt'), 'second': File('second.txt')}
>>> shell = ShellScript(Path('script.sh'), product, dag=dag, name='script')
>>> def my_task(upstream, product):
...     first = Path(upstream['script']['first']).read_text()
...     second = Path(upstream['script']['second']).read_text()
...     Path(product).write_text(first + ' ' + second)
>>> final = PythonCallable(my_task, File('final.txt'), dag=dag)
>>> shell >> final
PythonCallable: my_task -> File('final.txt')
>>> _ = dag.build()
Methods
build([force, show_progress, debug, …]) – Runs the DAG in order so that all upstream dependencies are run for every task
build_partially(target[, force, …]) – Partially build a DAG up to a given target task
check_tasks_have_allowed_status(allowed, …)
close_clients() – Close all clients (dag-level, task-level and product-level)
get(k[,d])
get_downstream(task_name) – Get downstream tasks for a given task name
items()
keys()
plot([output, include_products, backend, …]) – Plot the DAG
pop(name) – Remove a task from the dag
render([force, show_progress, remote]) – Render resolves all placeholders in tasks and determines whether a task should run based on the task.product metadata. This allows up-to-date tasks to be skipped
status(**kwargs) – Returns a table with tasks status
to_markup([path, fmt, sections, backend]) – Returns a str (md or html) with the pipeline’s description
values()
- build(force=False, show_progress=True, debug=None, close_clients=True)
Runs the DAG in order so that all upstream dependencies are run for every task
- Parameters
force (bool, default=False) – If True, it will run all tasks regardless of status
show_progress (bool, default=True) – Show progress bar
debug ('now' or 'later', default=None) – If ‘now’, drops into a debugging session if building raises an exception. Note that this temporarily replaces the executor with Serial, with subprocess off and exception/warning catching off; the original executor is restored at the end. If ‘later’, it keeps the executor the same and serializes the traceback errors for later debugging
close_clients (bool, default=True) – Close all clients (dag-level, task-level and product-level) upon successful build
Notes
All dag-level clients are closed after calling this function
changelog
Changed in version 0.20: debug changed from True/False to ‘now’/’later’/None
New in version 0.20: debug now supports debugging NotebookRunner tasks
- Returns
A dict-like object with tasks as keys and dicts with task status as values
- Return type
BuildReport
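The core guarantee of build is ordering: a task only runs after all of its upstream dependencies. The sketch below illustrates that guarantee with the standard library's graphlib.TopologicalSorter (Python 3.9+). It is a plain-Python illustration, not ploomber's executor, and the returned dict is a simplified stand-in for BuildReport.

```python
from graphlib import TopologicalSorter


def run_in_order(tasks, upstream):
    """Run each task after all of its upstream dependencies.

    tasks: maps task name -> zero-argument callable
    upstream: maps task name -> set of names it depends on
    Returns a plain dict standing in for a build report.
    """
    report = {}
    # static_order() raises graphlib.CycleError if dependencies form a cycle
    for name in TopologicalSorter(upstream).static_order():
        tasks[name]()
        report[name] = "Executed"
    return report
```

Tasks that only appear as dependencies (such as a "load" task with no upstream) are still included in the order, because TopologicalSorter adds every predecessor as a node.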
- build_partially(target, force=False, show_progress=True, debug=None, skip_upstream=False)
Partially build a DAG up to a given target task
- Parameters
target (str) – Name of the target task (last one to build). Can pass a wildcard such as ‘tasks-*’
force (bool, default=False) – If True, it will run all tasks regardless of status
show_progress (bool, default=True) – Show progress bar
debug ('now' or 'later', default=None) – If ‘now’, drops into a debugging session if building raises an exception. Note that this temporarily replaces the executor with Serial, with subprocess off and exception/warning catching off; the original executor is restored at the end. If ‘later’, it keeps the executor the same and serializes the traceback errors for later debugging
skip_upstream (bool, default=False) – If False, includes all upstream dependencies required to build target, otherwise it skips them. Note that if this is True and it’s not possible to build a given task (e.g., missing upstream products), this will fail
Notes
changelog
Changed in version 0.20: debug changed from True/False to ‘now’/’later’/None
New in version 0.20: debug now supports debugging NotebookRunner tasks
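To make the target and skip_upstream semantics concrete, the sketch below computes which tasks a partial build would include: every task matching the target pattern plus, unless skip_upstream is set, all of their transitive upstream dependencies. select_tasks is a hypothetical helper using fnmatch-style wildcards as an assumption about how a pattern like ‘tasks-*’ is matched; it is not ploomber's implementation.

```python
from fnmatch import fnmatchcase


def select_tasks(upstream, target, skip_upstream=False):
    """Return the names of the tasks a partial build would run.

    upstream: maps every task name -> set of names it depends on
              (use an empty set for tasks without dependencies)
    target: a task name or a wildcard pattern such as 'tasks-*'
    """
    targets = {name for name in upstream if fnmatchcase(name, target)}
    if not targets:
        raise ValueError(f"No task matches {target!r}")
    if skip_upstream:
        # Only the matching tasks; their upstream products must already exist
        return targets
    # Walk the dependency graph backwards from each target
    selected, pending = set(), list(targets)
    while pending:
        name = pending.pop()
        if name not in selected:
            selected.add(name)
            pending.extend(upstream[name])
    return selected
```

Downstream tasks (such as a report that consumes the targets) are never selected, which is what makes the build partial.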
- check_tasks_have_allowed_status(allowed, new_status)
- close_clients()
Close all clients (dag-level, task-level and product-level)
- get(k[, d]) → D[k] if k in D, else d. d defaults to None.
- get_downstream(task_name)
Get downstream tasks for a given task name
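Given the same name-to-upstream mapping used to describe dependencies, downstream tasks are those that list a task among their upstream. The sketch below assumes "downstream" means direct dependents only (not all descendants); that reading is an assumption, and the helper is hypothetical rather than the library's method.

```python
def get_downstream(upstream, task_name):
    """Return tasks that list `task_name` as a direct upstream dependency.

    upstream: maps task name -> set of names it depends on
    Assumption: only direct dependents are returned, sorted by name.
    """
    return sorted(name for name, deps in upstream.items() if task_name in deps)
```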
- items() → a set-like object providing a view on D’s items
- keys() → a set-like object providing a view on D’s keys
- plot(output='embed', include_products=False, backend=None, image_only=False)
Plot the DAG
- Parameters
output (str, default='embed') – Where to save the output (e.g., pipeline.png). If ‘embed’, it returns an IPython image instead.
include_products (bool, default=False) – If False, each node only contains the task name; if True, it contains the task name and products. Only available when using the pygraphviz backend
backend (str, default=None) – How to generate the plot. If None, it uses pygraphviz if installed, otherwise D3 (which doesn’t require extra dependencies). You can force a backend by passing ‘pygraphviz’ or ‘d3’.
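The backend rule above (prefer pygraphviz when available, otherwise fall back to D3) can be expressed with importlib.util.find_spec, which checks whether a module is installed without importing it. choose_backend is a hypothetical helper mirroring the documented rule, not ploomber's code; rejecting other backend names is this sketch's own choice.

```python
from importlib.util import find_spec


def choose_backend(backend=None):
    """Pick a plotting backend following the documented fallback rule."""
    if backend is None:
        # Prefer pygraphviz when installed; D3 needs no extra dependencies
        return "pygraphviz" if find_spec("pygraphviz") is not None else "d3"
    if backend not in {"pygraphviz", "d3"}:
        raise ValueError(f"backend must be 'pygraphviz' or 'd3', got {backend!r}")
    return backend
```

Because include_products is only available with pygraphviz, a caller that needs product labels would want to pass backend='pygraphviz' explicitly rather than rely on the fallback.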
- pop(name)
Remove a task from the dag
- render(force=False, show_progress=True, remote=False)
Render resolves all placeholders in tasks and determines whether each task should run based on the task.product metadata. This allows up-to-date tasks to be skipped.
- Parameters
force (bool, default=False) – Ignore product metadata status and prepare all tasks to be executed. This option renders much faster in DAGs with products whose metadata is stored in remote systems, because there is no need to fetch metadata over the network. If the DAG won’t be built, this option is recommended.
show_progress (bool, default=True) – Show progress bar
remote (bool, default=False) – Use remote metadata for determining task status. In most scenarios you want this to be False; Ploomber uses it internally when exporting pipelines to other platforms (via Soopervisor).
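To illustrate how product metadata lets render skip up-to-date tasks, here is a simplified decision rule: a task runs if forced, if its product has no metadata yet, if its source code changed since the last build, or if any upstream product was built more recently. The Metadata class and should_run function are hypothetical, and the exact fields ploomber stores may differ; treat this as a sketch of the idea rather than the library's logic.

```python
import hashlib
from dataclasses import dataclass
from typing import Iterable, Optional


def code_hash(source: str) -> str:
    """Fingerprint a task's source code."""
    return hashlib.sha256(source.encode()).hexdigest()


@dataclass
class Metadata:
    """Hypothetical metadata stored with a product after its task runs."""

    timestamp: float  # when the product was last built
    code_hash: str  # fingerprint of the source code that built it


def should_run(
    source: str,
    metadata: Optional[Metadata],
    upstream_timestamps: Iterable[float],
    force: bool = False,
) -> bool:
    """Return True if the task must run, False if it can be skipped."""
    if force or metadata is None:
        # Forced (like render(force=True)), or never built before
        return True
    if metadata.code_hash != code_hash(source):
        # Source code changed since the last build
        return True
    # Outdated if any upstream product was rebuilt after this one
    return any(ts > metadata.timestamp for ts in upstream_timestamps)
```

The force branch never reads stored metadata, which is consistent with the note above that force renders faster when metadata lives in a remote system.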
- status(**kwargs)
Returns a table with tasks status
- to_markup(path=None, fmt='html', sections=None, backend=None)
Returns a str (md or html) with the pipeline’s description
- Parameters
sections (list) – Which sections to include, possible values are “plot”, “status” and “source”. Defaults to [“plot”, “status”]
- values() → an object providing a view on D’s values
Attributes
product