From 2f98c2c279e41d53197dedc4812aeb16c056de4f Mon Sep 17 00:00:00 2001 From: Daniel Sola Date: Mon, 9 Sep 2024 17:45:44 -0700 Subject: [PATCH 1/3] add motherduck examples for duckdb plugin --- examples/duckdb_plugin/README.md | 7 +- .../duckdb_plugin/duckdb_example.py | 98 ++++++++++++++++++- 2 files changed, 99 insertions(+), 6 deletions(-) diff --git a/examples/duckdb_plugin/README.md b/examples/duckdb_plugin/README.md index fcf209b51..769269527 100644 --- a/examples/duckdb_plugin/README.md +++ b/examples/duckdb_plugin/README.md @@ -6,9 +6,9 @@ .. tags:: Integration, Data, Analytics, Beginner ``` -[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system that is explicitly designed to achieve high performance in analytics. +[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system that is explicitly designed to achieve high performance in analytics. MotherDuck is a collaborative data warehouse that extends the power of DuckDB to the cloud. -The Flytekit DuckDB plugin facilitates the efficient execution of intricate analytical queries within your workflow. +The Flytekit DuckDB plugin facilitates the efficient execution of intricate analytical queries within your workflow either in-process with DuckDB, on the cloud with MotherDuck, or a hybrid of the two. To install the Flytekit DuckDB plugin, run the following command: @@ -18,8 +18,9 @@ pip install flytekitplugins-duckdb The Flytekit DuckDB plugin includes the {py:class}`~flytekitplugins:flytekitplugins.duckdb.DuckDBQuery` task, which allows you to specify the following parameters: -- `query`: The DuckDB query to execute. +- `query`: The DuckDB query to execute. This is optional as it can be passed at initialization or run time. - `inputs`: The query parameters to be used during query execution. This can be a StructuredDataset, a string or a list. +- `provider`: This is a {py:class}`~flytekitplugins:flytekitplugins.duckdb.DuckDBProvider` or a callable that facilitates the connection to a remote database if desired. ```{auto-examples-toc} duckdb_example diff --git a/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py b/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py index 80f6b3993..a54350832 100644 --- a/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py +++ b/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py @@ -7,13 +7,13 @@ # %% import json -from typing import List +from typing import List, Tuple import pandas as pd import pyarrow as pa -from flytekit import kwtypes, task, workflow +from flytekit import kwtypes, task, workflow, Secret, dynamic from flytekit.types.structured.structured_dataset import StructuredDataset -from flytekitplugins.duckdb import DuckDBQuery +from flytekitplugins.duckdb import DuckDBQuery, DuckDBProvider from typing_extensions import Annotated # %% [markdown] @@ -160,3 +160,95 @@ def params_wf( if __name__ == "__main__": print(f"Running params_wf()... {params_wf()}") + +# %% [markdown] +# ## Queries to MotherDuck +# +# The DuckDB plugin can be used to make DuckDB queries to a remote MotherDuck data warehouse by specifying the +# MotherDuck `DuckDBProvider` and passing a secret called `motherduck_token`. Hybrid queries can target remote +# data in MotherDuck and data local to a Flyte task at the same time. +# +# %% +# This query targets a sample_data.nyc.rideshare table in MotherDuck +motherduck_query = DuckDBQuery( + name="motherduck_query", + query="SELECT MEAN(trip_time) FROM sample_data.nyc.rideshare", + provider=DuckDBProvider.MOTHERDUCK, + secret_requests=[Secret(key="motherduck_token")], +) + +# This query targets a e_commerce.year_09_10 table in MotherDuck +hybrid_motherduck_query = DuckDBQuery( + name="my_query", + # query="SELECT MEAN(trip_time) FROM sample_data.nyc.rideshare", + query=""" + WITH HistoricalData AS + (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Historical FROM e_commerce.year_09_10), + RecentData AS + (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Recent FROM mydf) + + SELECT HistoricalData.CustomerCount_Historical, RecentData.CustomerCount_Recent FROM HistoricalData, RecentData + """, + inputs=kwtypes(mydf=pd.DataFrame), + provider=DuckDBProvider.MOTHERDUCK, + secret_requests=[Secret(key="motherduck_token")], +) + + +@workflow +def motherduck_wf(mydf: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: + motherduck_response = motherduck_query() + hybrid_response = hybrid_motherduck_query(mydf=mydf) + return motherduck_response, hybrid_response + + +if __name__ == "__main__": + print(f"Running motherduck_wf()... {motherduck_wf()}") + +# %% [markdown] +# ## Runtime Queries +# +# If task logic is necessary to craft a query, the query can be passed at runtime rather than when the `DuckDBQuery` is +# initialized. +# +# %% +runtime_query_task = DuckDBQuery( + name="runtime_query_task", + inputs=kwtypes(query=str, mydf=pd.DataFrame), +) + +@dynamic +def check_dataframe(mydf: pd) -> pd.DataFrame: + col_a_present = 'column_a' in mydf.columns + col_b_present = 'column_b' in mydf.columns + + if col_a_present and col_b_present: + query = f""" + SELECT column_a, column_b + FROM mydf + WHERE column_a > 10 AND column_b < 100; + """ + elif col_a_present: + query = f""" + SELECT column_a + FROM mydf + WHERE column_a > 10; + """ + elif col_b_present: + query = f""" + SELECT column_b + FROM mydf + WHERE column_b < 100; + """ + else: + raise ValueError("Neither 'column_a' nor 'column_b' is present in the DataFrame.") + + return simple_duckdb_query(query=query, mydf=mydf) + +@workflow +def runtime_query_wf(mydf: pd.DataFrame) -> pd.DataFrame: + return check_dataframe(mydf=mydf) + + +if __name__ == "__main__": + print(f"Running runtime_query_wf()... {runtime_query_wf()}") \ No newline at end of file From 04f07d4088a3dc7a6038e3aed558e502ed771dde Mon Sep 17 00:00:00 2001 From: Daniel Sola Date: Wed, 11 Sep 2024 11:28:19 -0700 Subject: [PATCH 2/3] fix comments and formatting Signed-off-by: Daniel Sola --- .../duckdb_plugin/duckdb_example.py | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py b/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py index a54350832..c3ee5b484 100644 --- a/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py +++ b/examples/duckdb_plugin/duckdb_plugin/duckdb_example.py @@ -11,9 +11,9 @@ import pandas as pd import pyarrow as pa -from flytekit import kwtypes, task, workflow, Secret, dynamic +from flytekit import Secret, dynamic, kwtypes, task, workflow from flytekit.types.structured.structured_dataset import StructuredDataset -from flytekitplugins.duckdb import DuckDBQuery, DuckDBProvider +from flytekitplugins.duckdb import DuckDBProvider, DuckDBQuery from typing_extensions import Annotated # %% [markdown] @@ -177,16 +177,15 @@ def params_wf( secret_requests=[Secret(key="motherduck_token")], ) -# This query targets a e_commerce.year_09_10 table in MotherDuck +# This query targets a e_commerce.year_09_10 table in MotherDuck and a DataFrame mydf local to the task hybrid_motherduck_query = DuckDBQuery( name="my_query", - # query="SELECT MEAN(trip_time) FROM sample_data.nyc.rideshare", query=""" - WITH HistoricalData AS - (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Historical FROM e_commerce.year_09_10), - RecentData AS - (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Recent FROM mydf) - + WITH HistoricalData AS + (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Historical FROM e_commerce.year_09_10), + RecentData AS + (SELECT COUNT(DISTINCT "Customer ID") AS CustomerCount_Recent FROM mydf) + SELECT HistoricalData.CustomerCount_Historical, RecentData.CustomerCount_Recent FROM HistoricalData, RecentData """, inputs=kwtypes(mydf=pd.DataFrame), @@ -217,26 +216,27 @@ def motherduck_wf(mydf: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: inputs=kwtypes(query=str, mydf=pd.DataFrame), ) + @dynamic def check_dataframe(mydf: pd) -> pd.DataFrame: - col_a_present = 'column_a' in mydf.columns - col_b_present = 'column_b' in mydf.columns + col_a_present = "column_a" in mydf.columns + col_b_present = "column_b" in mydf.columns if col_a_present and col_b_present: - query = f""" - SELECT column_a, column_b + query = """ + SELECT column_a, column_b FROM mydf WHERE column_a > 10 AND column_b < 100; """ elif col_a_present: - query = f""" - SELECT column_a + query = """ + SELECT column_a FROM mydf WHERE column_a > 10; """ elif col_b_present: - query = f""" - SELECT column_b + query = """ + SELECT column_b FROM mydf WHERE column_b < 100; """ @@ -245,10 +245,11 @@ def check_dataframe(mydf: pd) -> pd.DataFrame: return simple_duckdb_query(query=query, mydf=mydf) + @workflow def runtime_query_wf(mydf: pd.DataFrame) -> pd.DataFrame: return check_dataframe(mydf=mydf) if __name__ == "__main__": - print(f"Running runtime_query_wf()... {runtime_query_wf()}") \ No newline at end of file + print(f"Running runtime_query_wf()... {runtime_query_wf()}") From 94264b83ed7cbdb9365a1bb6ac4574c8f39395e8 Mon Sep 17 00:00:00 2001 From: Daniel Sola Date: Wed, 11 Sep 2024 11:55:44 -0700 Subject: [PATCH 3/3] bump upload-artifacts to v3 Signed-off-by: Daniel Sola --- .github/workflows/checks.yml | 2 +- .github/workflows/serialize_example.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 9c7b9fb10..2280b987c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -136,7 +136,7 @@ jobs: --force tar -xvf flyte-package.tgz - name: Upload artifacts - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: snacks-examples-${{ matrix.example }} path: examples/${{ matrix.example }}/**/*.pb diff --git a/.github/workflows/serialize_example.yml b/.github/workflows/serialize_example.yml index 4d1f38c78..d96f4e927 100644 --- a/.github/workflows/serialize_example.yml +++ b/.github/workflows/serialize_example.yml @@ -38,7 +38,7 @@ jobs: ./scripts/serialize-example.sh ${{ matrix.directory }} ${{ github.sha }} tar -xvf ${{ matrix.directory }}/flyte-package.tgz -C ${{ matrix.directory }} - name: Upload artifacts - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: snacks-${{ steps.example_id.outputs.EXAMPLE_ID }} path: ${{ matrix.directory }}/**/*.pb