Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DuckDB plugin #1419

Merged
merged 15 commits into from
Feb 27, 2023
Merged

DuckDB plugin #1419

merged 15 commits into from
Feb 27, 2023

Conversation

samhita-alla
Copy link
Contributor

@samhita-alla samhita-alla commented Jan 23, 2023

Signed-off-by: Samhita Alla [email protected]

TL;DR

This PR adds a DuckDBQuery task plugin that runs queries using DuckDB as the DBMS.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Capturing the crucial assumptions I made while building the task plugin:

  • The DuckDBQuery task parameter that a user needs to send argument to includes query and can contemplate adding includes inputs.
  • query can include a set of queries that'll be run sequentially. The last query needs to be a SELECT query.
  • inputs can include structured dataset or a list of parameters to be sent to the queries.
  • The output is a pyarrow table. Can be converted to any structured dataset compatible type.
  • The connection mode is set to :memory, i.e., the data is always stored in an in-memory, non-persistent database. It can be set to a file, but it's difficult to make the file accessible to different DuckDBQuery pods, which otherwise wouldn't make sense because file is persistent, and it needs to be leveraged.

Example:

duckdb_query = DuckDBQuery(
    name="read_parquet",
    query=[
        "INSTALL httpfs",
        "LOAD httpfs",
        """SELECT hour(lpep_pickup_datetime) AS hour, count(*) AS count FROM READ_PARQUET(?) GROUP BY hour""",
    ],
    inputs=kwtypes(params=list[str]),
)

@workflow
def parquet_wf(parquet_file: str) -> pd.DataFrame:
    return duckdb_query(params=[parquet_file])

assert isinstance(
    parquet_wf(parquet_file="https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-02.parquet"),
    pd.DataFrame,
)

Tracking Issue

Fixes flyteorg/flyte#3246, flyteorg/flyte#2865

Follow-up issue

NA
OR
https://github.com/flyteorg/flyte/issues/

@codecov
Copy link

codecov bot commented Jan 23, 2023

Codecov Report

Merging #1419 (9a0e42b) into master (b3ad158) will not change coverage.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master    #1419   +/-   ##
=======================================
  Coverage   69.32%   69.32%           
=======================================
  Files         305      305           
  Lines       28671    28671           
  Branches     2718     2718           
=======================================
  Hits        19877    19877           
  Misses       8276     8276           
  Partials      518      518           
Impacted Files Coverage Δ
plugins/setup.py 0.00% <ø> (ø)

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@kumare3
Copy link
Contributor

kumare3 commented Jan 24, 2023

@samhita-alla can we support accepting StructuredDataset as an input?

@samhita-alla
Copy link
Contributor Author

@kumare3, DuckDBQuery task accepts structured dataset already. Here's an example:

duckdb_task = DuckDBQuery(
    name="duckdb_sd_df",
    query="SELECT * FROM pandas_df WHERE i = 2",
    inputs=kwtypes(pandas_df=StructuredDataset),
)

@task
def get_pandas_df() -> StructuredDataset:
    return StructuredDataset(
        dataframe=pd.DataFrame.from_dict({"i": [1, 2, 3, 4], "j": ["one", "two", "three", "four"]})
    )

@workflow
def pandas_wf(pandas_df: StructuredDataset) -> pd.DataFrame:
    return duckdb_task(pandas_df=pandas_df)

assert isinstance(pandas_wf(pandas_df=get_pandas_df()), pd.DataFrame)

Let me know if you're looking for something different.

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
@samhita-alla samhita-alla changed the base branch from master to 0.3.0b1 February 2, 2023 12:11
@samhita-alla samhita-alla changed the base branch from 0.3.0b1 to master February 2, 2023 12:11
Copy link
Contributor

@cosmicBboy cosmicBboy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking great! added some comments for docstrings

plugins/flytekit-duckdb/flytekitplugins/duckdb/task.py Outdated Show resolved Hide resolved
plugins/flytekit-duckdb/setup.py Outdated Show resolved Hide resolved
@samhita-alla
Copy link
Contributor Author

@cosmicBboy, thanks for reviewing the PR! Can you look through it again?

@cosmicBboy
Copy link
Contributor

cosmicBboy commented Feb 15, 2023

image

DuckDB api reference is blank, I think we need to update the https://github.com/flyteorg/flytekit/blob/master/doc-requirements.in file with duckdb as a dependency, or else it won't be rendered.

@cosmicBboy
Copy link
Contributor

@samhita-alla we'll also need to invest a bit in enable warnings as errors in the sphinx build process -W like we did with flytesnacks. It'll let us catch these kinds of docs build errors

@samhita-alla
Copy link
Contributor Author

@cosmicBboy, fixed the docs and added a GitHub action to show warnings as errors.

