This is a tutorial that will walk you through creating your first useful workflow with Aqueduct. You can find and download this notebook on GitHub here.
The philosophy behind the Aqueduct SDK is that you should be able to connect to your data systems, transform your data and generate predictions, and publish your results once you’re happy with them. This guide will walk you through installing your SDK, setting up a client, transforming some data, and publishing a workflow to the cloud.
Outline:
- We will use standard Python libraries to transform our data and build a simple ensemble of churn models.
- We will use the Aqueduct Python API to deploy our churn prediction pipeline.
- We will visit the Aqueduct web interface to view our deployed prediction pipeline.
Throughout this notebook, you'll see a decorator (@aq.op) above functions. This decorator allows Aqueduct to run your functions automatically as part of a workflow.
Before we do anything with Aqueduct, let’s first build some machine learning models. Aqueduct doesn’t have any opinions about where or how you do this. In this example notebook, we will build a basic churn prediction model using a customer churn dataset.
Here is our training data:
import pandas as pd
import numpy as np
import aqueduct as aq
# Read some customer data from the Aqueduct repo.
customers_table = pd.read_csv(
"https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/customers.csv"
)
churn_table = pd.read_csv(
"https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/churn_data.csv"
)
pd.merge(customers_table, churn_table, on="cust_id").head()
Output
| | cust_id | n_workflows | n_rows | n_users | company_size | n_integrations | n_support_tickets | duration_months | using_deep_learning | n_data_eng | using_dbt | churn |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 4 | 2007 | 2 | 29 | 5 | 3.0 | 1.0 | False | 2.0 | True | False |
| 1 | 1 | 3 | 8538 | 1 | 31 | 4 | 1.0 | 1.0 | False | 3.0 | True | True |
| 2 | 2 | 4 | 7548 | 1 | 29 | 3 | 1.0 | 3.0 | False | 1.0 | True | False |
| 3 | 3 | 3 | 4286 | 1 | 33 | 4 | 1.0 | 4.0 | False | 3.0 | True | False |
| 4 | 4 | 2 | 2136 | 1 | 28 | 3 | 0.0 | 1.0 | False | 2.0 | True | False |
Most real-world machine learning relies on some form of feature transformation. Because our data contains counts, we apply a log transformation to them (specifically log(x + 1), so that zero counts remain finite).
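As a quick sanity check of that transformation, the sketch below applies log(x + 1) to two count columns copied from rows 0 and 4 of the table above, using only NumPy and pandas (no Aqueduct). It also shows that np.log1p computes the same quantity; the small frame is a hypothetical excerpt, not the full dataset.

```python
import numpy as np
import pandas as pd

# Two count columns copied from rows 0 and 4 of the customers table above.
counts = pd.DataFrame({"n_rows": [2007, 2136], "n_support_tickets": [3.0, 0.0]})

# log(x + 1), as used in log_featurize: a zero count maps to log(1) == 0 instead of -inf.
log_counts = np.log(counts + 1.0)

# np.log1p computes the same quantity (and is more accurate when x is tiny).
assert np.allclose(log_counts, np.log1p(counts))
print(log_counts.round(6))
```

The resulting values should match the log_n_rows and log_n_support_tickets columns that log_featurize produces for those customers.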
# The @aq.op decorator here allows Aqueduct to run this function as
# part of an Aqueduct workflow. It tells Aqueduct that when we
# execute this function, we're defining a step in the workflow.
# While the results can be retrieved immediately, nothing is
# published until we call `publish_flow()` below.
@aq.op
def log_featurize(cust: pd.DataFrame) -> pd.DataFrame:
    """
    log_featurize takes in customer data from the Aqueduct customers table
    and log-normalizes the numerical columns by applying numpy.log to each
    value plus one, i.e. log(x + 1), so that zero counts stay finite.
    It skips the cust_id, using_deep_learning, and using_dbt columns because
    these are identifier or boolean columns that do not need normalization.
    log_featurize adds the log-normalized values as new columns and keeps
    the original values as-is; for example, in addition to the original
    company_size column, it adds a log_company_size column. The cust_id
    column is dropped from the result.
    """
    features = cust.copy()
    skip_cols = ["cust_id", "using_deep_learning", "using_dbt"]
    for col in features.columns.difference(skip_cols):
        features["log_" + col] = np.log(features[col] + 1.0)
    return features.drop(columns="cust_id")
# Calling `.local()` on an @op-annotated function allows us to execute the
# function locally for testing purposes. When a function is called with
# `.local()`, Aqueduct does not capture the function execution as a part of
# the definition of a workflow.
features_table = log_featurize.local(customers_table)
features_table.head()
Output
| | n_workflows | n_rows | n_users | company_size | n_integrations | n_support_tickets | duration_months | using_deep_learning | n_data_eng | using_dbt | log_company_size | log_duration_months | log_n_data_eng | log_n_integrations | log_n_rows | log_n_support_tickets | log_n_users | log_n_workflows |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4 | 2007 | 2 | 29 | 5 | 3.0 | 1.0 | False | 2.0 | True | 3.401197 | 0.693147 | 1.098612 | 1.791759 | 7.604894 | 1.386294 | 1.098612 | 1.609438 |
| 1 | 3 | 8538 | 1 | 31 | 4 | 1.0 | 1.0 | False | 3.0 | True | 3.465736 | 0.693147 | 1.386294 | 1.609438 | 9.052399 | 0.693147 | 0.693147 | 1.386294 |
| 2 | 4 | 7548 | 1 | 29 | 3 | 1.0 | 3.0 | False | 1.0 | True | 3.401197 | 1.386294 | 0.693147 | 1.386294 | 8.929170 | 0.693147 | 0.693147 | 1.609438 |
| 3 | 3 | 4286 | 1 | 33 | 4 | 1.0 | 4.0 | False | 3.0 | True | 3.526361 | 1.609438 | 1.386294 | 1.609438 | 8.363342 | 0.693147 | 0.693147 | 1.386294 |
| 4 | 2 | 2136 | 1 | 28 | 3 | 0.0 | 1.0 | False | 2.0 | True | 3.367296 | 0.693147 | 1.098612 | 1.386294 | 7.667158 | 0.000000 | 0.693147 | 1.098612 |
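One detail of log_featurize is visible in this output: pandas' Index.difference sorts its result by default, which is why the new log_ columns appear in alphabetical order (log_company_size, log_duration_months, ...) rather than in the original column order. The sketch below reproduces the function body without the @aq.op decorator so it runs without Aqueduct; log_featurize_plain and the toy frame are hypothetical names and data.

```python
import numpy as np
import pandas as pd


def log_featurize_plain(cust: pd.DataFrame) -> pd.DataFrame:
    # Same body as log_featurize, minus the @aq.op decorator (hypothetical helper).
    features = cust.copy()
    skip_cols = ["cust_id", "using_deep_learning", "using_dbt"]
    # Index.difference returns the remaining column names sorted alphabetically.
    for col in features.columns.difference(skip_cols):
        features["log_" + col] = np.log(features[col] + 1.0)
    return features.drop(columns="cust_id")


# Hypothetical toy input; note that n_workflows comes before company_size here.
toy = pd.DataFrame({
    "cust_id": [0, 1],
    "n_workflows": [4, 3],
    "company_size": [29, 31],
    "using_dbt": [True, True],
})
out = log_featurize_plain(toy)
print(list(out.columns))
```

If the original column order mattered downstream, one could iterate over [c for c in features.columns if c not in skip_cols] instead; the tutorial's models do not depend on it, since the same function produces both training and prediction features.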
In this example, we will train and ensemble two basic classifiers. In practice, you would probably do something more sophisticated, but this will help illustrate post-processing logic (the ensemble function).
from sklearn.linear_model import LogisticRegression
linear_model = LogisticRegression(max_iter=10000)
linear_model.fit(features_table, churn_table["churn"])
Output
LogisticRegression(max_iter=10000)
from sklearn.tree import DecisionTreeClassifier
decision_tree_model = DecisionTreeClassifier(max_depth=10, min_samples_split=3)
decision_tree_model.fit(features_table, churn_table["churn"])
Output
DecisionTreeClassifier(max_depth=10, min_samples_split=3)
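The prediction functions that follow take column 1 of predict_proba. For a scikit-learn classifier, the columns of predict_proba follow the order of the fitted model's classes_ attribute, which is sorted, so with boolean churn labels column 1 is the probability of True (churn). A minimal check on hypothetical toy data:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

# Hypothetical toy data: one feature and boolean labels, like the churn column.
X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([False, False, True, True])

model = LogisticRegression().fit(X, y)

# classes_ is sorted, so column 0 is False (no churn) and column 1 is True (churn).
print(model.classes_)
proba = model.predict_proba(X)
churn_prob = proba[:, 1]  # the same slice used by predict_linear and predict_tree
print(churn_prob)
```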
Putting all of the above pieces together, we can define the prediction workflow as follows:
@aq.op
def predict_linear(features_table):
    """
    Generates predictions using the logistic regression model and
    returns a new DataFrame with a column called linear that has
    the likelihood of the customer churning.
    """
    return pd.DataFrame({"linear": linear_model.predict_proba(features_table)[:, 1]})

@aq.op
def predict_tree(features_table):
    """
    Generates predictions using the decision tree model and
    returns a new DataFrame with a column called tree that has
    the likelihood of the customer churning.
    """
    return pd.DataFrame({"tree": decision_tree_model.predict_proba(features_table)[:, 1]})

@aq.op
def predict_ensemble(customers_table, linear_pred_table, tree_pred_table):
    """
    predict_ensemble combines the results from our logistic regression
    and decision tree models by taking the average of the two models'
    probabilities that a user might churn. The resulting average is
    then assigned into the `prob_churn` column on the customers_table.
    """
    return customers_table.assign(prob_churn=linear_pred_table.join(tree_pred_table).mean(axis=1))
features_table = log_featurize.local(customers_table)
linear_pred_table = predict_linear.local(features_table)
tree_pred_table = predict_tree.local(features_table)
churn_table = predict_ensemble.local(customers_table, linear_pred_table, tree_pred_table)
If we look at the output table, we see that the prob_churn column contains the churn predictions.
churn_table.head()
Output
| | cust_id | n_workflows | n_rows | n_users | company_size | n_integrations | n_support_tickets | duration_months | using_deep_learning | n_data_eng | using_dbt | prob_churn |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 4 | 2007 | 2 | 29 | 5 | 3.0 | 1.0 | False | 2.0 | True | 0.081036 |
| 1 | 1 | 3 | 8538 | 1 | 31 | 4 | 1.0 | 1.0 | False | 3.0 | True | 0.146564 |
| 2 | 2 | 4 | 7548 | 1 | 29 | 3 | 1.0 | 3.0 | False | 1.0 | True | 0.155584 |
| 3 | 3 | 3 | 4286 | 1 | 33 | 4 | 1.0 | 4.0 | False | 3.0 | True | 0.145398 |
| 4 | 4 | 2 | 2136 | 1 | 28 | 3 | 0.0 | 1.0 | False | 2.0 | True | 0.161406 |
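The ensemble step is a simple form of soft voting: the two models' churn probabilities are averaged per customer. Because join and assign both align rows by index label, the sketch below (hypothetical toy data, plain pandas) also shows why the prediction tables and customers_table need matching indexes. That holds in this notebook because all of them carry pandas' default 0..n-1 index.

```python
import numpy as np
import pandas as pd

# Hypothetical toy inputs shaped like the three tables predict_ensemble receives.
customers = pd.DataFrame({"cust_id": [0, 1, 2]})
linear_pred = pd.DataFrame({"linear": [0.10, 0.20, 0.30]})
tree_pred = pd.DataFrame({"tree": [0.00, 0.40, 0.50]})

# Same expression as in predict_ensemble: join lines the prediction tables up
# by index, and mean(axis=1) averages "linear" and "tree" row by row.
prob = linear_pred.join(tree_pred).mean(axis=1)
result = customers.assign(prob_churn=prob)
print(result)

# assign also aligns on the index: if customers carried different index labels,
# no rows would match and every prob_churn would be NaN.
misaligned = customers.set_axis([10, 11, 12]).assign(prob_churn=prob)
print(misaligned)
```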
Using standard Python libraries, we have trained two models and developed a simple workflow that combines feature transformations, models, and post-prediction logic to estimate churn. In the remainder of this notebook, we will deploy the above prediction workflow to the cloud, integrate it with production data sources, and publish predictions to multiple destinations.
We will now deploy the above prediction workflow to the cloud. You will need the API key for your Aqueduct server, which you can find on the /account page of your UI.
# If you're running your notebook on a separate machine from your
# Aqueduct server, change this to the address of your Aqueduct server.
address = "http://localhost:8080"
# If you're running your notebook on a separate machine from your
# Aqueduct server, you will have to copy your API key here rather than
# using `get_apikey()`.
api_key = aq.get_apikey()
client = aq.Client(api_key, address)
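Pasting an API key into a notebook makes it easy to leak when the notebook is shared. One alternative is to read the address and key from environment variables. In this sketch, the variable names AQUEDUCT_ADDRESS and AQUEDUCT_API_KEY and the helper connection_settings are hypothetical choices, not something Aqueduct requires; only the final aq.Client(api_key, address) call comes from the cell above.

```python
import os


def connection_settings(default_address="http://localhost:8080"):
    """Read the Aqueduct server address and API key from the environment.

    AQUEDUCT_ADDRESS and AQUEDUCT_API_KEY are hypothetical variable names
    chosen for this sketch.
    """
    address = os.environ.get("AQUEDUCT_ADDRESS", default_address)
    api_key = os.environ.get("AQUEDUCT_API_KEY")
    if not api_key:
        raise RuntimeError("Set AQUEDUCT_API_KEY to the key shown on the /account page.")
    return api_key, address


# Usage (requires a running Aqueduct server):
# api_key, address = connection_settings()
# client = aq.Client(api_key, address)
```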
Workflows read from and publish to data resources that are configured on the Resources page. In this demo, we will connect to the Demo data resource, a Postgres database containing several standard sample datasets, including some synthetic customer churn data. Each kind of data resource may offer different functionality; here we are using a relational resource, which supports general SQL expressions.
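To experiment with queries before pointing them at the Demo resource, a local stand-in can help. The sketch below does not use Aqueduct at all: it loads a few rows shaped like the customers table into an in-memory SQLite database (a hypothetical substitute for the Demo Postgres database) and runs the same SELECT used with warehouse.sql, plus a filter and an aggregate. SQLite's dialect differs from Postgres in places, so treat this only as a rough check.

```python
import sqlite3

import pandas as pd

# Hypothetical stand-in for the Demo resource: an in-memory SQLite database
# loaded with a few rows shaped like the `customers` table.
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "cust_id": [0, 1, 2, 3, 4],
    "n_workflows": [4, 3, 4, 3, 2],
    "n_rows": [2007, 8538, 7548, 4286, 2136],
}).to_sql("customers", conn, index=False)

# The same query passed to warehouse.sql(query=...) below ...
everything = pd.read_sql_query("SELECT * FROM customers;", conn)
# ... and two other general SQL expressions: a filter and an aggregate.
busy = pd.read_sql_query(
    "SELECT cust_id, n_rows FROM customers WHERE n_workflows >= 4 ORDER BY cust_id;", conn
)
total = pd.read_sql_query("SELECT SUM(n_rows) AS total_rows FROM customers;", conn)
conn.close()
print(busy)
print(total)
```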
warehouse = client.resource(name="Demo")
# customers_table is an Aqueduct TableArtifact, which is a wrapper around
# a Pandas DataFrame. A TableArtifact can be used as an argument to any operator
# in a workflow; you can also call .get() on a TableArtifact to retrieve
# the underlying DataFrame and interact with it directly.
customers_table = warehouse.sql(query="SELECT * FROM customers;")
print(type(customers_table))
Here, customers_table is a TableArtifact representing a virtual table living in the cloud. For debugging purposes, we can get the Pandas DataFrame corresponding to any TableArtifact by using the get() method:
# This gets the head of the underlying DataFrame. Note that you can't
# pass a DataFrame as an argument to a workflow; you must use the Aqueduct
# TableArtifact!
customers_table.get().head()
Output
| | cust_id | n_workflows | n_rows | n_users | company_size | n_integrations | n_support_tickets | duration_months | using_deep_learning | n_data_eng | using_dbt |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 4 | 2007 | 2 | 29 | 5 | 3.0 | 1.0 | 0 | 2.0 | 1 |
| 1 | 1 | 3 | 8538 | 1 | 31 | 4 | 1.0 | 1.0 | 0 | 3.0 | 1 |
| 2 | 2 | 4 | 7548 | 1 | 29 | 3 | 1.0 | 3.0 | 0 | 1.0 | 1 |
| 3 | 3 | 3 | 4286 | 1 | 33 | 4 | 1.0 | 4.0 | 0 | 3.0 | 1 |
| 4 | 4 | 2 | 2136 | 1 | 28 | 3 | 0.0 | 1.0 | 0 | 2.0 | 1 |
We can transform these TableArtifacts by applying decorated Python functions. Here, we use the log_featurize function we defined above and call it like a regular Python function, without .local(). Because of the @aq.op decorator, Aqueduct will run this function in the cloud as part of a workflow.
features_table = log_featurize(customers_table)
print(type(features_table))
As before, we can always view the value of the table by calling get() to retrieve the corresponding DataFrame.
features_table.get().head()
Output
| | n_workflows | n_rows | n_users | company_size | n_integrations | n_support_tickets | duration_months | using_deep_learning | n_data_eng | using_dbt | log_company_size | log_duration_months | log_n_data_eng | log_n_integrations | log_n_rows | log_n_support_tickets | log_n_users | log_n_workflows |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4 | 2007 | 2 | 29 | 5 | 3.0 | 1.0 | 0 | 2.0 | 1 | 3.401197 | 0.693147 | 1.098612 | 1.791759 | 7.604894 | 1.386294 | 1.098612 | 1.609438 |
| 1 | 3 | 8538 | 1 | 31 | 4 | 1.0 | 1.0 | 0 | 3.0 | 1 | 3.465736 | 0.693147 | 1.386294 | 1.609438 | 9.052399 | 0.693147 | 0.693147 | 1.386294 |
| 2 | 4 | 7548 | 1 | 29 | 3 | 1.0 | 3.0 | 0 | 1.0 | 1 | 3.401197 | 1.386294 | 0.693147 | 1.386294 | 8.929170 | 0.693147 | 0.693147 | 1.609438 |
| 3 | 3 | 4286 | 1 | 33 | 4 | 1.0 | 4.0 | 0 | 3.0 | 1 | 3.526361 | 1.609438 | 1.386294 | 1.609438 | 8.363342 | 0.693147 | 0.693147 | 1.386294 |
| 4 | 2 | 2136 | 1 | 28 | 3 | 0.0 | 1.0 | 0 | 2.0 | 1 | 3.367296 | 0.693147 | 1.098612 | 1.386294 | 7.667158 | 0.000000 | 0.693147 | 1.098612 |
Next, we can apply the prediction functions we defined above, and Aqueduct will continue to build a workflow of our functions running in the cloud:
linear_pred_table = predict_linear(features_table)
tree_pred_table = predict_tree(features_table)
churn_table = predict_ensemble(customers_table, linear_pred_table, tree_pred_table)
Note that our ensemble function takes three tables as input. Aqueduct makes sure that all three are correctly provided to the function when it executes.
Examining the final table, we see our predictions in the prob_churn column.
churn_table.get().head()
Output
| | cust_id | n_workflows | n_rows | n_users | company_size | n_integrations | n_support_tickets | duration_months | using_deep_learning | n_data_eng | using_dbt | prob_churn |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 4 | 2007 | 2 | 29 | 5 | 3.0 | 1.0 | 0 | 2.0 | 1 | 0.081036 |
| 1 | 1 | 3 | 8538 | 1 | 31 | 4 | 1.0 | 1.0 | 0 | 3.0 | 1 | 0.146564 |
| 2 | 2 | 4 | 7548 | 1 | 29 | 3 | 1.0 | 3.0 | 0 | 1.0 | 1 | 0.155584 |
| 3 | 3 | 3 | 4286 | 1 | 33 | 4 | 1.0 | 4.0 | 0 | 3.0 | 1 | 0.145398 |
| 4 | 4 | 2 | 2136 | 1 | 28 | 3 | 0.0 | 1.0 | 0 | 2.0 | 1 | 0.161406 |
Debugging a prediction pipeline, or even knowing when it is failing, can be challenging! Input data is constantly changing, features can drift, and even the choice of model may no longer make sense as labels shift. It is therefore critical that we check our data before publishing predictions. This can be done using checks.
An Aqueduct check takes in a Pandas DataFrame and returns a Pandas Series of booleans. The check passes if every value in the Series is True.
Let's ensure that all our probabilities are valid probabilities between 0 and 1.
@aq.check(description="Ensuring valid probabilities.")
def valid_probabilities(df: pd.DataFrame):
    return (df["prob_churn"] >= 0) & (df["prob_churn"] <= 1)
Executing valid_probabilities on churn_table will automatically attach the check to the table, but the check will not run until you call get() on the output.
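The check's predicate is ordinary pandas, so its pass/fail rule can be tried on plain DataFrames without Aqueduct. In this sketch, valid_probabilities_plain is the check body without the decorator and the two frames are hypothetical toy data; the check passes only when every element of the returned boolean Series is True, which Series.all() expresses. A NaN probability also fails, because comparisons with NaN evaluate to False.

```python
import pandas as pd


def valid_probabilities_plain(df: pd.DataFrame) -> pd.Series:
    # Same predicate as the valid_probabilities check, without @aq.check (hypothetical helper).
    return (df["prob_churn"] >= 0) & (df["prob_churn"] <= 1)


good = pd.DataFrame({"prob_churn": [0.08, 0.15, 1.0]})
bad = pd.DataFrame({"prob_churn": [0.08, 1.2, float("nan")]})

for name, df in [("good", good), ("bad", bad)]:
    result = valid_probabilities_plain(df)
    # Pass only if every element of the boolean Series is True.
    print(name, result.tolist(), "passes" if result.all() else "fails")
```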
check_result = valid_probabilities(churn_table)
We can also create metrics and check their values. A metric is simply a measurement applied to a whole table or to a column in a table. Every artifact comes with a set of common, built-in metrics like mean, min, max, etc.
# Use Aqueduct's built-in mean metric to calculate the average value of `prob_churn`.
# Calling .get() on the metric will retrieve the current value.
avg_pred_churn_metric = churn_table.mean("prob_churn")
avg_pred_churn_metric.get()
Output
0.28025028109550476
We can also add bounds to metrics that enforce correctness constraints on the metric's value. For example, we might expect the average predicted churn (avg_pred_churn_metric) to be between 0.1 and 0.3, and if it ever goes above 0.4, we want to prevent the predictions from being published.
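The three bounds in the next cell can be read as a small decision rule. The plain-Python sketch below models that rule under two assumptions not spelled out in this notebook: bounds without severity="error" only produce a warning, and a value exactly on a bound passes. evaluate_bounds and can_publish are hypothetical helpers, not Aqueduct APIs.

```python
# Hypothetical model of the three bounds set on avg_pred_churn_metric.
# Assumption: non-"error" bounds only warn; an "error" bound blocks publishing.
BOUNDS = [
    {"lower": 0.1, "upper": None, "severity": "warning"},
    {"lower": None, "upper": 0.3, "severity": "warning"},
    {"lower": None, "upper": 0.4, "severity": "error"},
]


def evaluate_bounds(value, bounds=BOUNDS):
    """Return (warnings, errors): the bounds that `value` violates, split by severity."""
    warnings, errors = [], []
    for b in bounds:
        too_low = b["lower"] is not None and value < b["lower"]
        too_high = b["upper"] is not None and value > b["upper"]
        if too_low or too_high:
            (errors if b["severity"] == "error" else warnings).append(b)
    return warnings, errors


def can_publish(value):
    # Under the assumption above, only error-severity violations block publishing.
    _, errors = evaluate_bounds(value)
    return not errors


print(evaluate_bounds(0.28025028109550476))  # the metric value computed above
print(can_publish(0.35), can_publish(0.45))
```

Under these assumptions, the 0.280 average computed above violates no bound, 0.35 would only warn, and 0.45 would block publishing.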
# Bounds on metrics ensure that the metric stays within a valid range.
# In this case, we'd ideally like churn to be between .1 and .3, and we
# know something's gone wrong if it's above .4.
avg_pred_churn_metric.bound(lower=0.1)
avg_pred_churn_metric.bound(upper=0.3)
avg_pred_churn_metric.bound(upper=0.4, severity="error")
Output
<aqueduct.check_artifact.CheckArtifact at 0x120d68970>
So far, we have defined a workflow to build churn_table, which contains our churn predictions. We now want to publish this table somewhere others can use it. We do this by saving the table to various resources.
First we save the table back to the data warehouse that contains the original customer data.
# This tells Aqueduct to save the results in churn_table
# back to the demo DB we configured earlier.
# NOTE: At this point, no data is actually saved! This is just
# part of a workflow spec that will be executed once the workflow
# is published in the next cell.
warehouse.save(churn_table, table_name="pred_churn", update_mode="replace")
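Because a published workflow may run on a schedule, update_mode="replace" determines what happens to pred_churn on each run: the table is overwritten with the latest predictions rather than accumulated. The sketch below imitates that with pandas' DataFrame.to_sql against an in-memory SQLite database, contrasting if_exists="replace" with if_exists="append". It is an analogy, not Aqueduct's implementation; the two toy runs are hypothetical, and the full set of update modes Aqueduct accepts should be confirmed in the SDK reference.

```python
import sqlite3

import pandas as pd

# Stand-in for the warehouse: an in-memory SQLite database.
conn = sqlite3.connect(":memory:")

# Two hypothetical workflow runs producing predictions for the same customers.
run_1 = pd.DataFrame({"cust_id": [0, 1], "prob_churn": [0.081036, 0.146564]})
run_2 = pd.DataFrame({"cust_id": [0, 1], "prob_churn": [0.09, 0.15]})


def count_rows(table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


for run in (run_1, run_2):
    # Analogous to update_mode="replace": each run overwrites the previous predictions.
    run.to_sql("pred_churn_replace", conn, index=False, if_exists="replace")
    # For contrast, appending keeps every run's rows.
    run.to_sql("pred_churn_append", conn, index=False, if_exists="append")

replace_rows = count_rows("pred_churn_replace")
append_rows = count_rows("pred_churn_append")
conn.close()
print(replace_rows, append_rows)
```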
With all this hard work done, we're ready to deploy our workflow so that it runs repeatedly. To do this, we define a workflow with a name, the artifacts that we want to compute, and the cadence at which we want to run it. The Aqueduct APIs automatically capture any intermediate artifacts and the code needed to produce the final artifacts.
When you call publish_flow, all of this will be shipped off to the cloud!
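Conceptually, publishing only needs the requested artifacts and everything upstream of them. The sketch below models the workflow built in this notebook as a dependency graph using Python's standard graphlib module, then computes one valid execution order and the upstream set of churn_table. The graph is a hand-written illustration derived from the function calls above, not Aqueduct's internal representation, and the upstream helper is hypothetical.

```python
from graphlib import TopologicalSorter

# Each artifact maps to the artifacts it is computed from (hand-written illustration).
dependencies = {
    "customers_table": set(),                      # warehouse.sql(...)
    "features_table": {"customers_table"},         # log_featurize
    "linear_pred_table": {"features_table"},       # predict_linear
    "tree_pred_table": {"features_table"},         # predict_tree
    "churn_table": {"customers_table", "linear_pred_table", "tree_pred_table"},
    "avg_pred_churn_metric": {"churn_table"},      # churn_table.mean("prob_churn")
}


def upstream(artifact, deps):
    """Return every artifact that `artifact` transitively depends on."""
    seen = set()
    stack = list(deps[artifact])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(deps[node])
    return seen


# One valid execution order: every artifact appears after all of its inputs.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
print(sorted(upstream("churn_table", dependencies)))
```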
# This publishes all of the logic needed to create churn_table
# and avg_pred_churn_metric to Aqueduct. The URL below will
# take you to the Aqueduct UI, which will show you the status
# of your workflow runs and allow you to inspect them.
churn_flow = client.publish_flow(
    name="Demo Churn Ensemble",
    artifacts=[churn_table, avg_pred_churn_metric],
    # Uncomment the following line to schedule the workflow on an hourly basis.
    # schedule=aq.hourly(),
)
print(churn_flow.id())
Clicking on the URL above will take you to the Aqueduct UI, where you can see the workflow that we just created! In the UI, you'll be able to see the DAG of operators we just created, click into any of them, and see the data and metadata associated with each stage of the pipeline.