ploomber.spec.DAGSpec¶
- class ploomber.spec.DAGSpec(data, env=None, lazy_import=False, reload=False, parent_path=None)¶
A DAG spec is a dictionary with a certain structure that can be converted to a DAG using DAGSpec.to_dag(). There are two cases: the simplest one is just a dictionary whose "location" key holds the factory to call; the other explicitly describes the DAG structure as a dictionary.
When .to_dag() is called, the current working directory is temporarily switched to the spec file's parent folder (this only applies when loading from a file).
- Parameters:
data (str, pathlib.Path or dict) – Path to a YAML spec or a dict spec. If loading from a file, sources and products are resolved relative to the file's parent. If the file is in a packaged structure (i.e., src/package/pipeline.yaml), the existence of a setup.py in the same folder as src/ is validated. If loaded from a dict, sources and products aren't resolved unless parent_path is not None.
env (dict, pathlib.Path or str, optional) – Environment to load; if a path, it must point to a YAML file. Any string with the format '{{placeholder}}' in the spec is replaced by the corresponding value under the given key (i.e., placeholder). If env is None and the spec is a dict, no env is loaded. If env is None and the spec is loaded from a YAML file, an env.yaml file is loaded from the current working directory; if it doesn't exist, it is loaded from the YAML spec's parent folder. If none of these exist, no env is loaded. A ploomber.Env object is initialized; see its documentation for details.
lazy_import (bool, optional) – Whether to import dotted paths to initialize PythonCallables with the actual function. If False, PythonCallables are initialized directly with the dotted path, which means some verifications, such as import statements in that function's module, are delayed until the pipeline is executed. This also applies to placeholders loaded using a SourceLoader: if a template exists, it returns the path to it instead of initializing it; if it doesn't, it returns None instead of raising an error. This setting is useful when we need to load a YAML spec and instantiate the DAG object to extract information from it (e.g., which tasks are declared), but the process running it may not have all the required dependencies to do so (e.g., a library imported by a PythonCallable task).
reload (bool, optional) – Reloads modules before importing dotted paths to detect code changes if the module has already been imported. Has no effect if lazy_import=True.
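For illustration, a dict-based spec that describes the DAG structure explicitly might look like the sketch below. The task sources and product paths are hypothetical, not taken from the ploomber documentation; passing such a dict to DAGSpec would leave sources and products unresolved unless parent_path is given, as noted above.

```python
# A minimal sketch of a dict spec that describes tasks explicitly.
# The "tasks" list maps each source script to its product; the
# file names below are hypothetical examples.
spec_data = {
    "tasks": [
        {"source": "scripts/clean.py", "product": "output/clean.ipynb"},
        {"source": "scripts/plot.py", "product": "output/plot.ipynb"},
    ]
}

# Each task entry pairs a source with a product.
for task in spec_data["tasks"]:
    print(task["source"], "->", task["product"])
```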
Examples
Load from pipeline.yaml:
>>> from ploomber.spec import DAGSpec
>>> spec = DAGSpec('spec/pipeline.yaml')  # load spec
>>> dag = spec.to_dag()  # convert to DAG
>>> status = dag.status()
Override env.yaml:
>>> from ploomber.spec import DAGSpec
>>> spec = DAGSpec('spec/pipeline.yaml', env=dict(key='value'))
>>> dag = spec.to_dag()
>>> status = dag.status()
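The '{{placeholder}}' substitution used by env works like template replacement. The following is a rough, non-authoritative sketch of the idea only, not ploomber's actual renderer (which is jinja2-based and far more capable):

```python
import re

def render(text, env):
    # Replace each '{{key}}' with the corresponding value from env.
    # This is a simplified sketch of the substitution behavior
    # described for DAGSpec's env parameter.
    return re.sub(r"\{\{(\w+)\}\}", lambda m: str(env[m.group(1)]), text)

print(render("output/{{key}}/report.html", {"key": "value"}))
```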
See also
ploomber.DAG
Pipeline internal representation; implements the methods in the command-line interface (e.g., DAG.build() or DAG.plot).
- path¶
Returns the path used to load the data. None if loaded from a dictionary
- Type:
str or None
Methods
clear()
find([env, reload, lazy_import, ...]) – Automatically find pipeline.yaml and return a DAGSpec object, which can be converted to a DAG using .to_dag()
from_directory(path_to_dir) – Construct a DAGSpec from a directory.
from_files(files) – Construct DAGSpec from a list of files or a glob-like pattern.
get(k[, d])
items()
keys()
pop(k[, d]) – If key is not found, d is returned if given, otherwise KeyError is raised.
popitem() – Remove and return some (key, value) pair as a 2-tuple; raise KeyError if D is empty.
setdefault(k[, d])
to_dag() – Converts the DAG spec to a DAG object
update([E, ]**F) – Update D from mapping/iterable E and keyword arguments F.
values()
- clear() → None. Remove all items from D. ¶
- classmethod find(env=None, reload=False, lazy_import=False, starting_dir=None, name=None)¶
Automatically find pipeline.yaml and return a DAGSpec object, which can be converted to a DAG using .to_dag()
- Parameters:
env – The environment to pass to the spec
name (str, default=None) – Filename to search for. If None, it looks for a pipeline.yaml file; otherwise, it looks for a file with that name.
- classmethod from_directory(path_to_dir)¶
Construct a DAGSpec from a directory. Product and upstream are extracted from sources
- Parameters:
path_to_dir (str) – The directory to use. Looks for scripts (.py, .R, or .ipynb) in the directory and interprets them as task sources; file names are assigned as task names (without extension). The spec is generated with the default values in the "meta" section. Ignores files with invalid extensions.
Notes
env is not supported because the spec is generated from the files in path_to_dir; hence, there is no way to embed tags.
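The rule that file names become task names (without extension) can be sketched with the standard library; the file names below are hypothetical:

```python
from pathlib import Path

# from_directory() treats scripts (.py, .R, .ipynb) as task sources
# and uses each file's stem as the task name. Files with other
# extensions are ignored. The names here are made-up examples.
sources = ["clean.py", "features.R", "train.ipynb", "notes.txt"]

valid = {".py", ".R", ".ipynb"}
task_names = [Path(s).stem for s in sources if Path(s).suffix in valid]
print(task_names)  # -> ['clean', 'features', 'train']
```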
- classmethod from_files(files)¶
Construct DAGSpec from list of files or glob-like pattern. Product and upstream are extracted from sources
- Parameters:
files (list or str) – List of files to use or glob-like string pattern. If glob-like pattern, ignores directories that match the criteria.
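The glob-like pattern follows the semantics of Python's glob module. A small self-contained sketch using a throwaway directory (the file names are hypothetical, and this calls glob directly rather than ploomber):

```python
import glob
import os
import tempfile

# Create a temporary directory with a couple of files to show
# what a glob-like pattern such as '*.py' would match.
with tempfile.TemporaryDirectory() as d:
    for name in ("clean.py", "plot.py", "README.md"):
        open(os.path.join(d, name), "w").close()
    matches = sorted(os.path.basename(p)
                     for p in glob.glob(os.path.join(d, "*.py")))
    print(matches)  # -> ['clean.py', 'plot.py']
```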
- get(k[, d]) → D[k] if k in D, else d. d defaults to None. ¶
- items() → a set-like object providing a view on D's items ¶
- keys() → a set-like object providing a view on D's keys ¶
- pop(k[, d]) → v, remove specified key and return the corresponding value. ¶
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() → (k, v), remove and return some (key, value) pair ¶
as a 2-tuple; but raise KeyError if D is empty.
- setdefault(k[, d]) → D.get(k, d), also set D[k] = d if k not in D ¶
- to_dag()¶
Converts the DAG spec to a DAG object
- update([E, ]**F) → None. Update D from mapping/iterable E and F. ¶
If E is present and has a .keys() method, does: for k in E: D[k] = E[k]. If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v. In either case, this is followed by: for k, v in F.items(): D[k] = v.
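Since DAGSpec exposes the standard mapping interface, update follows the usual dict semantics; the three cases are illustrated here with a plain dict rather than a DAGSpec instance:

```python
# The three update cases described above, shown with a plain dict.
d = {"a": 1}

d.update({"b": 2})    # E has .keys(): copies each key from E
d.update([("c", 3)])  # E lacks .keys(): iterates (k, v) pairs
d.update(a=10)        # keyword arguments F applied last

print(d)  # -> {'a': 10, 'b': 2, 'c': 3}
```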
- values() → an object providing a view on D's values ¶
Attributes