Online service (API)¶
To encapsulate all your pipeline’s logic for online predictions, use
ploomber.OnlineDAG. Once implemented, you can generate predictions like this:
from my_project import MyOnlineDAG

# MyOnlineDAG is a subclass of OnlineDAG
dag = MyOnlineDAG()

# pass one keyword argument per input task (here, a task named input_data)
dag.predict(input_data=input_data)
You can easily integrate an online DAG with any library such as Flask or gRPC.
The only requirement is that your feature generation code must consist entirely
of function-based tasks (i.e., ploomber.tasks.PythonCallable) with a
configured serializer and unserializer.
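For reference, a serializer writes a task’s return value to its product, and
an unserializer loads it back. A minimal, hypothetical sketch of such a pair
(assuming pickle-based storage in a module such as my_project/io.py; adapt it
to your project):

import pickle
from pathlib import Path


def serializer(obj, product):
    # write the object returned by a task to its product path
    Path(str(product)).write_bytes(pickle.dumps(obj))


def unserializer(product):
    # load a stored product back into memory for downstream tasks
    return pickle.loads(Path(str(product)).read_bytes())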
Composing online pipelines¶
To create an online DAG, list your feature tasks in a features.yaml and use
import_tasks_from in your training pipeline (pipeline.yaml). Subclass
ploomber.OnlineDAG to create a serving pipeline.
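For context, here is a sketch of how the two files might fit together (module
paths and products are hypothetical; the serializer/unserializer dotted paths
assume the functions shown above):

# features.yaml: feature tasks shared by training and serving
- source: my_project.features.a_feature
  product: output/a_feature.parquet

- source: my_project.features.another_feature
  product: output/another_feature.parquet

# pipeline.yaml: training pipeline imports the feature tasks
meta:
  import_tasks_from: features.yaml

serializer: my_project.io.serializer
unserializer: my_project.io.unserializer

tasks:
  - source: my_project.tasks.get
    product: output/raw.parquet

  - source: my_project.tasks.fit
    product: output/model.pickle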
OnlineDAG will take your tasks from features.yaml and create new “input
tasks” based on upstream references in your feature tasks. For example, if
features.yaml has tasks a_feature and another_feature (see the diagram in
the first section), and both obtain their inputs from a task named get,
the source code may look like this:
def a_feature(upstream):
    raw_data = upstream['get']
    # process raw_data to generate features...
    # return a_feature
    return df_a_feature


def another_feature(upstream):
    raw_data = upstream['get']
    # process raw_data to generate features...
    # return another_feature
    return df_another_feature
Since features.yaml does not contain a task named get, OnlineDAG
automatically identifies it as an “input task”. Finally, you must provide a
“terminal task”, which is the last task in your online pipeline.
To implement this, create a subclass of OnlineDAG and provide three things:
the path to your features.yaml, the parameters for your terminal task, and
the terminal task itself:
import pickle
from importlib import resources

import pandas as pd
from ploomber import OnlineDAG

# top-level package of your project (contains the trained model)
import ml_online


# subclass OnlineDAG...
class MyOnlineDAG(OnlineDAG):
    # and provide these three methods...

    # get_partial: returns a path to your feature tasks
    @staticmethod
    def get_partial():
        return 'features.yaml'

    # terminal_params: returns a dictionary with parameters for the terminal task
    @staticmethod
    def terminal_params():
        model = pickle.loads(resources.read_binary(ml_online, 'model.pickle'))
        return dict(model=model)

    # terminal_task: implementation of your terminal task
    @staticmethod
    def terminal_task(upstream, model):
        # receives all tasks with no downstream dependencies in
        # features.yaml
        a_feature = upstream['a_feature']
        another_feature = upstream['another_feature']
        X = pd.DataFrame({'a_feature': a_feature,
                          'another_feature': another_feature})
        return model.predict(X)
To call MyOnlineDAG
:
from my_project import MyOnlineDAG

dag = MyOnlineDAG()

# pass parameters (one per input task)
prediction = dag.predict(get=input_data)
You can import and call MyOnlineDAG
in any framework (e.g., Flask) to
expose your pipeline as an online service.
from flask import Flask, request, jsonify
import pandas as pd

from my_project import MyOnlineDAG

# instantiate online dag
dag = MyOnlineDAG()

app = Flask(__name__)


@app.route('/', methods=['POST'])
def predict():
    request_data = request.get_json()
    # get JSON data and create a data frame with a single row
    input_data = pd.DataFrame(request_data, index=[0])
    # pass input data, one argument per root node
    out = dag.predict(get=input_data)
    # return output from the terminal task
    return jsonify({'prediction': int(out['terminal'])})
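Once the app is running (e.g., via flask run), you can request predictions
over HTTP. A quick sketch using the requests library (the port and column
names are hypothetical; use the columns your pipeline expects):

import requests

# send one record as JSON; keys become columns in the input data frame
response = requests.post('http://localhost:5000/',
                         json={'column_a': 1, 'column_b': 2.5})

print(response.json())  # e.g., {'prediction': 0}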
Examples¶
Click here to see a deployment example using AWS Lambda.
Click here to see a complete sample project that trains a model and exposes an API via Flask.