Spec API (pipeline.yaml)

Note

This document assumes you are already familiar with Ploomber’s core concepts (DAG, product, task, and upstream). If you’re not, check out this guide: Basic concepts.

This section describes the pipeline.yaml schema.

meta

meta is an optional section for meta-configuration; it controls how the DAG is constructed.

meta.source_loader

Load task sources (tasks[*].source) from a Python module. For example, say you have a module my_module and want to load sources from a path/to/sources directory inside that module:

meta:
    source_loader:
        module: my_module
        path: path/to/sources

meta.import_tasks_from

Add tasks defined in a different file to the current one. This directive is useful for composing pipelines. For example, if you have a training and a serving pipeline, you can define the pre-processing logic in a pipeline.preprocessing.yaml and then import the file into pipeline.training.yaml and pipeline.serving.yaml:

meta:
    import_tasks_from: /path/to/tasks.yaml

The file must be a list where each element is a valid Task.
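
For example, an imported file might look like this (filenames and paths below are illustrative):

```yaml
# pipeline.preprocessing.yaml: a list where each element is a Task
- source: tasks/get.py
  product: output/raw.csv

- source: tasks/clean.py
  product: output/clean.csv
```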

Click here to see a batch serving example.

Click here to see an online serving example.

meta.extract_upstream

Extract upstream dependencies from the source code (True by default).

meta:
    extract_upstream: True

If False, tasks must declare dependencies using the upstream key:

meta:
    extract_upstream: false

tasks:
    - source: tasks/clean.py
      product: output/report.html
      upstream: [some_task, another_task]

meta.extract_product

Extract the product from the source code instead of declaring it in pipeline.yaml (False by default). Default:

meta:
    extract_product: False

meta.product_default_class

Default product class for a given task class. Names should match (case-sensitive) the names in the Python API. These are rarely changed, except for SQLScript. Defaults:

meta:
    product_default_class:
        SQLScript: SQLRelation
        SQLDump: File
        NotebookRunner: File
        ShellScript: File
        PythonCallable: File

executor

Determines which executor to use:

  1. serial: Runs one task at a time (Note: By default, function tasks run in a subprocess)

  2. parallel: Run independent tasks in parallel (Note: this runs all tasks in a subprocess)

  3. Dotted path: This allows you to customize the initialization parameters

For example, say you want to use the ploomber.executors.Serial executor but do not want to run functions in a subprocess; you can pass a dotted path and custom parameters like this:

executor:
  dotted_path: ploomber.executors.Serial
  build_in_subprocess: false # do not run function tasks in a subprocess

Another common use case is to limit the number of subprocesses when using the ploomber.executors.Parallel executor:

executor:
  dotted_path: ploomber.executors.Parallel
  processes: 2 # limit to a max of 2 processes

To learn more, see the ploomber.executors documentation.

clients

This section declares default clients: it allows you to specify a single client for all Tasks/Products of a given class. The most common use case is SQL database configuration.

Another scenario is ploomber.products.File clients, which Ploomber can use to back up pipeline results (say, for example, you run a job that trains several models and want to save the output results; you can use ploomber.clients.GCloudStorageClient or ploomber.clients.S3Client for that).

Keys must be valid ploomber.tasks or ploomber.products names, values must be dotted paths to functions that return a ploomber.clients instance.

Can be a string (call without arguments):

clients:
    # this assumes there is a clients.py with a get_client function
    {some-class}: clients.get_client

Or a dictionary (to call with arguments):

clients:
    {some-class}:
        # this assumes there is a clients.py with a get_client function
        dotted_path: clients.get_client
        kwarg_1: value_1
        ...
        kwarg_k: value_k
Example: Database dump

pipeline.yaml:

clients:
  SQLDump: clients.get

tasks:
  - source: query.sql
    product: output/data.csv
    # dump everything into a single file
    chunksize: null

clients.py:

from ploomber.clients import SQLAlchemyClient


def get():
    return SQLAlchemyClient('sqlite:///my.db')

Download:

ploomber examples -n cookbook/sql-dump -o sql-dump
Example: Upload files to the cloud

pipeline.yaml:

clients:
  # configures a dag-level File client
  File: clients.get_local # you can switch to clients.get_s3 or clients.get_gcloud

tasks:
  - source: functions.create_file
    # upon execution, this file is uploaded to storage
    product: products/some-file.txt

  - source: scripts/some-script.py
    # upon execution, both files are uploaded to storage
    product:
      nb: products/some-script.ipynb
      file: products/another-file.txt
    # you may also pass a task-level File client if you don't want to upload
    # all products in the pipeline
    # client: clients.get_s3

clients.py:

from ploomber.clients import LocalStorageClient, GCloudStorageClient, S3Client


def get_local():
    """Returns local client
    """
    return LocalStorageClient('backup')


def get_s3():
    """Returns S3 client
    """
    # assumes your environment is already configured, you may also pass the
    # json_credentials_path
    return S3Client(bucket_name='some-bucket', parent='my-project/products')


def get_gcloud():
    """Returns google cloud storage client
    """
    # assumes your environment is already configured, you may also pass the
    # json_credentials_path
    return GCloudStorageClient(bucket_name='some-bucket',
                               parent='my-project/products')

Download:

ploomber examples -n cookbook/file-client -o file-client

on_{render, finish, failure}

Important

Hooks are not executed when opening scripts/notebooks in Jupyter.

These are hooks that execute when specific events happen:

  1. on_render: executes after verifying there are no errors in your pipeline declaration (e.g., a task that doesn’t exist declared as an upstream dependency)

  2. on_finish: executes upon successful pipeline run

  3. on_failure: executes upon failed pipeline run

They are all optional and take a dotted path as an argument. For example, assume you have a hooks.py with functions on_render, on_finish, and on_failure. You can add them to your pipeline.yaml like this:

on_render: hooks.on_render
on_finish: hooks.on_finish
on_failure: hooks.on_failure

If your hook takes arguments, you may call it like this:

# to call any hook with arguments
# {hook-name} must be one of: on_render, on_finish, on_failure
{hook-name}:
    dotted_path: {dotted.path.to.hook}
    argument: value

Calling with arguments is useful when you have a parametrized pipeline.

If you need information from your DAG in your hook, you may request the dag (ploomber.DAG) argument in any of the hooks. on_finish can also request a report argument, which contains a summary report of the pipeline’s execution.

on_failure can request a traceback argument, which receives a dictionary; possible keys are build (the build error traceback) and on_finish (the on_finish hook’s traceback, if any). For more information, see the DAG documentation: ploomber.DAG.

Example: Hooks

pipeline.yaml:

# dag-level hooks
on_render:
  dotted_path: hooks.dag_level_on_render
  my_param: 10
on_finish: hooks.dag_level_on_finish
on_failure: hooks.dag_level_on_failure

hooks.py:

def dag_level_on_render(my_param):
    """Executed after the pipeline renders (before execution)
    """
    print(f'rendered DAG! my_param={my_param}')


def dag_level_on_finish(dag, report):
    """Executes after the pipeline runs all tasks
    """
    print(f'Finished executing pipeline {dag}, report:\n{report}')


def dag_level_on_failure(traceback):
    """Executes if the pipeline fails
    """
    if traceback.get('build'):
        print('Pipeline execution failed while running the tasks!')

    if traceback.get('on_finish'):
        print('Pipeline execution failed while executing an on_finish hook!')

Download:

ploomber examples -n cookbook/hooks -o hooks

serializer and unserializer

By default, tasks whose source is a function (i.e., ploomber.tasks.PythonCallable) receive input paths (in upstream) and output paths (in product) when the function executes. Saving interim results allows Ploomber to provide incremental builds (What are incremental builds?).

However, in some cases, we might want to provide a pipeline that performs all operations in memory (e.g., to do online serving). ploomber.OnlineDAG can convert a file-based pipeline into an in-memory one without code changes, allowing you to re-use your feature engineering code for training and serving. The only requisite is for tasks to configure a serializer and unserializer. Click here to see an example.

Normally, a task whose source is a function looks like this:

import pandas as pd

def my_task(product, upstream):
    df_upstream = pd.read_csv(upstream['name'])
    # process data...
    # save product
    df_product.to_csv(product)

And you use the product parameter to save any task output.

However, if you add a serializer, product isn’t passed, and you must return the product object:

import pandas as pd

def my_task(upstream):
    df_upstream = pd.read_csv(upstream['name'])
    # process data...
    return df_product

The serializer function is called with the returned object as its first argument and product (output path) as the second argument:

serializer(df_product, product)

A similar logic applies to unserializer; when present, the function is called for each upstream dependency with the product as the argument:

unserializer(product)

In your task function, you receive objects (instead of paths):

import pandas as pd

def my_task(upstream):
    # no need to call pd.read_csv here
    df_upstream = upstream['name']
    return df_product
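
To make the call convention concrete, here’s a minimal pickle-based sketch (function names and paths are illustrative; in a real pipeline, you’d register them via the serializer and unserializer keys):

```python
import pickle
import tempfile
from pathlib import Path


# hypothetical pair following the call convention described above
def my_serializer(obj, product):
    # called with the object returned by the task and the product path
    Path(product).write_bytes(pickle.dumps(obj))


def my_unserializer(product):
    # called with the product path of each upstream dependency
    return pickle.loads(Path(product).read_bytes())


# round trip: what happens between a task and its downstream consumers
path = Path(tempfile.mkdtemp()) / 'data.pickle'
my_serializer({'rows': 3}, path)
loaded = my_unserializer(path)
```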

If you want to provide a task-level serializer/unserializer, pass it directly to the task. If you set a DAG-level serializer/unserializer and wish to exclude a specific task, pass serializer: null or unserializer: null in that task.
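
For example, to opt a single task out of DAG-level serialization (task and module names below are illustrative):

```yaml
serializer: util.my_serializer
unserializer: util.my_unserializer

tasks:
  - source: tasks.transform
    product: output/data.pickle

  - source: tasks.plot
    product: output/plot.png
    # exclude this task from the dag-level serializer/unserializer
    serializer: null
    unserializer: null
```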

Example: Serialization

pipeline.yaml:

serializer: util.my_serializer
unserializer: util.my_unserializer

tasks:
  - source: tasks.one_product
    product: output/one.txt

  - source: tasks.many_products
    product:
      something: output/something.csv
      another: output/something.txt

  - source: tasks.joblib_product
    product: output/something.joblib

  - source: tasks.final_product
    product: output/final.csv

util.py:

from ploomber.io import serializer, unserializer


@serializer(fallback='joblib', defaults=['.csv', '.txt'])
def my_serializer(obj, product):
    pass


@unserializer(fallback='joblib', defaults=['.csv', '.txt'])
def my_unserializer(product):
    pass

Download:

ploomber examples -n cookbook/serialization -o serialization

source_loader

If you package your project (i.e., add a setup.py), source_loader offers a convenient way to load sources inside such a package.

For example, if your package is named my_package and you want to load from the folder my_sources/ within the package:

meta:
    source_loader:
        module: my_package
        path: my_sources

tasks:
    # this is loaded from my_package (my_sources directory)
    - source: script.sql
      # task definition continues...

To find out the location used, you can execute the following in a Python session:

import my_package; print(my_package) # print package location

The above should print something like path/to/my_package/__init__.py. With the configuration above, the source loader will load the file from path/to/my_package/my_sources/script.sql.

Note: this only applies to tasks whose source is a relative path. Dotted paths and absolute paths are not affected.

For details, see ploomber.SourceLoader, which is the underlying Python implementation. Here’s an example that uses source_loader.

SQLScript product class

By default, SQL scripts use ploomber.products.SQLRelation as their product class. Such a product doesn’t save the product’s metadata, which is required for incremental builds (What are incremental builds?). If you want incremental builds, you need to change the default value and configure the product’s client.

Here’s an example that uses product_default_class to configure a SQLite pipeline with incremental builds.

For more information on product clients, see: FAQ and Glossary.

Loading from a factory

By default, the CLI looks for a pipeline.yaml file. If you’re using the Python API (a factory function that returns a DAG) and want to save some typing, you can create a pipeline.yaml that points to the factory:

# pipeline.yaml
location: {dotted.path.to.factory}

With such configuration, commands such as ploomber build will work.

task

task schema.

Tip

All other keys passed here are forwarded to the class constructor, so the allowed values will depend on the task class. For example, if running a notebook, the task class is ploomber.tasks.NotebookRunner; if it’s a function, it’s ploomber.tasks.PythonCallable. See each class’s documentation to learn what extra arguments it takes.

tasks[*].name

The name of the task. The filename (without the extension) is used if not defined.
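
The default naming rule can be sketched like this (the helper name is illustrative):

```python
from pathlib import Path


# a sketch of the default rule: the filename without its extension
def default_task_name(source):
    return Path(source).stem


name = default_task_name('tasks/clean.py')  # 'clean'
```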

tasks[*].source

Indicates where the source code for a task is. This can be a path to a file when using scripts/notebooks, or a dotted path when using a function.

By default, paths are relative to the pipeline.yaml parent folder (absolute paths are not affected), unless source_loader is configured; in such situation, paths are relative to the location configured in the SourceLoader object. See the source_loader section for more details.

For example, if your pipeline is located at project/pipeline.yaml, and you have:

tasks:
    - source: scripts/my_script.py
      # task definition continues...

Ploomber will expect your script to be located at project/scripts/my_script.py

If using a function, the dotted path must be importable. For example, if you have:

tasks:
    - source: my_package.my_module.my_function
      # task definition continues...

Ploomber runs a code equivalent to:

from my_package.my_module import my_function
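
Conceptually, resolving a dotted path works like this (the helper is illustrative, not Ploomber’s implementation):

```python
from importlib import import_module


# split "package.module.attribute" and import the attribute
def load_dotted_path(dotted_path):
    module_name, _, attr = dotted_path.rpartition('.')
    return getattr(import_module(module_name), attr)


# resolves to the same object as `from os.path import join`
fn = load_dotted_path('os.path.join')
```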

tasks[*].product

Indicates output(s) generated by the task. This can be either a File(s) or SQL relation(s) (table or view). The exact type depends on the source value for the given task: SQL scripts generate SQL relations, everything else generates files.

When generating files, paths are relative to the pipeline.yaml parent directory. For example, if your pipeline is located at project/pipeline.yaml, and you have:

tasks:
    - source: scripts/my_script.py
      product: output/my_output.csv

Ploomber will save your output to project/output/my_output.csv

When generating SQL relations, the format is different:

tasks:
    - source: scripts/my_script.sql
      # list with three elements (last one can be table or view)
      product: [schema, name, table]
      # schema is optional, it can also be: [name, table]

If the task generates multiple products, pass a dictionary:

tasks:
    - source: scripts/my_script.py
      product:
        nb: output/report.html
        data: output/data.csv

The mechanism to make product available when executing your task depends on the type of task.

SQL tasks receive a {{product}} placeholder:

-- {{product}} is replaced by "schema.name" or "name" if schema is empty
CREATE TABLE {{product}} AS
SELECT * FROM my_table WHERE my_column > 10

If product is a dictionary, use {{product['key']}}

Python/R scripts/notebooks receive a product variable in the “injected-parameters” cell:

# %% tags=["parameters"]
product = None

# %% tags=["injected-parameters"]
product = '/path/to/output/data.csv'

# your code...

If product is a dictionary, this becomes product = {'key': '/path/to/output/data.csv', ...}

Python functions receive the product argument:

import pandas as pd

def my_task(product):
    # process data...
    df.to_csv(product)

If product is a dictionary, use product['key'].

The same logic applies when making upstream dependencies available to tasks, but in this case, upstream is always a dictionary: SQL scripts refer to their upstream dependencies using {{upstream['key']}}, Python/R scripts and notebooks receive upstream in the “injected-parameters” cell, and Python functions are called with an upstream argument.

tasks[*].params

Use this section to pass arbitrary parameters to a task. The exact mechanism depends on the task type. Assume you have the following:

tasks:
    - source: {some-source}
      product: {some-product}
      params:
        my_param: 42

SQL tasks receive them as placeholders.

-- {{my_param}} is replaced by 42
SELECT * FROM my_table WHERE my_column > {{my_param}}

Python/R scripts/notebooks receive them in the “injected-parameters” cell:

# %% tags=["parameters"]
my_param = None

# %% tags=["injected-parameters"]
my_param = 42

# your code...

Python functions receive them as arguments:

# function is called with my_param=42
def my_task(product, my_param):
    pass

tasks[*].on_{render, finish, failure}

Important

Hooks are not executed when opening scripts/notebooks in Jupyter.

These are hooks that execute under certain events. They are equivalent to dag-level hooks, except they apply to a specific task. There are three types of hooks:

  1. on_render executes right before executing the task.

  2. on_finish executes when a task finishes successfully.

  3. on_failure executes when a task errors during execution.

They are all optional and take a dotted path as an argument. For example, assume you have a hooks.py with functions on_render, on_finish, and on_failure. You can add those hooks to a task in your pipeline.yaml like this:

tasks:
    - source: tasks.my_task
      product: products/output.csv
      on_render: hooks.on_render
      on_finish: hooks.on_finish
      on_failure: hooks.on_failure

If your hook takes arguments, you may call it like this:

# to call any hook with arguments
# {hook-name} must be one of: on_render, on_finish, on_failure
{hook-name}:
    dotted_path: {dotted.path.to.hook}
    argument: value

Calling with arguments is useful when you have a parametrized pipeline.

If you need information from the task, you may add any of the following arguments to the hook:

  1. task: Task object (a subclass of ploomber.tasks.Task)

  2. client: Task’s client (a subclass of ploomber.clients.Client)

  3. product: Task’s product (a subclass of ploomber.products.Product)

  4. params: Task’s params (a dictionary)

For example, if you want to check the data quality of a function that cleans some data, you may want to add an on_finish hook that loads the output and tests the data:

import pandas as pd

def on_finish(product):
    df = pd.read_csv(product)

    # check that column "age" has no NAs
    assert not df.age.isna().sum()
Example: Hooks

pipeline.yaml:

tasks:
  - source: tasks.do_something
    product: output/data.csv
    # task-level hooks
    on_render:
      dotted_path: hooks.on_render
      my_param: 20
    on_finish: hooks.on_finish
    on_failure: hooks.on_failure

hooks.py:

def on_render(my_param, task, client, product, params):
    """Executed after the task renders (before execution)
    """
    print(f'Finished rendering {task.name} with my_param {my_param}, '
          f'client {client}, product {product}, and task params {params}')


def on_finish(task, client, product, params):
    """Executes after the task runs
    """
    print(f'Finished running {task.name} with client {client}, '
          f'product {product} and params {params}')


def on_failure(task, client, product, params):
    """Executes if the task fails
    """
    print(f'{task.name} with client {client}, '
          f'product {product} and params {params} failed!')

Download:

ploomber examples -n cookbook/hooks -o hooks

tasks[*].params.resources_

The params section contains an optional section called resources_ (note the trailing underscore). By default, Ploomber marks tasks as outdated when their parameters change; however, parameters in the resources_ section work differently: they mark the task as outdated when the contents of the file change. For example, suppose you’re using a JSON file as a configuration source for a given task and want Ploomber to re-run the task whenever that file changes; you can do something like this:

tasks:
    - source: scripts/my-script.py
      product: report.html
      params:
        resources_:
            # whenever the JSON file changes, my-script.py runs again
            file: my-config-file.json
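
Conceptually, this is content-based change detection, similar to fingerprinting the file (a sketch, not Ploomber’s exact internals):

```python
import hashlib
import tempfile
from pathlib import Path


# fingerprint the file's contents; any edit produces a different value
def fingerprint(path):
    return hashlib.md5(Path(path).read_bytes()).hexdigest()


cfg = Path(tempfile.mkdtemp()) / 'config.json'  # illustrative path
cfg.write_text('{"threshold": 10}')
before = fingerprint(cfg)

# editing the file changes the fingerprint, so the task is marked outdated
cfg.write_text('{"threshold": 20}')
after = fingerprint(cfg)
```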

tasks[*].grid

Sometimes, you may want to run the same task over a set of parameters, grid allows you to do so. For example, say you want to train multiple models, each one with a different set of parameters:

tasks:
  - source: random-forest.py
    # name is required when using grid
    name: random-forest-
    product: random-forest.html
    grid:
        n_estimators: [5, 10, 20]
        criterion: [gini, entropy]

The spec above generates six tasks, one per combination of parameters, with products random-forest-X.html where X goes from 0 to 5. Task names also include a suffix.
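
The expansion can be sketched with itertools (this only illustrates the cartesian product; Ploomber’s exact ordering is an internal detail):

```python
from itertools import product

grid = {'n_estimators': [5, 10, 20], 'criterion': ['gini', 'entropy']}

# one task per combination: random-forest-0, random-forest-1, ...
combos = [dict(zip(grid, values)) for values in product(*grid.values())]
```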

You can also customize the product outputs to organize them in different folders and names (Added in version 0.17.2):

tasks:
  - source: random-forest.py
    name: random-forest-
    product: 'n_estimators=[[n_estimators]]/criterion=[[criterion]].html'
    grid:
        n_estimators: [5, 10, 20]
        criterion: [gini, entropy]

The example above generates outputs by replacing the parameter values; for example, it stores the random forest with n_estimators=5 and criterion=gini at n_estimators=5/criterion=gini.html. Note that this uses square brackets to differentiate these placeholders from the regular ones used with an env.yaml file.
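
Conceptually, the expansion works like simple string substitution (the helper below is illustrative, not Ploomber’s implementation):

```python
# replace each [[name]] in the template with the parameter's value
def expand(template, params):
    for name, value in params.items():
        template = template.replace(f'[[{name}]]', str(value))
    return template


template = 'n_estimators=[[n_estimators]]/criterion=[[criterion]].html'
path = expand(template, {'n_estimators': 5, 'criterion': 'gini'})
```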

You may pass a list instead of a dictionary to use multiple sets of parameters:

tasks:
  - source: train-model.py
    name: train-model-
    product: train-model.html
    grid:
      - model_type: [random-forest]
        n_estimators: [5, 10, 20]
        criterion: [gini, entropy]

      - model_type: [ada-boost]
        n_estimators: [1, 3, 5]
        learning_rate: [1, 2]

To create a task downstream to all tasks generated by grid, you can use a wildcard (e.g., train-model-*).
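
For example (task names are illustrative; this assumes dependencies are declared in pipeline.yaml, i.e., meta.extract_upstream is false):

```yaml
tasks:
  - source: train-model.py
    name: train-model-
    product: train-model.html
    grid:
      n_estimators: [5, 10, 20]
      criterion: [gini, entropy]

  # runs after every task generated by the grid above
  - source: compare-models.py
    product: comparison.html
    upstream: [train-model-*]
```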

Example: Grid

tasks:
  - source: scripts/fit.py
    # generates tasks fit-1, fit-2, etc
    name: fit-
    # disabling static_analysis because the notebook does not have
    # a fixed set of parameters (depends on random-forest vs ada-boost)
    static_analysis: disable
    product:
      nb: products/report.html
      model: products/model.pickle
    grid:
      # generates 6 tasks (1 * 3 * 2)
      - model_type: [random-forest]
        n_estimators: [1, 3, 5]
        criterion: [gini, entropy]

      # generates 6 tasks (1 * 3 * 2)
      - model_type: [ada-boost]
        n_estimators: [1, 3, 5]
        learning_rate: [1, 2]

Download:

ploomber examples -n cookbook/grid -o grid
Example: Model selection with nested cross-validation
executor: parallel

tasks:
  - source: tasks/load.py
    product:
      nb: products/load.html
      data: products/data.csv

  - source: tasks/fit.py
    name: fit-
    product:
      nb: products/fit.html
      model: products/model.pkl
    grid:
      - model: sklearn.ensemble.RandomForestClassifier
        model_params:
          # optimize over these parameters
          - n_estimators: [2, 5]
            criterion: [gini, entropy]
    
      - model: sklearn.svm.SVC
        model_params:
          # optimize over these parameters
          - kernel: [linear, poly]
            C: [0.1, 1.0]

Download:

pip install ploomber
ploomber examples -n cookbook/nested-cv -o nested-cv
Changelog

New in version 0.17.2: Use params and grid in the same task. Values in params are constant across the grid.

New in version 0.17.2: Customize the product paths with placeholders [[placeholder]]

tasks[*].client

Task client to use. By default, the class-level client in the clients section is used. This task-level value overrides it. Required for some tasks (e.g., SQLScript), optional for others (e.g., File).

Can be a string (call without arguments):

client: clients.get_db_client

Or a dictionary (to call with arguments):

client:
    dotted_path: clients.get_db_client
    kwarg_1: value_1
    ...
    kwarg_k: value_k

tasks[*].product_client

Product client to use (to save the product’s metadata). Only required if you want to enable incremental builds (What are incremental builds?) when using SQL products. It can be a string or a dictionary (the API is the same as tasks[*].client).

More information on product clients: FAQ and Glossary.

tasks[*].upstream

Dependencies for this task. Only required if meta.extract_upstream=False

tasks:
    ...
    upstream: {str or list}

Example:

tasks:
    - source: scripts/my-script.py
      product: output/report.html
      upstream: [clean_data_a, clean_data_b]

tasks[*].class

Task class to use (any class from ploomber.tasks). You rarely have to set this, since it is inferred from source. For example, ploomber.tasks.NotebookRunner for .py and .ipynb files, ploomber.tasks.SQLScript for .sql, and ploomber.tasks.PythonCallable for dotted paths.

tasks[*].product_class

This takes any class name from ploomber.products. You rarely have to set this, since meta.product_default_class covers the typical cases.

Parametrizing with env.yaml

In some situations, it’s helpful to parametrize a pipeline. For example, you could run your pipeline with a sample of the data as a smoke test to make sure it runs, before triggering a run with the entire dataset, which could take several hours to finish.

To add parameters to your pipeline, create an env.yaml file next to your pipeline.yaml:

my_param: my_value
nested:
    param: another_value

Then use placeholders in your pipeline.yaml file:

tasks:
    - source: module.function
      params:
        my_param: '{{my_param}}'
        my_second_param: '{{nested.param}}'

In the previous example, module.function is called with my_param='my_value' and my_second_param='another_value'.
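
Conceptually, placeholders resolve against the env.yaml values like this (the helper is illustrative, not Ploomber’s implementation):

```python
# walk the env dictionary following the dotted name inside {{...}}
def resolve(placeholder, env):
    value = env
    for part in placeholder.strip('{} ').split('.'):
        value = value[part]
    return value


env = {'my_param': 'my_value', 'nested': {'param': 'another_value'}}
first = resolve('{{my_param}}', env)
second = resolve('{{nested.param}}', env)
```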

A common pattern is to use a pipeline parameter to change the location of tasks[*].product. For example:

tasks:
    - source: module.function
      # path determined by a parameter
      product: '{{some_directory}}/output.csv'

    - source: my_script.sql
      # schema and prefix determined by a parameter
      product: ['{{some_schema}}', '{{some_prefix}}_name', table]

This can help you keep products generated by runs with different parameters in different locations.

These are the most common use cases, but you can use placeholders anywhere in your pipeline.yaml values. Note that they are not allowed in keys:

tasks:
    - source: module.function
      # doesn't work
      '{{placeholder}}': value

To change parameter values, update your env.yaml file or override them from the command line. Run ploomber build --help to get a list of arguments you can pass to override the parameters defined in env.yaml.

Note that these parameters are constant: you must change them explicitly, either by updating the env.yaml file or via the command line. If you want to define dynamic parameters, you can do so with the Python API; check out this example.

Setting parameters from the CLI

Once you define pipeline parameters, you can switch them from the command line:

ploomber {command} --env--param value # note the double dash

For example:

ploomber build --env--param value

Default placeholders

There are a few default placeholders you can use in your pipeline.yaml, even if they’re not defined in env.yaml (or if you don’t have an env.yaml at all):

  • {{here}}: Absolute path to the parent folder of pipeline.yaml

  • {{cwd}}: Absolute path to the current working directory

  • {{root}}: Absolute path to project’s root folder. It is usually the same as {{here}}, except when the project is a package (i.e., it has setup.py file), in such a case, it points to the parent directory of the setup.py file.

  • {{user}}: Current username

  • {{now}}: Current timestamp in ISO 8601 format (Added in Ploomber 0.13.4)

  • {{git_hash}}: git tag (if any) or git hash (Added in Ploomber 0.17.1)

  • {{git}}: returns the branch name (if at the tip of it), git tag (if any), or git hash (Added in Ploomber 0.17.1)
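
Roughly, the first few placeholders evaluate to values you can compute with the standard library (a sketch; Ploomber computes these internally, and {{git}}/{{git_hash}} query the repository):

```python
import datetime
import getpass
import os
from pathlib import Path

# approximate values of the default placeholders
defaults = {
    'here': str(Path('pipeline.yaml').resolve().parent),
    'cwd': os.getcwd(),
    'user': getpass.getuser(),
    'now': datetime.datetime.now().isoformat(),  # ISO 8601
}
```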

A common use case is passing file paths to scripts/notebooks. For example, say your script has to read a file from a specific location. Using {{here}} turns the path into an absolute one, so you can read the file when opening the script in Jupyter, even if the script is in a different location than your pipeline.yaml.

By default, paths in tasks[*].product are interpreted relative to the parent folder of pipeline.yaml. You can use {{cwd}} or {{root}} to override this behavior:

tasks:
    - source: scripts/my-script.py
      product:
        nb: products/report.html
        data: products/data.csv
      params:
        # make this an absolute file so you can read it when opening
        # scripts/my-script.py in Jupyter
        input_path: '{{here}}/some/path/file.json'

For more on parametrized pipelines, check out the guide: Parametrized pipelines.