Spec API (pipeline.yaml)¶
Note
This document assumes you are already familiar with Ploomber’s core concepts (DAG, product, task, and upstream). If you’re not, check out this guide: Basic concepts.
This section describes the pipeline.yaml schema.
meta¶
meta is an optional section for meta-configuration; it controls how the DAG is constructed.
meta.source_loader¶
Load task sources (tasks[*].source) from a Python module. For example, say you have a module my_module and want to load sources from a path/to/sources directory inside that module:
meta:
  source_loader:
    module: my_module
    path: path/to/sources
meta.import_tasks_from¶
Add tasks defined in a different file to the current one. This directive is useful for composing pipelines. For example, if you have a training and a serving pipeline, you can define the pre-processing logic in a pipeline.preprocessing.yaml and then import the file into pipeline.training.yaml and pipeline.serving.yaml:
meta:
  import_tasks_from: /path/to/tasks.yaml
The file must be a list where each element is a valid Task.
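For instance, a minimal imported file might look like this (the sources and products are illustrative):
- source: tasks/load.py
  product: output/loaded.csv
- source: tasks/clean.py
  product: output/clean.csv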
Click here to see a batch serving example.
Click here to see an online serving example.
meta.extract_upstream¶
Extract upstream dependencies from the source code (True by default).
meta:
  extract_upstream: True
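With extraction enabled, Ploomber parses each task's source to find its dependencies; for scripts/notebooks, that means reading the upstream declaration in the "parameters" cell (a sketch; the task name is illustrative):
# %% tags=["parameters"]
upstream = ['some_task']
product = None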
If False, tasks must declare dependencies using the upstream key:
meta:
  extract_upstream: false

tasks:
  - source: tasks/clean.py
    product: output/report.html
    upstream: [some_task, another_task]
meta.extract_product¶
Default:
meta:
  extract_product: False
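Conversely, with extract_product: True, Ploomber reads products from each task's source instead of from pipeline.yaml; for scripts, a minimal sketch of the declaration in the "parameters" cell (the path is illustrative):
# %% tags=["parameters"]
product = 'output/report.html'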
meta.product_default_class¶
Product class key for a given task class. Names should match (case-sensitive) the names in the Python API. These are rarely changed, except for SQLScript. Defaults:
meta:
  product_default_class:
    SQLScript: SQLRelation
    SQLDump: File
    NotebookRunner: File
    ShellScript: File
    PythonCallable: File
executor¶
Determines which executor to use:
serial: Runs one task at a time (Note: by default, function tasks run in a subprocess)
parallel: Runs independent tasks in parallel (Note: this runs all tasks in a subprocess)
Dotted path: Allows you to customize the initialization parameters
For example, say you want to use the ploomber.executors.Serial executor but do not want to run functions in a subprocess; you can pass a dotted path and custom parameters like this:
executor:
  dotted_path: ploomber.executors.Serial
  build_in_subprocess: false # do not run function tasks in a subprocess
Another common use case is to limit the number of subprocesses when using the ploomber.executors.Parallel executor:
executor:
  dotted_path: ploomber.executors.Parallel
  processes: 2 # limit to a max of 2 processes
You can also set which method is used to start child processes; start_method can be 'fork', 'spawn', or 'forkserver':
executor:
  dotted_path: ploomber.executors.Parallel
  processes: 2 # limit to a max of 2 processes
  start_method: spawn # start child processes using the 'spawn' method
To learn more about the executors, see ploomber.executors.Serial and ploomber.executors.Parallel.
clients¶
These are the default clients; this section lets you specify a single client for all Tasks/Products of a given class. The most common use case is SQL database configuration.
Another scenario is ploomber.products.File clients, which Ploomber can use to back up pipeline results (say, for example, you run a job that trains several models and want to save the output; you can use ploomber.clients.GCloudStorageClient or ploomber.clients.S3Client for that).
Keys must be valid ploomber.tasks or ploomber.products names; values must be dotted paths to functions that return a ploomber.clients instance.
Can be a string (call without arguments):
clients:
  # this assumes there is a clients.py with a get_client function
  {some-class}: clients.get_client
Or a dictionary (to call with arguments):
clients:
  {some-class}:
    # this assumes there is a clients.py with a get_client function
    dotted_path: clients.get_client
    kwarg_1: value_1
    ...
    kwarg_k: value_k
Example: Database dump
clients:
  SQLDump: clients.get

tasks:
  - source: query.sql
    product: output/data.csv
    # dump everything into a single file
    chunksize: null

# clients.py
from ploomber.clients import SQLAlchemyClient

def get():
    return SQLAlchemyClient('sqlite:///my.db')
Download:
ploomber examples -n cookbook/sql-dump -o sql-dump
Example: Upload files to the cloud
clients:
  # configures a dag-level File client
  File: clients.get_local # you can switch to clients.get_s3 or clients.get_gcloud

tasks:
  - source: functions.create_file
    # upon execution, this file is uploaded to storage
    product: products/some-file.txt
  - source: scripts/some-script.py
    # upon execution, both files are uploaded to storage
    product:
      nb: products/some-script.ipynb
      file: products/another-file.txt
    # you may also pass a task-level File client if you don't want to upload
    # all products in the pipeline
    # client: clients.get_s3
# clients.py
from ploomber.clients import LocalStorageClient, GCloudStorageClient, S3Client

def get_local():
    """Returns local client
    """
    return LocalStorageClient('backup')

def get_s3():
    """Returns S3 client
    """
    # assumes your environment is already configured, you may also pass the
    # json_credentials_path
    return S3Client(bucket_name='some-bucket', parent='my-project/products')

def get_gcloud():
    """Returns google cloud storage client
    """
    # assumes your environment is already configured, you may also pass the
    # json_credentials_path
    return GCloudStorageClient(bucket_name='some-bucket',
                               parent='my-project/products')
Download:
ploomber examples -n cookbook/file-client -o file-client
Full projects
on_{render, finish, failure}¶
Important
Hooks are not executed when opening scripts/notebooks in Jupyter.
These are hooks that execute when specific events happen:
on_render: executes after verifying there are no errors in your pipeline declaration (e.g., a task that doesn't exist declared as an upstream dependency)
on_finish: executes upon a successful pipeline run
on_failure: executes upon a failed pipeline run
They are all optional and take a dotted path as an argument. For example, assume you have a hooks.py with functions on_render, on_finish, and on_failure. You can add them to your pipeline.yaml like this:
on_render: hooks.on_render
on_finish: hooks.on_finish
on_failure: hooks.on_failure
If your hook takes arguments, you may call it like this:
# to call any hook with arguments
# {hook-name} must be one of: on_render, on_finish, on_failure
{hook-name}:
  dotted_path: {dotted.path.to.hook}
  argument: value
Calling with arguments is useful when you have a parametrized pipeline.
If you need information from your DAG in your hook, you may request the dag (ploomber.DAG) argument in any of the hooks. on_finish can also request a report argument, which contains a summary report of the pipeline's execution. on_failure can request a traceback argument, which is a dictionary; possible keys are build, which has the build error traceback, and on_finish, which includes the on_finish hook traceback, if any. For more information, see the ploomber.DAG documentation.
Example: Hooks
# dag-level hooks
on_render:
  dotted_path: hooks.dag_level_on_render
  my_param: 10
on_finish: hooks.dag_level_on_finish
on_failure: hooks.dag_level_on_failure

# hooks.py
def dag_level_on_render(my_param):
    """Executed after the pipeline renders (before execution)
    """
    print(f'rendered DAG! my_param={my_param}')

def dag_level_on_finish(dag, report):
    """Executes after the pipeline runs all tasks
    """
    print(f'Finished executing pipeline {dag}, report:\n{report}')

def dag_level_on_failure(traceback):
    """Executes if the pipeline fails
    """
    if traceback.get('build'):
        print('Pipeline execution failed while running the tasks!')

    if traceback.get('on_finish'):
        print('Pipeline execution failed while executing an on_finish hook!')
Download:
ploomber examples -n cookbook/hooks -o hooks
serializer and unserializer¶
By default, tasks whose source is a function (i.e., ploomber.tasks.PythonCallable) receive input paths (in upstream) and output paths (in product) when the function executes. Saving interim results allows Ploomber to provide incremental builds (What are incremental builds?).
However, in some cases, we might want to provide a pipeline that performs all operations in memory (e.g., to do online serving). ploomber.OnlineDAG can convert a file-based pipeline into an in-memory one without code changes, allowing you to re-use your feature engineering code for training and serving. The only requisite is for tasks to configure a serializer and unserializer.
Click here to see an example.
Normally, a task whose source is a function looks like this:
import pandas as pd

def my_task(product, upstream):
    df_upstream = pd.read_csv(upstream['name'])
    # process data...
    # save product
    df_product.to_csv(product)
And you use the product parameter to save any task output. However, if you add a serializer, product isn't passed, and you must return the product object:
import pandas as pd

def my_task(upstream):
    df_upstream = pd.read_csv(upstream['name'])
    # process data...
    return df_product
The serializer function is called with the returned object as its first argument and product (the output path) as the second argument:
serializer(df_product, product)
A similar logic applies to unserializer; when present, the function is called for each upstream dependency with the product as the argument:
unserializer(product)
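To make both signatures concrete, here's a minimal hand-written pair (a sketch using pickle; the ploomber.io decorators shown in the example further below are the more common route):
import pickle
from pathlib import Path

def my_serializer(obj, product):
    # obj is whatever the task function returned; product is the output path
    Path(str(product)).write_bytes(pickle.dumps(obj))

def my_unserializer(product):
    # called once per upstream product; the return value is what the
    # downstream task receives in upstream['name']
    return pickle.loads(Path(str(product)).read_bytes())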
In your task function, you receive objects (instead of paths):
import pandas as pd

def my_task(upstream):
    # no need to call pd.read_csv here
    df_upstream = upstream['name']
    return df_product
To provide a task-level serializer/unserializer, pass it directly to the task. If you set a DAG-level serializer/unserializer and wish to exclude specific tasks, pass serializer: null or unserializer: null in the selected task.
Example: Serialization
serializer: util.my_serializer
unserializer: util.my_unserializer

tasks:
  - source: tasks.one_product
    product: output/one.txt
  - source: tasks.many_products
    product:
      something: output/something.csv
      another: output/something.txt
  - source: tasks.joblib_product
    product: output/something.joblib
  - source: tasks.final_product
    product: output/final.csv
# util.py
from ploomber.io import serializer, unserializer

@serializer(fallback='joblib', defaults=['.csv', '.txt'])
def my_serializer(obj, product):
    pass

@unserializer(fallback='joblib', defaults=['.csv', '.txt'])
def my_unserializer(product):
    pass
Download:
ploomber examples -n cookbook/serialization -o serialization
source_loader¶
If you package your project (i.e., add a setup.py), source_loader offers a convenient way to load sources inside such a package. For example, if your package is named my_package and you want to load from the my_sources/ folder within the package:
meta:
  source_loader:
    module: my_package
    path: my_sources

tasks:
  # this is loaded from my_package (my_sources directory)
  - source: script.sql
    # task definition continues...
To find out the location used, you can execute the following in a Python session:
import my_package; print(my_package) # print package location
The above should print something like path/to/my_package/__init__.py. With the configuration above, the source loader will load the file from path/to/my_package/my_sources/script.sql.
Note: this only applies to tasks whose source is a relative path. Dotted paths and absolute paths are not affected.
For details, see ploomber.SourceLoader, which is the underlying Python implementation. Here's an example that uses source_loader.
SQLScript product class¶
By default, SQL scripts use ploomber.products.SQLRelation as the product class. Such a product doesn't save the product's metadata, which is required for incremental builds (What are incremental builds?). If you want to use them, you need to change the default value and configure the product's client. Here's an example that uses product_default_class to configure a SQLite pipeline with incremental builds.
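A minimal sketch (assuming a clients.py with a get_client function that returns a ploomber.clients.SQLAlchemyClient pointing to your SQLite database):
meta:
  product_default_class:
    SQLScript: SQLiteRelation

clients:
  SQLScript: clients.get_client
  SQLiteRelation: clients.get_client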
For more information on product clients, see: FAQ and Glossary.
Loading from a factory¶
The CLI looks for a pipeline.yaml by default. If you're using the Python API and want to save some typing, you can point your pipeline.yaml to a factory like this:
# pipeline.yaml
location: {dotted.path.to.factory}
With such configuration, commands such as ploomber build will work.
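A minimal sketch of such a factory (module and function names are illustrative; with this file, the spec above would read location: factory.make):
# factory.py
from pathlib import Path

from ploomber import DAG
from ploomber.tasks import PythonCallable
from ploomber.products import File

def create_file(product):
    Path(str(product)).write_text('hello')

def make():
    # build and return the DAG; Ploomber calls this function
    dag = DAG()
    PythonCallable(create_file, File('output.txt'), dag=dag)
    return dag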
task¶
This section describes the task schema.
Tip
All other keys passed here are forwarded to the class constructor, so the allowed values will depend on the task class. For example, if running a notebook, the task class is ploomber.tasks.NotebookRunner; if it's a function, it'll be ploomber.tasks.PythonCallable. See each class's documentation to learn what extra arguments it takes.
tasks[*].name¶
The name of the task. The filename (without the extension) is used if not defined.
tasks[*].source¶
Indicates where the source code for a task is. This can be a path to a file if using scripts/notebooks, or a dotted path if using a function.
By default, paths are relative to the pipeline.yaml parent folder (absolute paths are not affected), unless source_loader is configured; in such a situation, paths are relative to the location configured in the SourceLoader object. See the source_loader section for more details.
For example, if your pipeline is located at project/pipeline.yaml, and you have:
tasks:
  - source: scripts/my_script.py
    # task definition continues...
Ploomber will expect your script to be located at project/scripts/my_script.py.
If using a function, the dotted path should be importable. For example, if you have:
tasks:
  - source: my_package.my_module.my_function
    # task definition continues...
Ploomber runs code equivalent to:
from my_package.my_module import my_function
tasks[*].product¶
Indicates output(s) generated by the task. This can be either file(s) or SQL relation(s) (table or view). The exact type depends on the source value for the given task: SQL scripts generate SQL relations; everything else generates files.
When generating files, paths are relative to the pipeline.yaml parent directory. For example, if your pipeline is located at project/pipeline.yaml, and you have:
tasks:
  - source: scripts/my_script.py
    product: output/my_output.csv
Ploomber will save your output to project/output/my_output.csv.
When generating SQL relations, the format is different:
tasks:
  - source: scripts/my_script.sql
    # list with three elements (last one can be table or view)
    product: [schema, name, table]
    # schema is optional, it can also be: [name, table]
If the task generates multiple products, pass a dictionary:
tasks:
  - source: scripts/my_script.py
    product:
      nb: output/report.html
      data: output/data.csv
Note
The names of keys in the product dictionary can be chosen freely so as to be descriptive of the outputs (e.g., data, data_clean, model, etc.)
The mechanism to make product available when executing your task depends on the type of task.
SQL tasks receive a {{product}} placeholder:
-- {{product}} is replaced by "schema.name" or "name" if schema is empty
CREATE TABLE {{product}} AS
SELECT * FROM my_table WHERE my_column > 10
If product is a dictionary, use {{product['key']}}.
Python/R scripts/notebooks receive a product variable in the "injected-parameters" cell:
# %% tags=["parameters"]
product = None

# %% tags=["injected-parameters"]
product = '/path/to/output/data.csv'

# your code...
If product is a dictionary, this becomes:
product = {'key': '/path/to/output/data.csv', ...}
Python functions receive the product argument:
import pandas as pd

def my_task(product):
    # process data...
    df.to_csv(product)
If product is a dictionary, use product['key'].
The same logic applies when making upstream dependencies available to tasks, but in this case, upstream is always a dictionary: SQL scripts can refer to their upstream dependencies using {{upstream['key']}}, Python scripts and notebooks receive upstream in the "injected-parameters" cell, and Python functions are called with an upstream argument.
tasks[*].params¶
Use this section to pass arbitrary parameters to a task.
tasks:
  - source: {some-source}
    product: {some-product}
    params:
      my_param: 42
You can also generate parameters from functions, for example:
tasks:
  - source: {some-source}
    product: {some-product}
    params:
      my_param: params::my_param_generate
This will import params and call the function my_param_generate. The returned value is assigned to my_param.
If your function takes parameters:
tasks:
  - source: {some-source}
    product: {some-product}
    params:
      my_param:
        dotted_path: params::my_param_generate
        arg1: value1
        arg2: value2
In this case, my_param_generate is called like my_param_generate(arg1='value1', arg2='value2').
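For reference, a minimal sketch of the module assumed above (a params.py whose function name matches the dotted path):
# params.py
def my_param_generate(arg1, arg2):
    # the return value is assigned to my_param
    return f'{arg1}-{arg2}'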
The mechanism to pass params to tasks depends on the task type:
SQL tasks receive them as placeholders:
-- {{my_param}} is replaced by 42
SELECT * FROM my_table WHERE my_column > {{my_param}}
Python/R scripts/notebooks receive them in the "injected-parameters" cell:
# %% tags=["parameters"]
my_param = None

# %% tags=["injected-parameters"]
my_param = 42

# your code...
Python functions receive them as arguments:
# function is called with my_param=42
def my_task(product, my_param):
    pass
Changelog
Changed in version 0.21: Allows passing dotted paths (module::function) to tasks[*].params
tasks[*].on_{render, finish, failure}¶
Important
Hooks are not executed when opening scripts/notebooks in Jupyter.
These are hooks that execute under certain events. They are equivalent to dag-level hooks, except they apply to a specific task. There are three types of hooks:
on_render: executes right before executing the task.
on_finish: executes when a task finishes successfully.
on_failure: executes when a task errors during execution.
They are all optional and take a dotted path as an argument. For example, assume you have a hooks.py with functions on_render, on_finish, and on_failure. You can add those hooks to a task in your pipeline.yaml like this:
tasks:
  - source: tasks.my_task
    product: products/output.csv
    on_render: hooks.on_render
    on_finish: hooks.on_finish
    on_failure: hooks.on_failure
If your hook takes arguments, you may call it like this:
# to call any hook with arguments
# {hook-name} must be one of: on_render, on_finish, on_failure
{hook-name}:
  dotted_path: {dotted.path.to.hook}
  argument: value
Calling with arguments is useful when you have a parametrized pipeline.
If you need information from the task, you may add any of the following arguments to the hook:
task: Task object (a subclass of ploomber.tasks.Task)
client: Task's client (a subclass of ploomber.clients.Client)
product: Task's product (a subclass of ploomber.products.Product)
params: Task's params (a dictionary)
For example, if you want to check the data quality of a function that cleans some data, you may want to add an on_finish hook that loads the output and tests the data:
import pandas as pd

def on_finish(product):
    df = pd.read_csv(product)

    # check that column "age" has no NAs
    assert not df.age.isna().sum()
Example: Hooks
tasks:
  - source: tasks.do_something
    product: output/data.csv
    # task-level hooks
    on_render:
      dotted_path: hooks.on_render
      my_param: 20
    on_finish: hooks.on_finish
    on_failure: hooks.on_failure

# hooks.py
def on_render(my_param, task, client, product, params):
    """Executed after the task renders (before execution)
    """
    print(f'Finished rendering {task.name} with my_param {my_param}, '
          f'client {client}, product {product}, and task params {params}')

def on_finish(task, client, product, params):
    """Executes after the task runs
    """
    print(f'Finished running {task.name} with client {client}, '
          f'product {product} and params {params}')

def on_failure(task, client, product, params):
    """Executes if the task fails
    """
    print(f'{task.name} with client {client}, '
          f'product {product} and params {params} failed!')
Download:
ploomber examples -n cookbook/hooks -o hooks
tasks[*].params.resources_¶
The params section accepts an optional key called resources_ (note the trailing underscore). By default, Ploomber marks tasks as outdated when their parameters change; however, parameters in the resources_ section work differently: the task is marked as outdated when the contents of the referenced file change. For example, suppose you're using a JSON file as a configuration source for a given task and want Ploomber to re-run the task whenever that file changes; you can do something like this:
tasks:
  - source: scripts/my-script.py
    product: report.html
    params:
      resources_:
        # whenever the JSON file changes, my-script.py runs again
        file: my-config-file.json
tasks[*].grid¶
Sometimes, you may want to run the same task over a set of parameters; grid allows you to do so. For example, say you want to train multiple models, each one with a different set of parameters:
tasks:
  - source: random-forest.py
    # name is required when using grid
    name: random-forest-
    product: random-forest.html
    grid:
      # six tasks: 3 * 2
      n_estimators: [5, 10, 20]
      criterion: [gini, entropy]
The spec above generates six tasks, one for each combination of parameters (3 * 2). In this example, products will be named random-forest-X.html, where X goes from 0 to 5. Similarly, task names will be random-forest-X. You can customize task names and product names to contain the corresponding parameter values; see the sections below for details.
Generating large grids dynamically
In cases where listing each parameter isn't feasible, you can write a function to produce all values (Added in version 0.21):
# grid.py (note: this can be any name you want)
def generate_values():
    return range(10)
Then call it in your pipeline.yaml like this:
tasks:
  - source: random-forest.py
    # name is required when using grid
    name: random-forest-
    product: random-forest.html
    grid:
      n_estimators: grid::generate_values
      criterion: [gini, entropy]
Note
This feature is available since version 0.19.8; however, in version 0.21, the format changed to module::function. From 0.19.8 to 0.20, the format was module.function.
The above will generate 20 tasks (10 generated by n_estimators times 2 generated by criterion).
If the function takes parameters:
tasks:
  - source: random-forest.py
    # name is required when using grid
    name: random-forest-
    product: random-forest.html
    grid:
      n_estimators:
        dotted_path: grid::generate_values
        arg1: value1
        arg2: value2
      criterion: [gini, entropy]
Templating output paths (products)
You can also customize the product outputs to organize them in different folders and names (Added in version 0.17.2):
tasks:
  - source: random-forest.py
    name: random-forest-
    product: 'n_estimators=[[n_estimators]]/criterion=[[criterion]].html'
    grid:
      n_estimators: [5, 10, 20]
      criterion: [gini, entropy]
The example above generates outputs by replacing the parameter values; for example, it stores the random forest with n_estimators=5 and criterion=gini at n_estimators=5/criterion=gini.html. Note that this uses square brackets to differentiate these placeholders from the regular ones used with an env.yaml file.
Templating task names
Similarly, you can also customize task names (Added in version 0.19.8):
tasks:
  - source: random-forest.py
    name: random-forest-[[n_estimators]]-[[criterion]]
    product: 'n_estimators=[[n_estimators]]/criterion=[[criterion]].html'
    grid:
      n_estimators: [5, 10, 20]
      criterion: [gini, entropy]
The above will generate task names random-forest-5-gini, random-forest-10-gini, etc.
Passing a list of grids
You may pass a list instead of a dictionary to use multiple sets of parameters:
tasks:
  - source: train-model.py
    name: train-model-
    product: train-model.html
    grid:
      - model_type: [random-forest]
        n_estimators: [5, 10, 20]
        criterion: [gini, entropy]
      - model_type: [ada-boost]
        n_estimators: [1, 3, 5]
        learning_rate: [1, 2]
Creating a task that depends on all grid-generated tasks
To create a task downstream of all tasks generated by grid, you can use a wildcard with the * character:
# make all tasks that start with train-model- dependencies of this task
upstream = ['train-model-*']
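If you declare dependencies in pipeline.yaml instead (meta.extract_upstream: false), a sketch of the equivalent declaration (report.py is illustrative):
tasks:
  - source: report.py
    product: report.html
    upstream: [train-model-*]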
Examples and changelog
Example: Grid
- source: scripts/fit.py
  # generates tasks fit-1, fit-2, etc
  name: fit-[[model_type]]-[[n_estimators]]-[[criterion]][[learning_rate]]
  # disabling static_analysis because the notebook does not have
  # a fixed set of parameters (depends on random-forest vs ada-boost)
  static_analysis: disable
  product:
    nb: products/report-[[model_type]]-[[n_estimators]]-[[criterion]][[learning_rate]].html
    model: products/model-[[model_type]]-[[n_estimators]]-[[criterion]][[learning_rate]].pickle
  grid:
    # generates 6 tasks (1 * 3 * 2)
    - model_type: [random-forest]
      n_estimators: [1, 3, 5]
      criterion: [gini, entropy]
    # generates 6 tasks (1 * 3 * 2)
    - model_type: [ada-boost]
      n_estimators: [1, 3, 5]
      learning_rate: [1, 2]
Download:
ploomber examples -n cookbook/grid -o grid
Example: Model selection with nested cross-validation
executor: parallel

tasks:
  - source: tasks/load.py
    product:
      nb: products/load.html
      data: products/data.csv
  - source: tasks/fit.py
    name: fit-[[model]]
    product:
      nb: products/fit-[[model]].html
      model: products/model-[[model]].pkl
    grid:
      - model: [sklearn.ensemble.RandomForestClassifier]
        model_params:
          # optimize over these parameters
          - n_estimators: [2, 5]
            criterion: [gini, entropy]
      - model: [sklearn.svm.SVC]
        model_params:
          # optimize over these parameters
          - kernel: [linear, poly]
            C: [0.1, 1.0]
Download:
pip install ploomber
ploomber examples -n cookbook/nested-cv -o nested-cv
Changelog
Changed in version 0.21: Dotted paths are required to have the module::function format; module.function is no longer allowed.
New in version 0.19.8: Pass a dotted path (module.function) to generate large grids dynamically
New in version 0.19.8: Customize task names using placeholders [[placeholder]]
New in version 0.17.2: Use params and grid in the same task. Values in params are constant across the grid.
New in version 0.17.2: Customize the product paths with placeholders [[placeholder]]
tasks[*].client¶
Task client to use. By default, the class-level client in the clients section is used. This task-level value overrides it. Required for some tasks (e.g., SQLScript), optional for others (e.g., File).
Can be a string (call without arguments):
client: clients.get_db_client
Or a dictionary (to call with arguments):
client:
  dotted_path: clients.get_db_client
  kwarg_1: value_1
  ...
  kwarg_k: value_k
tasks[*].product_client¶
Product client to use (to save the product's metadata). Only required if you want to enable incremental builds (What are incremental builds?) when using SQL products. It can be a string or a dictionary (the API is the same as tasks[*].client).
More information on product clients: FAQ and Glossary.
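For example (a sketch; clients.get_metadata_client is a hypothetical function returning a ploomber.clients instance):
tasks:
  - source: scripts/my_script.sql
    product: [schema, name, table]
    product_client: clients.get_metadata_client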
tasks[*].upstream¶
Dependencies for this task. Only required if meta.extract_upstream is set to False.
tasks:
  ...
  upstream: {str or list}
Example:
tasks:
  - source: scripts/my-script.py
    product: output/report.html
    upstream: [clean_data_a, clean_data_b]
tasks[*].class¶
Task class to use (any class from ploomber.tasks). You rarely have to set this, since it is inferred from source: ploomber.tasks.NotebookRunner for .py and .ipynb files, ploomber.tasks.SQLScript for .sql, and ploomber.tasks.PythonCallable for dotted paths.
tasks[*].product_class¶
This takes any class name from ploomber.products. You rarely have to set this, since meta.product_default_class covers the typical cases.
Parametrizing with env.yaml¶
In some situations, it's helpful to parametrize a pipeline. For example, you could run your pipeline with a sample of the data as a smoke test, to make sure it runs before triggering a run with the entire dataset, which could take several hours to finish.
To add parameters to your pipeline, create an env.yaml file next to your pipeline.yaml:
my_param: my_value
nested:
  param: another_value
Then use placeholders in your pipeline.yaml file:
tasks:
  - source: module.function
    params:
      my_param: '{{my_param}}'
      my_second_param: '{{nested.param}}'
In the previous example, module.function is called with my_param='my_value' and my_second_param='another_value'.
A common pattern is to use a pipeline parameter to change the location of tasks[*].product. For example:
tasks:
  - source: module.function
    # path determined by a parameter
    product: '{{some_directory}}/output.csv'
  - source: my_script.sql
    # schema and prefix determined by a parameter
    product: ['{{some_schema}}', '{{some_prefix}}_name', table]
This can help you keep products generated by runs with different parameters in different locations.
These are the most common use cases, but you can use placeholders anywhere in your pipeline.yaml values (not keys):
tasks:
  - source: module.function
    # doesn't work
    '{{placeholder}}': value
You can update your env.yaml file or switch parameters from the command line to change their values; run ploomber build --help to get a list of arguments you can pass to override the parameters defined in env.yaml.
Note that these parameters are constant (they must be changed explicitly by you, either by updating the env.yaml file or via the command line). If you want to define dynamic parameters, you can do so with the Python API; check out this example.
Re-using values in env.yaml¶
In version 0.20 and newer, you can refer to existing keys in your env.yaml and re-use them in upcoming values:
prefix: path/to/outputs
reports: '{{prefix}}/reports' # resolves to path/to/outputs/reports
models: '{{prefix}}/models' # resolves to path/to/outputs/models
Note that order matters; you can only refer to keys that have been defined earlier in the file.
Setting parameters from the CLI¶
Once you define pipeline parameters, you can switch them from the command line:
ploomber {command} --env--param value # note the double dash
For example:
ploomber build --env--param value
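With the env.yaml shown earlier (which defines my_param), a hypothetical override would be:
ploomber build --env--my_param another_value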
Default placeholders¶
There are a few default placeholders you can use in your pipeline.yaml, even if they're not defined in the env.yaml (or if you don't have an env.yaml altogether):
{{here}}: Absolute path to the parent folder of pipeline.yaml
{{cwd}}: Absolute path to the current working directory
{{root}}: Absolute path to the project's root folder. It is usually the same as {{here}}, except when the project is a package (i.e., it has a setup.py file); in such a case, it points to the parent directory of the setup.py file.
{{user}}: Current username
{{now}}: Current timestamp in ISO 8601 format (Added in Ploomber 0.13.4)
{{git_hash}}: git tag (if any) or git hash (Added in Ploomber 0.17.1)
{{git}}: branch name (if at the tip of it), git tag (if any), or git hash (Added in Ploomber 0.17.1)
{{env.ANY_ENV_VAR}}: any environment variable present on the instance running the pipeline can be referenced using this syntax (Added in Ploomber 0.21.7)
When packaging a Soopervisor Docker image and no env.yaml file is defined, a default one is generated, including some default placeholders such as {{git}} and {{git_hash}}. The reason is that the package copied into the Docker image will not include .git and other ignored folders, so at runtime Ploomber won't have the information necessary to compute the git hash; hence, it is pre-computed during image build time from the parent pipeline repository.
A common use case for this is passing file paths to scripts/notebooks. For example, let's say your script has to read a file from a specific location. Using {{here}} turns the path into an absolute one, so you can read the file when using Jupyter, even if the script is in a different location than your pipeline.yaml.
By default, paths in tasks[*].product are interpreted relative to the parent folder of pipeline.yaml. You can use {{cwd}} or {{root}} to override this behavior:
tasks:
  - source: scripts/my-script.py
    product:
      nb: products/report.html
      data: products/data.csv
    params:
      # make this an absolute path so you can read the file when opening
      # scripts/my-script.py in Jupyter
      input_path: '{{here}}/some/path/file.json'
For more on parametrized pipelines, check out the guide: Parametrized pipelines.