In this example, we'll use a classic dataset with the health characteristics of several hundred patients and whether or not they have diabetes to build a K-nearest neighbors classifier for diabetes risk.
You can find and download this notebook on GitHub here.
This notebook is adapted from the most upvoted Kaggle notebook that cleans the Pima diabetes dataset and builds a KNN classifier on it.
Throughout this notebook, you'll see a decorator (@aq.op) above functions. This decorator allows Aqueduct to run your functions as part of a workflow automatically.
# First, we'll load our dataset in from a catalog of common datasets on GitHub.
# The CSV we're loading from doesn't have column headers, so we provide the names
# of the columns to Pandas when reading the CSV
import pandas as pd
df = pd.read_csv(
"https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.csv",
names=[
"pregnancies",
"glucose",
"diastolic_bp",
"skin_thickness",
"2_hr_insulin",
"bmi",
"pedigree_fn",
"age",
"has_diabetes",
],
header=None,
)
print(df.dtypes)
df
Output
| | pregnancies | glucose | diastolic_bp | skin_thickness | 2_hr_insulin | bmi | pedigree_fn | age | has_diabetes |
---|---|---|---|---|---|---|---|---|---|
0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |
2 | 8 | 183 | 64 | 0 | 0 | 23.3 | 0.672 | 32 | 1 |
3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 |
4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 | 1 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
763 | 10 | 101 | 76 | 48 | 180 | 32.9 | 0.171 | 63 | 0 |
764 | 2 | 122 | 70 | 27 | 0 | 36.8 | 0.340 | 27 | 0 |
765 | 5 | 121 | 72 | 23 | 112 | 26.2 | 0.245 | 30 | 0 |
766 | 1 | 126 | 60 | 0 | 0 | 30.1 | 0.349 | 47 | 1 |
767 | 1 | 93 | 70 | 31 | 0 | 30.4 | 0.315 | 23 | 0 |
768 rows × 9 columns
import aqueduct as aq
import numpy as np
import pandas as pd
# If you're running your notebook on a separate machine from your
# Aqueduct server, change this to the address of your Aqueduct server.
address = "http://localhost:8080"
# If you're running your notebook on a separate machine from your
# Aqueduct server, you will have to copy your API key here rather than
# using `get_apikey()`.
api_key = aq.get_apikey()
client = aq.Client(api_key, address)
We know from the dataset definition that there are going to be missing values, but the column types printed above are all numeric (integers, plus floats for bmi and pedigree_fn), and nothing is stored as NaN. Missing values are often encoded as 0s instead, so we'll count how many values in each column are 0.
Notice that for some columns (such as pregnancies), 0 is a valid value, so we'll ignore that count. The has_diabetes column also uses 0 to indicate that the patient doesn't have diabetes, so we can ignore that count as well.
(df == 0).sum()
Output:
pregnancies 111
glucose 5
diastolic_bp 35
skin_thickness 227
2_hr_insulin 374
bmi 11
pedigree_fn 0
age 0
has_diabetes 500
dtype: int64
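The counts above mix genuine zeros with missing values. As a dependency-free sketch of separating the two, the snippet below treats 0 as missing only in the columns where a zero measurement is implausible. The column list `ZERO_MEANS_MISSING` and the helper `count_missing` are hypothetical names for illustration, and the three sample rows are copied from the table above and trimmed to a few columns.

```python
# Hypothetical list of columns where a recorded 0 means "missing".
# pregnancies and has_diabetes are excluded because 0 is a legitimate value there.
ZERO_MEANS_MISSING = ["glucose", "diastolic_bp", "skin_thickness", "2_hr_insulin", "bmi"]


def count_missing(rows):
    """Count the 0-encoded missing values per column in a list of row dicts."""
    counts = {column: 0 for column in ZERO_MEANS_MISSING}
    for row in rows:
        for column in ZERO_MEANS_MISSING:
            if row[column] == 0:
                counts[column] += 1
    return counts


# Rows 0, 4, and 2 from the table above, trimmed to a few columns.
sample = [
    {"pregnancies": 6, "glucose": 148, "diastolic_bp": 72, "skin_thickness": 35, "2_hr_insulin": 0, "bmi": 33.6},
    {"pregnancies": 0, "glucose": 137, "diastolic_bp": 40, "skin_thickness": 35, "2_hr_insulin": 168, "bmi": 43.1},
    {"pregnancies": 8, "glucose": 183, "diastolic_bp": 64, "skin_thickness": 0, "2_hr_insulin": 0, "bmi": 23.3},
]
print(count_missing(sample))
```

The pregnancies value of 0 in the second row is not counted. In the notebook itself, the pandas equivalent is `(df[ZERO_MEANS_MISSING] == 0).sum()`.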
With so much missing data, especially in the insulin and skin thickness measurements, it will be tough to build a model. Normally, we'd start by visualizing the data to see how each feature is distributed, but here we'll cheat a little bit: many of our friends over at Kaggle have built great visualizations for this dataset. In this example, we're relying on this Kaggle notebook, which shows that glucose and diastolic_bp are normally distributed, while skin_thickness, 2_hr_insulin, and bmi have skewed distributions.
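We'll write an interpolation function that replaces the 0 values in the normally distributed columns with the mean of that column's non-zero values, and replaces the 0 values in the skewed columns with the median of the non-zero values. The median is used for skewed columns because it is much less sensitive than the mean to a long tail of very large values.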
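To see why the median is the safer choice for a skewed column, here is a dependency-free sketch of the same replacement rule applied to a small, made-up insulin-like list. `impute_zeros` is a hypothetical helper that mirrors the int-truncated mean/median logic described above; it is not part of Aqueduct or pandas.

```python
import statistics


def impute_zeros(values, strategy):
    """Replace 0s with the mean or median of the non-zero values, truncated to int."""
    observed = [v for v in values if v != 0]
    if strategy == "mean":
        fill = int(statistics.mean(observed))
    elif strategy == "median":
        fill = int(statistics.median(observed))
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    return [fill if v == 0 else v for v in values]


# Made-up, right-skewed column: mostly moderate values, one very large value,
# and two 0-encoded missing entries.
insulin = [0, 94, 100, 112, 125, 140, 0, 846]
print(impute_zeros(insulin, "mean"))    # [236, 94, 100, 112, 125, 140, 236, 846]
print(impute_zeros(insulin, "median"))  # [118, 94, 100, 112, 125, 140, 118, 846]
```

The single large value drags the mean up to 236, higher than five of the six observed values, while the median (118) stays in the middle of the typical range.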
# The @op decorator here allows Aqueduct to run this function as
# a part of an Aqueduct workflow. It tells Aqueduct that when
# we execute this function, we're defining a step in the workflow.
# While the results can be retrieved immediately, nothing is
# published until we call `publish_flow()` below.
@aq.op
def interpolate_missing_values(diabetes_df):
    """
    This function interpolates missing values for our diabetes dataset. For the glucose
    and diastolic_bp columns, this function assumes a normal distribution, calculates the
    mean of the non-zero values, and replaces all the 0 values with the mean.
    For the skin_thickness, bmi, and 2_hr_insulin columns, this function assumes a skewed
    distribution and instead replaces the 0 values with the median of the non-zero values.
    """
    result = diabetes_df.copy()

    # As per our Kaggle guide, the glucose and diastolic BP values are normally distributed,
    # so we plug in the mean of the non-zero values of those columns for the missing values.
    for column in ["glucose", "diastolic_bp"]:
        fill_value = int(diabetes_df.loc[diabetes_df[column] != 0, column].mean())
        # Assign the result back rather than calling replace(..., inplace=True) on
        # result[column]: that is chained assignment, which stops updating `result`
        # once pandas' Copy-on-Write mode is enabled.
        result[column] = result[column].replace(0, fill_value)

    # skin_thickness, 2_hr_insulin, and bmi have skewed distributions, so we take the
    # median of the non-zero values instead.
    for column in ["skin_thickness", "2_hr_insulin", "bmi"]:
        fill_value = int(diabetes_df.loc[diabetes_df[column] != 0, column].median())
        result[column] = result[column].replace(0, fill_value)

    return result
Now that we've defined our interpolation function, we can test it locally. When you call an @op-decorated function, Aqueduct assumes that it's part of a larger workflow. For testing purposes, you can instead call the function with .local(), which tells Aqueduct to run it once without building a larger workflow around it (yet).
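The difference between a normal call and `.local()` can be illustrated with a toy decorator. This is a hypothetical sketch of the idea only, not how Aqueduct implements `@aq.op`: a real Aqueduct call returns an artifact whose value you can fetch with `.get()`, whereas the toy below only shows the "record a step" versus "just run it" distinction. `ToyOp`, `toy_op`, and `workflow_steps` are invented names.

```python
# Hypothetical sketch: NOT Aqueduct's implementation, just the idea behind it.
workflow_steps = []  # the toy "workflow" that decorated calls are recorded into


class ToyOp:
    """Wraps a function so that calling it records a workflow step."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args):
        # A normal call captures the step (function name + inputs) as part of
        # the workflow definition.
        step = {"op": self.fn.__name__, "inputs": args}
        workflow_steps.append(step)
        return step

    def local(self, *args):
        # .local() runs the function right away and records nothing.
        return self.fn(*args)


def toy_op(fn):
    """Hypothetical decorator standing in for @aq.op."""
    return ToyOp(fn)


@toy_op
def double(values):
    return [2 * v for v in values]


print(double.local([1, 2, 3]))  # [2, 4, 6]; workflow_steps is still empty
step = double([1, 2, 3])        # recorded as a workflow step instead
print(step["op"], len(workflow_steps))  # double 1
```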
# Calling `.local()` on an @op-annotated function allows us to execute the
# function locally for testing purposes. When a function is called with
# `.local()`, Aqueduct does not capture the function execution as a part of
# the definition of a workflow.
interpolated = interpolate_missing_values.local(df)
interpolated
Output
| | pregnancies | glucose | diastolic_bp | skin_thickness | 2_hr_insulin | bmi | pedigree_fn | age | has_diabetes |
---|---|---|---|---|---|---|---|---|---|
0 | 6 | 148 | 72 | 35 | 125 | 33.6 | 0.627 | 50 | 1 |
1 | 1 | 85 | 66 | 29 | 125 | 26.6 | 0.351 | 31 | 0 |
2 | 8 | 183 | 64 | 29 | 125 | 23.3 | 0.672 | 32 | 1 |
3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 |
4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 | 1 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
763 | 10 | 101 | 76 | 48 | 180 | 32.9 | 0.171 | 63 | 0 |
764 | 2 | 122 | 70 | 27 | 125 | 36.8 | 0.340 | 27 | 0 |
765 | 5 | 121 | 72 | 23 | 112 | 26.2 | 0.245 | 30 | 0 |
766 | 1 | 126 | 60 | 29 | 125 | 30.1 | 0.349 | 47 | 1 |
767 | 1 | 93 | 70 | 31 | 125 | 30.4 | 0.315 | 23 | 0 |
768 rows × 9 columns
Now that we have clean data, the next natural step when building a KNN model is to rescale it. Because KNN is a distance-based algorithm, features with large numeric ranges (such as 2_hr_insulin, which runs into the hundreds) would otherwise dominate the distance calculation over features with small ranges (such as pedigree_fn, which is mostly below 1). Here, we'll use scikit-learn's StandardScaler, which shifts each feature to mean 0 and scales it to unit variance, to rescale our numerical features.
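To make the "larger ranges dominate the distance" point concrete, here is a dependency-free sketch with five made-up patients described by (2_hr_insulin, bmi). `standardize` mimics StandardScaler's z-scoring (population standard deviation, ddof=0) for a single column, and `nearest` finds the closest other patient by Euclidean distance; both are hypothetical helpers, not scikit-learn APIs.

```python
import math
import statistics


def standardize(column):
    """Z-score one feature using the population standard deviation (ddof=0),
    the same convention scikit-learn's StandardScaler uses."""
    mean = statistics.fmean(column)
    std = statistics.pstdev(column)
    return [(value - mean) / std for value in column]


def nearest(points, query_index):
    """Index of the point closest (Euclidean) to points[query_index], excluding itself."""
    query = points[query_index]
    candidates = [i for i in range(len(points)) if i != query_index]
    return min(candidates, key=lambda i: math.dist(points[i], query))


# Made-up (2_hr_insulin, bmi) pairs for five patients.
raw_points = [(100, 25.0), (110, 45.0), (160, 25.5), (300, 35.0), (50, 30.0)]

# Standardize each feature (column) independently, then zip back into rows.
scaled_columns = [standardize(list(column)) for column in zip(*raw_points)]
scaled_points = [tuple(row) for row in zip(*scaled_columns)]

print(nearest(raw_points, 0))     # 1: on raw values, a 60-point insulin gap outweighs a 20-point bmi gap
print(nearest(scaled_points, 0))  # 2: after scaling, the 20-point bmi gap counts for far more
```

Without scaling, patient 0's nearest neighbor is the one with a very different BMI, simply because insulin is measured in larger units.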
@aq.op
def rescale_data(interpolated_df):
    """
    This function takes in our diabetes dataset after 0-values have been
    filled in, and it uses sklearn's StandardScaler to rescale the data.
    It rescales all numerical columns except for the has_diabetes column,
    which is binary and is the output variable we are trying to predict.
    """
    from sklearn.preprocessing import StandardScaler

    scaler = StandardScaler()
    all_columns = list(interpolated_df.columns)
    all_columns.remove("has_diabetes")  # Remove the outcome column.
    scaled = scaler.fit_transform(interpolated_df[all_columns])

    # Convert the result of our StandardScaler back to a DataFrame, keeping the
    # original index so that the has_diabetes column lines up row for row.
    results = pd.DataFrame(scaled, columns=all_columns, index=interpolated_df.index)
    results["has_diabetes"] = interpolated_df["has_diabetes"]
    return results
# Again, calling `.local()` here allows us to execute rescale_data
# for testing purposes without defining a stage in our workflow.
scaled = rescale_data.local(interpolated)
scaled
Output
| | pregnancies | glucose | diastolic_bp | skin_thickness | 2_hr_insulin | bmi | pedigree_fn | age | has_diabetes |
---|---|---|---|---|---|---|---|---|---|
0 | 0.639947 | 0.865461 | -0.020645 | 0.831114 | -0.609776 | 0.167240 | 0.468492 | 1.425995 | 1 |
1 | -0.844885 | -1.205788 | -0.516132 | 0.180566 | -0.609776 | -0.851551 | -0.365061 | -0.190672 | 0 |
2 | 1.233880 | 2.016154 | -0.681294 | -0.469981 | -0.609776 | -1.331838 | 0.604397 | -0.105584 | 1 |
3 | -0.844885 | -1.074281 | -0.516132 | -0.469981 | -0.003871 | -0.633239 | -0.920763 | -1.041549 | 0 |
4 | -1.141852 | 0.503814 | -2.663240 | 0.831114 | 0.696707 | 1.549885 | 5.484909 | -0.020496 | 1 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
763 | 1.827813 | -0.679757 | 0.309679 | 2.240633 | 0.810314 | 0.065361 | -0.908682 | 2.532136 | 0 |
764 | -0.547919 | 0.010659 | -0.185807 | -0.036283 | -0.609776 | 0.632973 | -0.398282 | -0.531023 | 0 |
765 | 0.342981 | -0.022218 | -0.020645 | -0.469981 | 0.166540 | -0.909768 | -0.685193 | -0.275760 | 0 |
766 | -0.844885 | 0.142167 | -1.011618 | -0.469981 | -0.609776 | -0.342155 | -0.371101 | 1.170732 | 1 |
767 | -0.844885 | -0.942773 | -0.185807 | 0.397415 | -0.609776 | -0.298493 | -0.473785 | -0.871374 | 0 |
768 rows × 9 columns
Finally, we're ready to train our model. We set our feature columns to be everything in the dataset except the has_diabetes column and use sklearn.neighbors.KNeighborsClassifier to fit a new model with 11 neighbors.
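With its default settings (uniform weights and Minkowski distance with p=2, i.e. Euclidean distance), a KNeighborsClassifier predicts a row's class by a majority vote among the k closest training rows. The plain-Python sketch below re-implements that idea for illustration only; it is not scikit-learn's implementation, which uses tree-based or brute-force neighbor search and its own tie handling. `knn_predict` is a hypothetical helper and the tiny two-feature dataset is made up.

```python
import math
from collections import Counter


def knn_predict(train_X, train_y, query, k=11):
    """Majority vote among the k training rows closest (Euclidean) to query."""
    # Pair every training row's distance to the query with its label, closest first.
    by_distance = sorted(
        (math.dist(row, query), label) for row, label in zip(train_X, train_y)
    )
    nearest_labels = [label for _, label in by_distance[:k]]
    # most_common(1) returns [(label, count)] for the most frequent label.
    return Counter(nearest_labels).most_common(1)[0][0]


# Made-up, already-standardized two-feature training data.
train_X = [(-1.0, -1.0), (-0.8, -1.2), (-1.1, -0.7), (1.0, 1.0), (0.9, 1.3), (1.2, 0.8)]
train_y = [0, 0, 0, 1, 1, 1]

print(knn_predict(train_X, train_y, (0.8, 0.9), k=3))    # 1
print(knn_predict(train_X, train_y, (-0.9, -1.0), k=3))  # 0
```

Using an odd k such as 11 means that, with only two classes, the majority vote can never end in a tie.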
from sklearn.neighbors import KNeighborsClassifier
feature_columns = list(scaled.columns)
feature_columns.remove("has_diabetes")
knn = KNeighborsClassifier(11)
knn.fit(scaled[feature_columns], scaled["has_diabetes"])
Output
KNeighborsClassifier(n_neighbors=11)
Next, we'll use our new KNN model to define a predict function, which selects all of our feature columns and returns a new DataFrame with a column called pred_has_diabetes:
@aq.op
def predict_diabetes(scaled):
    """
    This function accepts a rescaled diabetes dataset and uses our
    KNN model for diabetes prediction to predict whether each patient
    does or does not have diabetes.

    This function assumes that the feature columns in the scaled dataset are
    pregnancies, glucose, diastolic_bp, skin_thickness, 2_hr_insulin, bmi,
    pedigree_fn, and age (alongside has_diabetes, which is not used as a
    feature). The model will not work correctly otherwise.

    The results are returned in a new column called pred_has_diabetes.
    """
    # Work on a copy so the input DataFrame isn't modified, and so that
    # pred_has_diabetes can never be mistaken for a feature on a later call.
    result = scaled.copy()
    feature_columns = list(result.columns)
    feature_columns.remove("has_diabetes")
    result["pred_has_diabetes"] = knn.predict(result[feature_columns])
    return result
Now that we've defined all the operators in our workflow, we can go ahead and define our workflow itself. The Aqueduct demo DB comes with the diabetes dataset prepackaged, so we can retrieve it directly and construct our workflow on top of it.
demodb = client.resource("Demo")
# diabetes_data is an Aqueduct TableArtifact, which is a wrapper around
# a Pandas DataFrame. A TableArtifact can be used as an argument to any operator
# in a workflow; you can also call .get() on a TableArtifact to retrieve
# the underlying DataFrame and interact with it directly.
diabetes_data = demodb.sql("SELECT * FROM diabetes;")
diabetes_data.get()
Output
| | pregnancies | glucose | diastolic_bp | skin_thickness | 2_hr_insulin | bmi | pedigree_fn | age | has_diabetes |
---|---|---|---|---|---|---|---|---|---|
0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |
2 | 8 | 183 | 64 | 0 | 0 | 23.3 | 0.672 | 32 | 1 |
3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 |
4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 | 1 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
763 | 10 | 101 | 76 | 48 | 180 | 32.9 | 0.171 | 63 | 0 |
764 | 2 | 122 | 70 | 27 | 0 | 36.8 | 0.340 | 27 | 0 |
765 | 5 | 121 | 72 | 23 | 112 | 26.2 | 0.245 | 30 | 0 |
766 | 1 | 126 | 60 | 0 | 0 | 30.1 | 0.349 | 47 | 1 |
767 | 1 | 93 | 70 | 31 | 0 | 30.4 | 0.315 | 23 | 0 |
768 rows × 9 columns
# We now use all of our @op-annotated functions to define our
# workflow in a few lines of Python.
interpolated = interpolate_missing_values(diabetes_data)
scaled = rescale_data(interpolated)
diabetes_preds = predict_diabetes(scaled)
# This tells Aqueduct to save the results in diabetes_preds
# back to the demo DB we configured earlier.
# NOTE: At this point, no data is actually saved! This is just
# part of a workflow spec that will be executed once the workflow
# is published below.
demodb.save(diabetes_preds, table_name="predicted_diabetes", update_mode="replace")
Finally, before publishing our workflow, we're going to add a check to make sure our predictor works as expected. Here, we'll write a simple check that ensures the predictions contain exactly two classes: 0 for no diabetes and 1 for diabetes.
# The @check decorator is similar to the @op decorator from
# above. The only difference is that a check returns a boolean
# value that is used to ensure the correctness of the workflow.
@aq.check
def ensure_correct_classes(diabetes_preds):
    """
    This function ensures that the diabetes_preds DataFrame has the correct
    number and names of classes in the DataFrame. It ensures that there are
    two classes named 0 and 1, and it will fail otherwise.
    """
    classes = diabetes_preds["pred_has_diabetes"].value_counts()
    if len(classes) > 2:
        return False

    class_names = list(classes.keys())
    if 0 not in class_names or 1 not in class_names:
        return False

    return True
# Similar to a regular operator, a check can be called directly on
# an Aqueduct TableArtifact, and calling `.get()` on the result will
# give you the result of the check.
has_correct_classes = ensure_correct_classes(diabetes_preds)
has_correct_classes.get()
Output:
True
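Ignoring how pandas' `value_counts()` handles NaN, the rule in ensure_correct_classes amounts to asking whether the set of predicted values is exactly {0, 1}. The plain-Python sketch below (the helper name is hypothetical) makes the edge cases explicit: besides rejecting unexpected labels, the check also fails for a degenerate model that predicts only one class.

```python
def has_exactly_binary_classes(predictions):
    """Pass only if the predictions contain exactly the classes 0 and 1."""
    return set(predictions) == {0, 1}


print(has_exactly_binary_classes([0, 1, 1, 0]))  # True
print(has_exactly_binary_classes([0, 0, 0]))     # False: only one class predicted
print(has_exactly_binary_classes([0, 1, 2]))     # False: unexpected class
```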
We're ready to publish our predictions! We can use the publish_flow API call to create a new workflow called "Diabetes Classifier" that encapsulates all of the code we've defined in this notebook, packages it up as a rerunnable workflow, and deploys it onto the Aqueduct server. We don't do this below, but we could optionally set this workflow to run on a schedule (hourly, daily, weekly, etc.).
# This publishes all of the logic needed to create diabetes_preds
# to Aqueduct. The Aqueduct UI will show you the status of your
# workflow runs and allow you to inspect them.
client.publish_flow(name="Diabetes Classifier", artifacts=[diabetes_preds])
Output:
<aqueduct.flow.Flow at 0x165f82040>