name: str,
query: Union[str, List[str]],
inputs: Optional[Dict[str, Union[StructuredDataset, list]]] = None,
**kwargs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add an Output Schema type here like snowflake? if output_schema_type is none, we won't generate output dataset.
And the type should change to StructuredDataset, because we already deprecated FlyteSchema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pingsutw, this task isn't helpful if there's no output dataset. The DuckDBQuery task runs some queries and returns the output of a SELECT statement. Hence, it must return the query output, and in this case, it's StructuredDataset. Also, I can definitely add output_schema_type. But the output has to always be a StructuredDataset. So is it necessary? I'm already hard-coding the output type in the initialization. Let me know what you think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. do we support insert or some other operations? If not, I think we don't need schema type for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we do. So you can give a bunch of queries to a single DuckDBQuery task. But the last one needs to be a SELECT query because after say, you insert the data, you need to retrieve the data, right? Else, it's of no use. I'm using the non-persistent offering by DuckDB. So all the data will be available only within the query. Does that make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense. Thanks for the explanation.

@samhita-alla
Copy link
Contributor Author

Thanks, Kevin! Will merge this PR after @cosmicBboy approves as well.

@samhita-alla
Copy link
Contributor Author

@kumare3, let me know if this PR looks good to you.

@pingsutw pingsutw merged commit d0b72a8 into master Feb 27, 2023
wild-endeavor pushed a commit that referenced this pull request Mar 7, 2023
* DuckDB integration

Signed-off-by: Samhita Alla <[email protected]>

* add sd test and fix import

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>

* fix lint error

Signed-off-by: Samhita Alla <[email protected]>

* fix lint error

Signed-off-by: Samhita Alla <[email protected]>

* list to List

Signed-off-by: Samhita Alla <[email protected]>

* lint

Signed-off-by: Samhita Alla <[email protected]>

* incorporated suggestions

Signed-off-by: Samhita Alla <[email protected]>

* add duckdb to requirements and add gh action to detect doc warnings and errors

Signed-off-by: Samhita Alla <[email protected]>

* gh action: python 3.9

Signed-off-by: Samhita Alla <[email protected]>

* docs python 3.8 to 3.9

Signed-off-by: Samhita Alla <[email protected]>

---------

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
eapolinario added a commit that referenced this pull request Mar 8, 2023
* Create non-root user after apt-get (#1519)

* Create non-root user after apt-get

Signed-off-by: Eduardo Apolinario <[email protected]>

* Create user after pip install

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Kevin Su <[email protected]>

* Add root pyflyte reference to docs (#1520)

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>

* DuckDB plugin (#1419)

* DuckDB integration

Signed-off-by: Samhita Alla <[email protected]>

* add sd test and fix import

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>

* fix lint error

Signed-off-by: Samhita Alla <[email protected]>

* fix lint error

Signed-off-by: Samhita Alla <[email protected]>

* list to List

Signed-off-by: Samhita Alla <[email protected]>

* lint

Signed-off-by: Samhita Alla <[email protected]>

* incorporated suggestions

Signed-off-by: Samhita Alla <[email protected]>

* add duckdb to requirements and add gh action to detect doc warnings and errors

Signed-off-by: Samhita Alla <[email protected]>

* gh action: python 3.9

Signed-off-by: Samhita Alla <[email protected]>

* docs python 3.8 to 3.9

Signed-off-by: Samhita Alla <[email protected]>

---------

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Co-authored-by: Kevin Su <[email protected]>

* add string as a valid input (#1527)

* add string as a valid input

Signed-off-by: Samhita Alla <[email protected]>

* isort

Signed-off-by: Samhita Alla <[email protected]>

* tests

Signed-off-by: Samhita Alla <[email protected]>

* Lint

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>

* Add back attempt to use existing serialization settings when running (#1529)

Signed-off-by: Yee Hing Tong <[email protected]>

* update configuration docs, fix some docstrings (#1530)

* update configuration docs, fix some docstrings

Signed-off-by: Niels Bantilan <[email protected]>

* update copy

Signed-off-by: Niels Bantilan <[email protected]>

* add config init command

Signed-off-by: Niels Bantilan <[email protected]>

---------

Signed-off-by: Niels Bantilan <[email protected]>

* Revert "Make flytekit comply with PEP-561 (#1516)" (#1532)

This reverts commit b3ad158.

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>

* Failed to initialize FlyteInvalidInputException (#1534)

Signed-off-by: Kevin Su <[email protected]>

* cherry pick pin fsspec commit

Signed-off-by: Yee Hing Tong <[email protected]>

* Set flytekit<1.3.0 in duckdb tests

Signed-off-by: eduardo apolinario <[email protected]>

* Fix flyteidl==1.2.9 in doc-requirements.txt

Signed-off-by: eduardo apolinario <[email protected]>

* No duckdb documentation

Signed-off-by: eduardo apolinario <[email protected]>

* Linting

Signed-off-by: eduardo apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Niels Bantilan <[email protected]>
Signed-off-by: eduardo apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
Co-authored-by: Samhita Alla <[email protected]>
Co-authored-by: Niels Bantilan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Core feature] DuckDB Integration
4 participants