ploomber.tasks.PythonCallable
- class ploomber.tasks.PythonCallable(source, product, dag, name=None, params=None, unserializer=None, serializer=None, debug_mode=None)
Execute a Python function
- Parameters:
source (callable) – The callable to execute
product (ploomber.products.Product) – Product generated upon successful execution
dag (ploomber.DAG) – A DAG to add this task to
name (str) – A str to identify this task. Should not already exist in the dag
params (dict) – Parameters to pass to the callable. By default, the callable is executed with a “product” parameter (which contains the product object). If the task has upstream dependencies, it also receives an “upstream” parameter, along with any parameters declared here
unserializer (callable, optional) – A callable to unserialize upstream products; the product object is passed as its only argument. If None, the source function receives the product object directly. If the task has no upstream dependencies, this argument has no effect
serializer (callable, optional) – A callable to serialize this task’s product. It must take two arguments: the first is the value returned by the task’s source, the second is the product object. If None, the task’s source is responsible for serializing its own product. If used, the source function must not have a “product” parameter and must return its result instead
debug_mode (None, 'now' or 'later', default=None) – If ‘now’, runs the task in debug mode, which starts a debugger if an error is raised. If ‘later’, serializes the traceback for later debugging. (Added in 0.20)
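The serializer and unserializer contracts described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the function names (to_pickle, from_pickle, make_numbers, total) are hypothetical, and a plain path stands in for the product object so the snippet runs without a DAG.

```python
import pickle
import tempfile
from pathlib import Path


def to_pickle(obj, product):
    # Serializer: first argument is the value returned by the source,
    # second argument is the product.
    Path(product).write_bytes(pickle.dumps(obj))


def from_pickle(product):
    # Unserializer: receives an upstream product as its only argument.
    return pickle.loads(Path(product).read_bytes())


def make_numbers():
    # No "product" parameter: the return value is handed to the serializer.
    return [1, 2, 3]


def total(upstream):
    # With an unserializer, the upstream value arrives already loaded.
    return sum(upstream['make_numbers'])


# Hand-written stand-in for what the tasks would do
# (assumption: a plain path replaces the product object).
with tempfile.TemporaryDirectory() as tmp:
    numbers_path = Path(tmp, 'numbers.pkl')
    to_pickle(make_numbers(), numbers_path)
    result = total({'make_numbers': from_pickle(numbers_path)})
```

In a pipeline, these functions would presumably be passed through the constructor arguments documented above, for example PythonCallable(make_numbers, File('numbers.pkl'), dag, serializer=to_pickle).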
Examples
Spec API:
tasks:
  - source: my_functions.my_task
    product: data.csv

# content of my_functions.py
from pathlib import Path

def my_task(product):
    Path(product).touch()
Spec API (multiple outputs):
tasks:
  - source: my_functions.another_task
    product:
      one: one.csv
      another: another.csv

# content of my_functions.py
from pathlib import Path

def another_task(product):
    Path(product['one']).touch()
    Path(product['another']).touch()
Python API:
>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import PythonCallable
>>> from ploomber.products import File
>>> from ploomber.executors import Serial
>>> dag = DAG(executor=Serial(build_in_subprocess=False))
>>> def my_function(product):
...     # create data.csv
...     Path(product).touch()
>>> PythonCallable(my_function, File('data.csv'), dag=dag)
PythonCallable: my_function -> File('data.csv')
>>> summary = dag.build()
Python API (multiple products):
>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import PythonCallable
>>> from ploomber.products import File
>>> from ploomber.executors import Serial
>>> dag = DAG(executor=Serial(build_in_subprocess=False))
>>> def my_function(product):
...     Path(product['first']).touch()
...     Path(product['second']).touch()
>>> product = {'first': File('first.csv'),
...            'second': File('second.csv')}
>>> task = PythonCallable(my_function, product, dag=dag)
>>> summary = dag.build()
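The examples above only use the “product” parameter. The following sketch shows what a function signature might look like when the task also has an upstream dependency and a value declared in params. The task names (load, filter_rows) and the threshold parameter are hypothetical, and the calls are simulated by hand with plain paths in place of product objects, so the snippet runs without building a DAG.

```python
import tempfile
from pathlib import Path


def load(product):
    # No upstream dependencies: only "product" is passed.
    Path(product).write_text('1\n5\n10\n')


def filter_rows(product, upstream, threshold):
    # "upstream" is keyed by the upstream task's name;
    # "threshold" would come from the params dict.
    values = [int(line) for line in Path(upstream['load']).read_text().split()]
    kept = [v for v in values if v >= threshold]
    Path(product).write_text('\n'.join(str(v) for v in kept))


# Simulate the calls by hand (assumption: plain paths stand in for products).
with tempfile.TemporaryDirectory() as tmp:
    raw, clean = Path(tmp, 'raw.csv'), Path(tmp, 'clean.csv')
    load(product=raw)
    filter_rows(product=clean, upstream={'load': raw}, threshold=5)
    kept_values = clean.read_text().split()
```

With the Python API, the second task would be declared with params={'threshold': 5} and linked to the first through set_upstream.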
Notes
changelog
New in version 0.20: debug constructor flag renamed to debug_mode to avoid conflicts with the debug method.
More examples using the Python API.
The executor=Serial(build_in_subprocess=False) argument is only required if copy-pasting the example in a Python session. If you store the code in a script, you may delete it and call dag.build like this:
if __name__ == '__main__':
    dag.build()
Then call your script:
python script.py
Methods
build([force, catch_exceptions]) – Build a single task
debug([kind]) – Run callable in debug mode.
load([key]) – Loads the product.
render([force, outdated_by_code, remote]) – Renders code and product. All upstream tasks must have been rendered first; for that reason, this method is usually not called directly but via DAG.render(), which renders in the right order.
run() – This is the only required method Task subclasses must implement
set_upstream(other[, group_name])
status([return_code_diff, sections]) – Prints the current task status
- build(force=False, catch_exceptions=True)
Build a single task
Although Tasks are primarily designed to execute via DAG.build(), it is possible to build them in isolation. However, this only works if the task does not have any unrendered upstream dependencies. If it does, call DAG.render() before calling Task.build()
- Returns:
A dictionary with keys ‘run’ and ‘elapsed’
- Return type:
dict
- Raises:
TaskBuildError – If the task failed to build because it has upstream dependencies, the build itself failed, or the build succeeded but the on_finish hook failed
DAGBuildEarlyStop – If any task or on_finish hook raises a DAGBuildEarlyStop error
- debug(kind='ipdb')
Run callable in debug mode.
- Parameters:
kind (str ('ipdb' or 'pdb')) – Which debugger to use: ‘ipdb’ for the IPython debugger or ‘pdb’ for the debugger from the standard library
Notes
Be careful when debugging tasks. If the task has run successfully and you overwrite products but don’t save the updated source code, your DAG will enter an inconsistent state where the metadata won’t match the overwritten product.
- load(key=None, **kwargs)
Loads the product. It uses the unserializer function if any; otherwise, it tries to load it based on the file extension
- Parameters:
key – Key to load, if this task generates more than one product
**kwargs – Arguments passed to the unserializer function
- render(force=False, outdated_by_code=True, remote=False)
Renders code and product. All upstream tasks must have been rendered first; for that reason, this method is usually not called directly but via DAG.render(), which renders in the right order.
Render fully determines whether a task should run or not.
- Parameters:
force (bool, default=False) – If True, mark status as WaitingExecution/WaitingUpstream even if the task is up-to-date (if there are any File(s) with clients, this also ignores the status of the remote copy), otherwise, the normal process follows and only up-to-date tasks are marked as Skipped.
outdated_by_code (bool, default=True) – If True, source code changes are taken into account when determining whether Task.product is outdated. Otherwise, only the upstream timestamps are used.
remote (bool, default=False) – Use remote metadata to determine status
Notes
This method tries to avoid calls to check for product status whenever possible, since checking a product’s metadata can be a slow operation (e.g. if metadata is stored in a remote database).
When passing force=True, product status checking is skipped altogether. This can be useful when we only want to quickly get a rendered DAG object to interact with.
- run()
This is the only required method Task subclasses must implement
- set_upstream(other, group_name=None)
- status(return_code_diff=False, sections=None)
Prints the current task status
- Parameters:
sections (list, optional) – Sections to include. Defaults to “name”, “last_run”, “outdated”, “product”, “doc”, “location”
Attributes
PRODUCT_CLASSES_ALLOWED
client
debug_mode
exec_status
name
A str that represents the name of the task. You can access tasks in a dag using dag['some_name']
on_failure
Callable to be executed if task fails (passes Task as first parameter and the exception as second parameter)
on_finish
Callable to be executed after this task is built successfully (passes Task as first parameter)
on_render
params
dict that holds the parameters that will be passed to the task upon execution.
product
The product this task will create upon execution
source
Source is used by the task to compute its output. In most cases this is source code; for example, PythonCallable takes a function as source and SQLScript takes a string with SQL code as source.
upstream
A mapping for upstream dependencies {task name} -> [task object]