ploomber.tasks.ScriptRunner¶
- class ploomber.tasks.ScriptRunner(source, product, dag, name=None, params=None, ext_in=None, static_analysis='regular', local_execution=False)¶
Similar to NotebookRunner, except it uses Python to run the code instead of papermill; hence, it doesn't generate an output notebook. It still works by injecting a cell into the source code. Source can be a .py script or an .ipynb notebook. Does not support magics.
- Parameters
source (str or pathlib.Path) – Script source. If str, the content is interpreted as the actual script; if pathlib.Path, the content of the file is loaded. When loading from a str, ext_in must be passed
product (ploomber.File) – The output file
dag (ploomber.DAG) – A DAG to add this task to
name (str, optional) – A str to identify this task. Should not already exist in the dag
params (dict, optional) – Script parameters. These are passed as the “parameters” argument to the papermill.execute_notebook function; by default, “product” and “upstream” are included
ext_in (str, optional) – Source extension. Required if loading from a str. If source is a pathlib.Path, the extension from the file is used.
static_analysis (('disabled', 'regular', 'strict'), default='regular') – Check for various errors in the script. In ‘regular’ mode, it aborts execution if the notebook has syntax issues or similar problems that would cause the code to break if executed. In ‘strict’ mode, it performs the same checks but raises the issue before starting execution of any task; furthermore, it verifies that the parameters cell and the params passed to the notebook match, making the script behave like a function with a signature.
local_execution (bool, optional) – Change working directory to be the parent of the script source. Defaults to False.
Examples
Spec API:
tasks:
  - source: script.py
    class: ScriptRunner
    product:
      data: data.csv
      another: another.csv
Python API:
>>> from pathlib import Path
>>> from ploomber import DAG
>>> from ploomber.tasks import ScriptRunner
>>> from ploomber.products import File
>>> dag = DAG()
>>> product = {'data': File('data.csv'), 'another': File('another.csv')}
>>> _ = ScriptRunner(Path('script.py'), product, dag=dag)
>>> _ = dag.build()
Methods
build([force, catch_exceptions]) – Build a single task
debug([kind]) – Opens the notebook (with injected parameters) in debug mode in a temporary location
load([key]) – Load task as pandas.DataFrame.
render([force, outdated_by_code, remote]) – Renders code and product; all upstream tasks must have been rendered first, so this method is usually not called directly but via DAG.render(), which renders in the right order.
run() – This is the only required method Task subclasses must implement
set_upstream(other[, group_name])
status([return_code_diff, sections]) – Prints the current task status
- build(force=False, catch_exceptions=True)¶
Build a single task
Although Tasks are primarily designed to execute via DAG.build(), it is possible to do so in isolation. However, this only works if the task does not have any unrendered upstream dependencies; if it does, you should call DAG.render() before calling Task.build()
- Returns
A dictionary with keys ‘run’ and ‘elapsed’
- Return type
dict
- Raises
TaskBuildError – If the task failed to build because it has unrendered upstream dependencies, the build itself failed, or the build succeeded but the on_finish hook failed
DAGBuildEarlyStop – If any task or on_finish hook raises a DAGBuildEarlyStop error
- debug(kind='ipdb')¶
Opens the notebook (with injected parameters) in debug mode in a temporary location
- Parameters
kind (str, default='ipdb') – Debugger to use: ‘ipdb’ to use the line-by-line IPython debugger, ‘pdb’ to use the line-by-line Python debugger, or ‘pm’ to do post-mortem debugging using IPython
Notes
Be careful when debugging tasks. If the task has run successfully and you overwrite products but don’t save the updated source code, your DAG will enter an inconsistent state where the metadata won’t match the overwritten product.
- load(key=None, **kwargs)¶
Load task as pandas.DataFrame. Only implemented in certain tasks
- render(force=False, outdated_by_code=True, remote=False)¶
Renders code and product. All upstream tasks must have been rendered first; for that reason, this method will usually not be called directly but via DAG.render(), which renders in the right order.
Render fully determines whether a task should run or not.
- Parameters
force (bool, default=False) – If True, mark status as WaitingExecution/WaitingUpstream even if the task is up-to-date (if there are any File(s) with clients, this also ignores the status of the remote copy), otherwise, the normal process follows and only up-to-date tasks are marked as Skipped.
outdated_by_code (bool, default=True) – Whether Task.product is marked outdated when source code changes. If False, only the upstream timestamps are used.
remote (bool, default=False) – Use remote metadata to determine status
Notes
This method tries to avoid calls to check for product status whenever possible, since checking product’s metadata can be a slow operation (e.g. if metadata is stored in a remote database)
When passing force=True, product status checking is skipped altogether; this can be useful when you only want to quickly get a rendered DAG object to interact with it
- run()¶
This is the only required method Task subclasses must implement
- set_upstream(other, group_name=None)¶
- status(return_code_diff=False, sections=None)¶
Prints the current task status
- Parameters
sections (list, optional) – Sections to include. Defaults to “name”, “last_run”, “outdated”, “product”, “doc”, “location”
Attributes
PRODUCT_CLASSES_ALLOWED
client
exec_status
name
A str that represents the name of the task, you can access tasks in a dag using dag[‘some_name’]
on_failure
Callable to be executed if task fails (passes Task as first parameter and the exception as second parameter)
on_finish
Callable to be executed after this task is built successfully (passes Task as first parameter)
on_render
params
dict that holds the parameters that will be passed to the task upon execution.
product
The product this task will create upon execution
source
Source is used by the task to compute its output, for most cases this is source code, for example PythonCallable takes a function as source and SQLScript takes a string with SQL code as source.
static_analysis
upstream
A mapping for upstream dependencies {task name} -> [task object]