ploomber.spec.DAGSpec

class ploomber.spec.DAGSpec(data, env=None, lazy_import=False, reload=False, parent_path=None)

A DAG spec is a dictionary with a certain structure that can be converted to a DAG using DAGSpec.to_dag().

A spec takes one of two forms. The simplest is a dictionary with a “location” key that names the factory to call; the other describes the DAG structure explicitly as a dictionary.
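
As a rough illustration of the two forms, the dictionaries below sketch what each might look like. The dotted path, the file names, and the keys used inside the "tasks" list are illustrative assumptions, not values taken from this page.

```python
# Form 1: delegate DAG construction to a factory function.
# "my_project.pipelines.make" is a hypothetical dotted path.
spec_from_factory = {"location": "my_project.pipelines.make"}

# Form 2: describe the DAG structure explicitly.
# Script paths and product names are hypothetical.
spec_explicit = {
    "tasks": [
        {
            "source": "scripts/clean.py",
            "product": {"nb": "output/clean.ipynb", "data": "output/clean.csv"},
        },
        {
            "source": "scripts/plot.py",
            "product": "output/plot.ipynb",
        },
    ]
}
```

Either dictionary could then be passed as the data argument.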

When .to_dag() is called, the current working directory is temporarily switched to the spec file’s parent folder (this only applies when loading from a file).
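
The general technique of temporarily switching the working directory can be sketched with a context manager. This is a generic illustration, not Ploomber's actual implementation; working_directory is a hypothetical helper name.

```python
import os
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def working_directory(path):
    """Temporarily change the current working directory to `path`."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield Path(path)
    finally:
        # Always restore the original directory, even if the body raises
        os.chdir(previous)
```

Code run inside `with working_directory(spec_parent):` resolves relative paths against that folder, and the previous directory is restored on exit.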

Parameters:
  • data (str, pathlib.Path or dict) – Path to a YAML spec or dict spec. If loading from a file, sources and products are resolved relative to the file’s parent folder. If the file is in a packaged structure (e.g., src/package/pipeline.yaml), the existence of a setup.py in the same folder as src/ is validated. If loaded from a dict, sources and products aren’t resolved unless parent_path is given (i.e., is not None).

  • env (dict, pathlib.Path or str, optional) – Environment to load. If a path, it must point to a YAML file. Any string with the format ‘{{placeholder}}’ in the spec is replaced by the value stored under the corresponding key in the env (here, placeholder). If env is None and the spec is a dict, no env is loaded. If env is None and the spec is loaded from a YAML file, an env.yaml file is loaded from the current working directory; if it doesn’t exist there, it is loaded from the YAML spec’s parent folder. If neither exists, no env is loaded. A ploomber.Env object is initialized; see its documentation for details.

  • lazy_import (bool, optional) – Whether to skip importing dotted paths when initializing PythonCallables. If True, PythonCallables are initialized directly with the dotted path instead of the actual function, which means some verifications, such as import statements in that function’s module, are delayed until the pipeline is executed. This also applies to placeholders loaded using a SourceLoader: if a template exists, the path to it is returned instead of initializing it; if it doesn’t, None is returned instead of raising an error. This setting is useful when you need to load a YAML spec and instantiate the DAG object to extract information from it (e.g., which tasks are declared) but the process doing so may not have all the required dependencies (e.g., a library imported in a PythonCallable task).

  • reload (bool, optional) – Reloads modules before importing dotted paths to detect code changes if the module has already been imported. Has no effect if lazy_import=True.
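
The env.yaml lookup order and the ‘{{placeholder}}’ replacement described for the env parameter can be sketched as follows. This is a simplified illustration under stated assumptions, not Ploomber's implementation: locate_env and expand are hypothetical helper names, and the real expansion logic may handle more cases than this regular expression does.

```python
import re
from pathlib import Path


def locate_env(spec_path, cwd=None):
    """Return the env.yaml that would be picked up, or None if there is none."""
    cwd = Path(cwd) if cwd is not None else Path.cwd()
    candidates = [
        cwd / "env.yaml",                     # 1. current working directory
        Path(spec_path).parent / "env.yaml",  # 2. the spec's parent folder
    ]
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None


def expand(value, env):
    """Replace every '{{key}}' in `value` with str(env[key])."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(env[m.group(1)]), value)
```

For instance, expand("output/{{name}}.csv", {"name": "sales"}) returns 'output/sales.csv'.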

Examples

Load from pipeline.yaml:

>>> from ploomber.spec import DAGSpec
>>> spec = DAGSpec('spec/pipeline.yaml') # load spec
>>> dag = spec.to_dag() # convert to DAG
>>> status = dag.status()

Override env.yaml:

>>> from ploomber.spec import DAGSpec
>>> spec = DAGSpec('spec/pipeline.yaml', env=dict(key='value'))
>>> dag = spec.to_dag()
>>> status = dag.status()

See also

ploomber.DAG

Internal representation of a pipeline; implements the methods exposed by the command-line interface (e.g., DAG.build() or DAG.plot()).

path

Returns the path used to load the data, or None if loaded from a dictionary.

Type:

str or None

Methods

clear()

Remove all items from D.

find([env, reload, lazy_import, ...])

Automatically find pipeline.yaml and return a DAGSpec object, which can be converted to a DAG using .to_dag()

from_directory(path_to_dir)

Construct a DAGSpec from a directory.

from_files(files)

Construct DAGSpec from list of files or glob-like pattern.

get(k[,d])

Return D[k] if k in D, else d.

items()

Return a set-like object providing a view on D's items.

keys()

Return a set-like object providing a view on D's keys.

pop(k[,d])

Remove specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised.

popitem()

Remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[,d])

Return D.get(k,d), also set D[k]=d if k not in D.

to_dag()

Converts the DAG spec to a DAG object

update([E, ]**F)

Update D from mapping/iterable E and F.

values()

Return an object providing a view on D's values.

clear() → None.  Remove all items from D.

classmethod find(env=None, reload=False, lazy_import=False, starting_dir=None, name=None)

Automatically find pipeline.yaml and return a DAGSpec object, which can be converted to a DAG using .to_dag()

Parameters:
  • env – The environment to pass to the spec

  • name (str, default=None) – Filename to search for. If None, it looks for a pipeline.yaml file; otherwise, it looks for a file with the given name.
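
A minimal usage sketch, based only on the signature shown above. load_dag is a hypothetical wrapper, and the import is placed inside the function so the sketch can be defined even where ploomber is not installed.

```python
def load_dag(name=None, starting_dir=None):
    """Locate a spec with DAGSpec.find and convert it to a DAG."""
    # Imported here so that defining this helper does not require ploomber
    from ploomber.spec import DAGSpec

    spec = DAGSpec.find(name=name, starting_dir=starting_dir)
    return spec.to_dag()
```

Calling load_dag() would search for pipeline.yaml, while load_dag(name="pipeline.serve.yaml") would search for a file with that (hypothetical) name instead.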

classmethod from_directory(path_to_dir)

Construct a DAGSpec from a directory. Product and upstream are extracted from sources.

Parameters:

path_to_dir (str) – The directory to use. Looks for scripts (.py, .R or .ipynb) in the directory and interprets them as task sources; each file name (without extension) becomes the task name. The spec is generated with the default values in the “meta” section. Files with other extensions are ignored.

Notes

env is not supported: because the spec is generated from the files in path_to_dir, there is no way to embed tags.
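
The file-to-task mapping described above can be sketched with the standard library. This is an illustration of the naming rule only, not Ploomber's implementation; task_names_from_directory and VALID_EXTENSIONS are hypothetical names.

```python
from pathlib import Path

# Extensions treated as task sources, as listed in the description above
VALID_EXTENSIONS = {".py", ".R", ".ipynb"}


def task_names_from_directory(path_to_dir):
    """Map each task name (file name without extension) to its source file."""
    return {
        path.stem: path
        for path in sorted(Path(path_to_dir).iterdir())
        if path.is_file() and path.suffix in VALID_EXTENSIONS
    }
```

A directory containing clean.py, plot.R and notes.txt would yield the task names clean and plot; notes.txt is skipped because of its extension.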

classmethod from_files(files)

Construct a DAGSpec from a list of files or a glob-like pattern. Product and upstream are extracted from sources.

Parameters:

files (list or str) – List of files to use or a glob-like string pattern. If a glob-like pattern is given, directories that match it are ignored.
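
How a list or a glob-like pattern might be normalized into a list of files, skipping matching directories, can be sketched as follows. resolve_files is a hypothetical helper and this is not Ploomber's implementation.

```python
from glob import glob
from pathlib import Path


def resolve_files(files):
    """Return a list of file paths from a list or a glob-like pattern."""
    if isinstance(files, str):
        # Glob-like pattern: keep regular files only, skip matching directories
        return sorted(p for p in glob(files) if Path(p).is_file())
    return [str(f) for f in files]
```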

get(k[, d]) → D[k] if k in D, else d.  d defaults to None.

items() → a set-like object providing a view on D's items

keys() → a set-like object providing a view on D's keys

pop(k[, d]) → v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() → (k, v), remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[, d]) → D.get(k,d), also set D[k]=d if k not in D

to_dag()

Converts the DAG spec to a DAG object

update([E, ]**F) → None.  Update D from mapping/iterable E and F.

If E is present and has a .keys() method, does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v

values() → an object providing a view on D's values
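
The mapping methods above follow the usual Python dictionary contract. The sketch below uses a plain dict as a stand-in for a spec to show that behavior; the keys and values are hypothetical, and it assumes DAGSpec behaves as these descriptions state.

```python
# A plain dict stands in for a spec here; keys and values are hypothetical
spec = {"tasks": [], "meta": {}}

tasks = spec.get("tasks")             # key present: its value is returned
missing = spec.get("clients", "n/a")  # key absent: the default is returned
spec.setdefault("clients", {})        # key absent: it is added with the default
removed = spec.pop("meta")            # removes "meta" and returns its value

# update() applies the mapping first, then the keyword arguments
spec.update({"tasks": [{"source": "a.py"}]}, meta={"note": "hypothetical"})
```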

Attributes

path