To run this locally, install Ploomber and execute: ploomber examples -n guides/testing
Pipeline testing¶
Tutorial showing how to use a task’s on_finish hook to test data quality.
Testing your pipeline is critical to ensure your data expectations hold. When you perform a data transformation, you expect the output to have certain properties (e.g., no nulls in a certain column). Without testing, these expectations won't be verified, and errors will propagate to all downstream tasks.
These are the most common sources of errors when transforming data:

1. A join operation generates duplicate entries because of a wrong assumption of a one-to-one relationship (which is really a one-to-many) in the source tables
2. A function that aggregates data returns NULL because at least one of the input data points was NULL
3. Dirty data points are used in the analysis (e.g., in a column age, you forgot to remove corrupted data points with negative values)
Some of these errors are easy to spot (2), but it might take you some time to find out about others (1 and 3); or worse, you may never notice them and use incorrect data in your analysis. And even if your code is correct and all your expectations hold today, they might not hold in the future if the data changes, so it's important to know as soon as that happens.
To make testing effective, your tests should run every time you run your tasks. Ploomber has a mechanism to automate this.
Sample data¶
This example loads data from a single table called my_table, which has two columns:

- age: ranges from 21 to 80, but there are some corrupted records with a value of -42
- score: ranges from 0 to 10, but there are some corrupted records with missing values
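For reference, here's a minimal sketch of how a table like this could be generated. This is a hypothetical illustration, not the contents of the actual setup script: the SQLite file name (my.db), record counts, and corruption pattern are all assumptions.

# Hypothetical sketch: build a table like my_table in a local SQLite file
import numpy as np
import pandas as pd
from sqlalchemy import create_engine

rng = np.random.default_rng(42)
df = pd.DataFrame({
    'age': rng.integers(21, 81, size=100),
    'score': rng.uniform(0, 10, size=100),
})

# inject corrupted records: negative ages and missing scores
df.loc[:4, 'age'] = -42
df.loc[5:9, 'score'] = np.nan

engine = create_engine('sqlite:///my.db')  # assumed file name
df.to_sql('my_table', engine, if_exists='replace', index=False)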
Let's take a look at our example pipeline.yaml:
[1]:
from pathlib import Path
# Content of pipeline.yaml
clients:
  SQLScript: db.get_client
  SQLDump: db.get_client

tasks:
  - source: clean.sql
    name: clean
    product: ['my_clean_table', 'table']
    on_finish: integration_tests.test_sql_clean

  - source: dump.sql
    name: dump
    class: SQLDump
    product: output/my_clean_table.csv
    chunksize: null

  - source: transform.py
    product:
      nb: output/transformed.html
      data: output/transformed.csv
    on_finish: integration_tests.test_py_transform
The pipeline has three tasks: one to clean the raw table, another to dump the clean data to a CSV file, and finally a Python task to transform the data. We included a SQL task and a Python task to show how you can test both types, but we recommend doing as much of the analysis as you can in SQL because it scales much better than Python code (you won't have to deal with memory errors).
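The clients section maps both SQLScript and SQLDump tasks to the db.get_client dotted path. The tutorial doesn't show db.py, but a minimal sketch could look like this (the SQLite URI is an assumption; the actual example may connect differently):

# Content of a hypothetical db.py
from ploomber.clients import SQLAlchemyClient

def get_client():
    # 'my.db' is an assumed file name for the sample database
    return SQLAlchemyClient('sqlite:///my.db')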
The configuration is straightforward; the only new key is on_finish (inside the first and third task). This is known as a hook. Task hooks allow you to embed custom logic when certain events happen; on_finish executes after a task runs successfully. The value is a dotted path, which tells Ploomber where to find your testing function. Under the hood, Ploomber imports your function and calls it after the task executes. Here's some equivalent code:
from integration_tests import test_sql_clean
# your task is executed...
# ploomber calls your testing function...
test_sql_clean()
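Strictly speaking, Ploomber inspects your testing function's signature and passes the arguments it requests, such as client and product (as we'll see below). So the call is closer to this sketch:

from integration_tests import test_sql_clean

# your task is executed...

# ploomber passes the arguments your function declares as keyword arguments
test_sql_clean(client=client, product=product)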
Before diving into the testing source code, let's see the rest of the tasks.

clean.sql filters out rows we don't want to include in the analysis:
# Content of clean.sql
DROP TABLE IF EXISTS {{product}};
CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0
dump.sql simply selects all rows from the clean table to dump into the CSV file:
# Content of dump.sql
SELECT * FROM {{upstream['clean']}}
Finally, the transform.py script generates a new column using score:
# Content of transform.py
import pandas as pd
# + tags=["parameters"]
upstream = ['dump']
product = None
# +
df = pd.read_csv(upstream['dump'])
df['multiplied_score'] = df.score * 42
# +
df.to_csv(product['data'])
Let’s now take a look at our tests:
# Content of integration_tests.py
import pandas as pd
from ploomber.testing.sql import nulls_in_columns, range_in_column


def test_sql_clean(client, product):
    """Tests for clean.sql
    """
    assert not nulls_in_columns(client, ['score', 'age'], product)
    min_age, max_age = range_in_column(client, 'age', product)
    assert min_age > 0


def test_py_transform(product):
    """Tests for transform.py
    """
    df = pd.read_csv(str(product['data']))
    assert not df.multiplied_score.isna().sum()
    assert df.multiplied_score.min() >= 0
Testing Python scripts¶
To test your Python scripts, you have to know which file to look at. You can do so by simply adding product as an argument to your function. If your Python script generates more than one product (like in our case), product will be a dictionary-like object; that's why we are using product['data']. This returns a Product object; to get the path to the file, simply use the str function.
>>> product # dictionary-like object: maps names to Product objects
>>> product['data'] # Product object
>>> str(product['data']) # path to the data file
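If your Python task generates a single product instead, product is the Product object itself and there is no key lookup. A sketch of a test for such a hypothetical task (the function and check are illustrative):

import pandas as pd

def test_single_product_task(product):
    # with a single product, convert it to a path directly
    df = pd.read_csv(str(product))
    assert not df.empty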
Testing SQL scripts¶
To test SQL scripts, you also need a client to send queries to the appropriate database; to get one, just add client to your testing function.

The ploomber.testing.sql module implements convenient functions to test your tables. They always take client as their first argument; just pass the client variable directly. Since our SQL script generates a single product, you can pass the product object directly to the testing function (otherwise, pass product[key] with the appropriate key).

Note: if you're implementing your own SQL testing logic, str(product) will return a {schema}.{name} string; you can also use product.schema and product.name.
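For example, a custom check could interpolate str(product) into a query. This sketch assumes your client exposes a DBAPI-compatible connection via client.connection (check your client's API before relying on this):

import pandas as pd

def test_no_negative_ages(client, product):
    # str(product) renders as "{schema}.{name}", so it can go
    # straight into the query
    query = f'SELECT COUNT(*) AS n FROM {product} WHERE age < 0'
    df = pd.read_sql(query, client.connection)
    # cleaning must remove all corrupted (negative) ages
    assert df['n'].iloc[0] == 0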
Running the pipeline¶
Before we run the pipeline, we generate a sample database:
[2]:
%%sh
cd setup
python script.py
Let’s now run our pipeline:
[3]:
%%sh
ploomber build
name Ran? Elapsed (s) Percentage
--------- ------ ------------- ------------
clean True 0.02013 0.78097
dump True 0.002051 0.0795713
transform True 2.55538 99.1395
Building task 'transform': 0%| | 0/3 [00:00<?, ?it/s]
Executing: 0%| | 0/5 [00:00<?, ?cell/s]
Executing: 20%|██ | 1/5 [00:01<00:06, 1.75s/cell]
Executing: 100%|██████████| 5/5 [00:02<00:00, 2.39cell/s]
Building task 'transform': 100%|██████████| 3/3 [00:02<00:00, 1.16it/s]
Everything looks good.
Let's now imagine a colleague found an error in the cleaning logic and re-wrote the script. However, they were unaware that both columns in the raw table had corrupted data and forgot to include the filtering conditions.
The script now looks like this:
[4]:
path = Path('clean.sql')
new_code = path.read_text().replace('WHERE score is not null AND age > 0', '')
path.write_text(new_code)
[4]:
86
# Content of clean.sql
DROP TABLE IF EXISTS {{product}};
CREATE TABLE {{product}} AS
SELECT * FROM my_table
Let’s see what happens if we run the pipeline:
[5]:
%%capture captured
%%sh --no-raise-error
ploomber build
[6]:
print(captured.stderr)
Building task 'clean': 100%|██████████| 3/3 [00:00<00:00, 115.03it/s]
Traceback (most recent call last):
File "/Users/Edu/dev/ploomber/src/ploomber/cli/io.py", line 20, in wrapper
fn(**kwargs)
File "/Users/Edu/dev/ploomber/src/ploomber/cli/build.py", line 51, in main
report = dag.build(force=args.force, debug=args.debug)
File "/Users/Edu/dev/ploomber/src/ploomber/dag/dag.py", line 482, in build
report = callable_()
File "/Users/Edu/dev/ploomber/src/ploomber/dag/dag.py", line 581, in _build
raise build_exception
File "/Users/Edu/dev/ploomber/src/ploomber/dag/dag.py", line 513, in _build
task_reports = self._executor(dag=self,
File "/Users/Edu/dev/ploomber/src/ploomber/executors/serial.py", line 138, in __call__
raise DAGBuildError(str(exceptions_all))
ploomber.exceptions.DAGBuildError:
=============================== DAG build failed ===============================
--------- SQLScript: clean -> SQLRelation(('my_clean_table', 'table')) ---------
---------- /Users/Edu/dev/projects-ploomber/guides/testing/clean.sql -----------
Traceback (most recent call last):
File "/Users/Edu/dev/ploomber/src/ploomber/tasks/abc.py", line 591, in _build
self._post_run_actions()
File "/Users/Edu/dev/ploomber/src/ploomber/tasks/abc.py", line 342, in _post_run_actions
self._run_on_finish()
File "/Users/Edu/dev/ploomber/src/ploomber/tasks/abc.py", line 333, in _run_on_finish
self.on_finish(**kwargs)
File "/Users/Edu/dev/ploomber/src/ploomber/util/dotted_path.py", line 74, in __call__
out = self._callable(*args, **kwargs_final)
File "/Users/Edu/dev/projects-ploomber/guides/testing/integration_tests.py", line 8, in test_sql_clean
assert not nulls_in_columns(client, ['score', 'age'], product)
AssertionError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/Edu/dev/ploomber/src/ploomber/executors/serial.py", line 186, in catch_exceptions
fn()
File "/Users/Edu/dev/ploomber/src/ploomber/executors/serial.py", line 159, in __call__
return self.fn(**self.kwargs)
File "/Users/Edu/dev/ploomber/src/ploomber/executors/serial.py", line 166, in catch_warnings
result = fn()
File "/Users/Edu/dev/ploomber/src/ploomber/executors/serial.py", line 159, in __call__
return self.fn(**self.kwargs)
File "/Users/Edu/dev/ploomber/src/ploomber/executors/serial.py", line 235, in build_in_subprocess
report, meta = task._build(**build_kwargs)
File "/Users/Edu/dev/ploomber/src/ploomber/tasks/abc.py", line 603, in _build
raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Exception when running on_finish for task "clean":
=============================== Summary (1 task) ===============================
SQLScript: clean -> SQLRelation(('my_clean_table', 'table'))
=============================== DAG build failed ===============================
Ploomber prints a structured error message to help you understand why your pipeline failed. The last few lines are a summary:
=============================== Summary (1 task) ===============================
SQLScript: clean -> SQLRelation(('my_clean_table', 'table'))
=============================== DAG build failed ===============================
By looking at the summary we know our pipeline failed because one task crashed (clean). If we scroll up, we'll see a header section:
--------- SQLScript: clean -> SQLRelation(('my_clean_table', 'table')) ---------
-------------- /Users/Edu/dev/projects-ploomber/testing/clean.sql --------------
Each task displays its traceback in a separate section. Since only one task failed in our example, we only see one traceback.
At the end of this task traceback, we see the following line:
Exception when running on_finish for task "clean":
Now we know that the on_finish hook crashed. If we go up a few lines:
assert not nulls_in_columns(client, ['score', 'age'], product)
AssertionError
That tells us the exact test that failed! Pipelines can get very large; it helps a lot to have a structured error message that tells us what failed and where it happened. Our takeaway from the error message is: "the pipeline build failed because the on_finish hook in the clean task raised an exception in a certain assertion". That's much better than either "the pipeline failed" or "this line raised an exception".
Let's fix our pipeline by adding the WHERE clause again:
[7]:
path = Path('clean.sql')
new_code = path.read_text() + 'WHERE score is not null AND age > 0'
print(new_code)
path.write_text(new_code)
DROP TABLE IF EXISTS {{product}};
CREATE TABLE {{product}} AS
SELECT * FROM my_table
WHERE score is not null AND age > 0
[7]:
121
[8]:
%%sh
ploomber build
name Ran? Elapsed (s) Percentage
--------- ------ ------------- ------------
clean True 0.017694 0.67259
dump True 0.001669 0.0634426
transform True 2.61136 99.264
Building task 'transform': 0%| | 0/3 [00:00<?, ?it/s]
Executing: 0%| | 0/5 [00:00<?, ?cell/s]
Executing: 20%|██ | 1/5 [00:01<00:07, 1.78s/cell]
Executing: 100%|██████████| 5/5 [00:02<00:00, 2.34cell/s]
Building task 'transform': 100%|██████████| 3/3 [00:02<00:00, 1.13it/s]
All good! The pipeline is running without issues again!
Test-driven development (TDD)¶
Writing data tests is essential for developing robust pipelines. Coding tests is simple: all we have to do is write down, in code, the expectations we already have in mind about what the outcome of a script should be.

This thought process happens before we write the actual code, which means we could easily write the tests first. This approach is called Test-driven development (TDD).

Following this framework has an added benefit: since we force ourselves to state our data expectations in concrete terms, it becomes easier to think about how to get there.
Furthermore, tests also serve as documentation for us (and for others). By looking at our tests, anyone can see what our intent is. Then by looking at the code, it will be easier to spot mismatches between our intent and our implementation.
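For instance, before writing clean.sql we could have written its test first, encoding the expectations from the sample data description. A sketch using the same helpers as above:

from ploomber.testing.sql import nulls_in_columns, range_in_column

def test_sql_clean(client, product):
    # written before clean.sql exists: these assertions state what a
    # correct cleaning step must guarantee
    assert not nulls_in_columns(client, ['score', 'age'], product)
    min_age, _ = range_in_column(client, 'age', product)
    assert min_age > 0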
Pro tip: debugging and developing tests interactively¶
Even though tests are usually just a few short statements, writing them interactively can help you quickly prototype your assertions. One simple trick is to start an interactive session and load the client and product variables.
Let's imagine you want to write a test for a new SQL script (the same applies to other types of scripts). You add a testing function, but it's currently empty:
def my_sql_testing_function(client, product):
    pass
If you run this, Ploomber will still call your function; you can start an interactive session when that happens:
def my_sql_testing_function(client, product):
    from IPython import embed; embed()
Once you call ploomber build, wait for the Python prompt to show and verify you have the client and product variables:
>>> print(client)
>>> print(product)
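From there, you can prototype assertions interactively before pasting them into your testing function. For example, reusing the helpers shown earlier:

>>> from ploomber.testing.sql import nulls_in_columns
>>> nulls_in_columns(client, ['score', 'age'], product)
>>> # once the check behaves as expected, turn it into an assertion
>>> assert not nulls_in_columns(client, ['score', 'age'], product)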