diff --git a/examples/duckdb_plugin/README.md b/examples/duckdb_plugin/README.md index fcf209b51..769269527 100644 --- a/examples/duckdb_plugin/README.md +++ b/examples/duckdb_plugin/README.md @@ -6,9 +6,9 @@ .. tags:: Integration, Data, Analytics, Beginner ``` -[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system that is explicitly designed to achieve high performance in analytics. +[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system that is explicitly designed to achieve high performance in analytics. MotherDuck is a collaborative data warehouse that extends the power of DuckDB to the cloud. -The Flytekit DuckDB plugin facilitates the efficient execution of intricate analytical queries within your workflow. +The Flytekit DuckDB plugin facilitates the efficient execution of intricate analytical queries within your workflow either in-process with DuckDB, on the cloud with MotherDuck, or a hybrid of the two. To install the Flytekit DuckDB plugin, run the following command: @@ -18,8 +18,9 @@ pip install flytekitplugins-duckdb The Flytekit DuckDB plugin includes the {py:class}`~flytekitplugins:flytekitplugins.duckdb.DuckDBQuery` task, which allows you to specify the following parameters: -- `query`: The DuckDB query to execute. +- `query`: The DuckDB query to execute. This is optional as it can be passed at initialization or run time. - `inputs`: The query parameters to be used during query execution. This can be a StructuredDataset, a string or a list. +- `provider`: This is a {py:class}`~flytekitplugins:flytekitplugins.duckdb.DuckDBProvider` or a callable that facilitates the connection to a remote database if desired. ```{auto-examples-toc} duckdb_example diff --git a/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py b/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py index 80f6b3993..c3ee5b484 100644 --- a/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py +++ b/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py @@ -7,13 +7,13 @@ # %% import json -from typing import List +from typing import List, Tuple import pandas as pd import pyarrow as pa -from flytekit import kwtypes, task, workflow +from flytekit import Secret, dynamic, kwtypes, task, workflow from flytekit.types.structured.structured_dataset import StructuredDataset -from flytekitplugins.duckdb import DuckDBQuery +from flytekitplugins.duckdb import DuckDBProvider, DuckDBQuery from typing_extensions import Annotated # %% [markdown] @@ -160,3 +160,96 @@ def params_wf( if __name__ == "__main__": print(f"Running params_wf()... {params_wf()}") + +# %% [markdown] +# ## Queries to MotherDuck +# +# The DuckDB plugin can be used to make DuckDB queries to a remote MotherDuck data warehouse by specifying the +# MotherDuck `DuckDBProvider` and passing a secret called `motherduck_token`. Hybrid queries can target remote +# data in MotherDuck and data local to a Flyte task at the same time. +# +# %% +# This query targets a sample_data.nyc.rideshare table in MotherDuck +motherduck_query = DuckDBQuery( + name="motherduck_query", + query="SELECT MEAN(trip_time) FROM sample_data.nyc.rideshare", + provider=DuckDBProvider.MOTHERDUCK, + secret_requests=[Secret(key="motherduck_token")], +) + +# This query targets a e_commerce.year_09_10 table in MotherDuck and a DataFrame mydf local to the task +hybrid_motherduck_query = DuckDBQuery( + name="my_query", + query=""" + WITH HistoricalData AS + (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Historical FROM e_commerce.year_09_10), + RecentData AS + (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Recent FROM mydf) + + SELECT HistoricalData.CustomerCount_Historical, RecentData.CustomerCount_Recent FROM HistoricalData, RecentData + """, + inputs=kwtypes(mydf=pd.DataFrame), + provider=DuckDBProvider.MOTHERDUCK, + secret_requests=[Secret(key="motherduck_token")], +) + + +@workflow +def motherduck_wf(mydf: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: + motherduck_response = motherduck_query() + hybrid_response = hybrid_motherduck_query(mydf=mydf) + return motherduck_response, hybrid_response + + +if __name__ == "__main__": + print(f"Running motherduck_wf()... {motherduck_wf()}") + +# %% [markdown] +# ## Runtime Queries +# +# If task logic is necessary to craft a query, the query can be passed at runtime rather than when the `DuckDBQuery` is +# initialized. +# +# %% +runtime_query_task = DuckDBQuery( + name="runtime_query_task", + inputs=kwtypes(query=str, mydf=pd.DataFrame), +) + + +@dynamic +def check_dataframe(mydf: pd) -> pd.DataFrame: + col_a_present = "column_a" in mydf.columns + col_b_present = "column_b" in mydf.columns + + if col_a_present and col_b_present: + query = """ + SELECT column_a, column_b + FROM mydf + WHERE column_a > 10 AND column_b < 100; + """ + elif col_a_present: + query = """ + SELECT column_a + FROM mydf + WHERE column_a > 10; + """ + elif col_b_present: + query = """ + SELECT column_b + FROM mydf + WHERE column_b < 100; + """ + else: + raise ValueError("Neither 'column_a' nor 'column_b' is present in the DataFrame.") + + return simple_duckdb_query(query=query, mydf=mydf) + + +@workflow +def runtime_query_wf(mydf: pd.DataFrame) -> pd.DataFrame: + return check_dataframe(mydf=mydf) + + +if __name__ == "__main__": + print(f"Running runtime_query_wf()... {runtime_query_wf()}")