
errors when using pyarrow dataset as a source #1779

Closed
djouallah opened this issue Oct 27, 2023 · 16 comments · Fixed by #1780
Labels
bug Something isn't working

Comments

@djouallah

A pyarrow dataset is not a supported source for delta-rs, but I noticed it works fine and it does generate the files; however, at the end I get an error that the pyarrow dataset is not iterable.

The reason I am using a dataset and not a table is to avoid OOM.

@djouallah djouallah added the bug Something isn't working label Oct 27, 2023
@stefnba

stefnba commented Oct 27, 2023

Same for me.

It works fine the first time but as soon as the table exists the error you mentioned is thrown.

@ion-elgreco
Collaborator

Can you show how you were passing a pyarrow dataset as a source?

@djouallah
Author

Here is the code:

import pyarrow.dataset as ds
from deltalake.writer import write_deltalake

from trident_token_library_wrapper import PyTridentTokenLibrary

aadToken = PyTridentTokenLibrary.get_access_token("storage")

for tbl in ['nation', 'region', 'customer', 'supplier', 'orders', 'part', 'lineitem', 'partsupp']:
    print(tbl)
    write_deltalake(
        f"abfss://[email protected]/TPCH_DuckDB100.Lakehouse/Tables/{tbl}",
        ds.dataset(f'/lakehouse/default/Files/{sf}/{tbl}', format="parquet"),
        mode='overwrite',
        overwrite_schema=True,
        max_rows_per_file=250000000,
        min_rows_per_group=8000000,
        max_rows_per_group=16000000,
        storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"},
    )

@ion-elgreco
Collaborator

ion-elgreco commented Oct 27, 2023

The only inputs that are accepted are these:

data (Union[pandas.core.frame.DataFrame, pyarrow.lib.Table, pyarrow.lib.RecordBatch, Iterable[pyarrow.lib.RecordBatch], pyarrow.lib.RecordBatchReader]) – Data to write. If passing iterable, the schema must also be given.

So you will need to add .to_table() after the ds.dataset(...) call.

But essentially you are just cloning/rewriting the table to another location, right?
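
For illustration, a minimal sketch of the .to_table() route (the paths below are placeholders, and note that .to_table() materializes the whole dataset in memory):

import pyarrow.dataset as ds
from deltalake.writer import write_deltalake

# .to_table() collects every fragment of the dataset into a single in-memory Table
table = ds.dataset("/path/to/parquet_dir", format="parquet").to_table()
write_deltalake("/path/to/delta_table", table, mode="overwrite")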

@djouallah
Author

Yes, I am doing .to_table(), but what I noticed is that it does work and writes the files even without it. I would much prefer not to use to_table() as it consumes a lot of memory.

@stefnba

stefnba commented Oct 27, 2023

I recommend using .to_batches() which returns an iterator of RecordBatch.

.to_table() reads all the selected data from the dataset into memory and by doing so defeats the purpose of using a pyarrow dataset in the first place.

@djouallah
Author

@stefnba do you please have a full example using to_batches()? I could not make it work.

@ion-elgreco
Collaborator

ion-elgreco commented Oct 27, 2023

Hmm, I think I get it now. Looking at the code in our Python writer, writing from a pyarrow.dataset.Dataset is supported.

@stefnba

stefnba commented Oct 27, 2023

In your code sample, just add .to_batches(); then it should work.

More info, see here

ds.dataset(f'/lakehouse/default/Files/{sf}/{tbl}',format="parquet").to_batches()

@djouallah
Author

Thanks @stefnba, now it asks about the schema :)

@ion-elgreco
Collaborator

I am working on a quick fix; it's actually a really small fix.

@stefnba

stefnba commented Oct 27, 2023

Hmm, I think I get it now. Looking at the code in our Python writer, writing from a pyarrow.dataset.Dataset is supported.

Yes, it does. That's why it also works when no table exists yet: in write_deltalake(), the if table is not None: block is skipped and the data is written directly with ds.write_dataset().

if table is not None:
    # We don't currently provide a way to set invariants
    # (and maybe never will), so only enforce if already exist.
    invariants = table.schema().invariants
    checker = _DeltaDataChecker(invariants)

    def check_data_is_aligned_with_partition_filtering(
        batch: pa.RecordBatch,
    ) -> None:
        if table is None:
            return
        existed_partitions: FrozenSet[
            FrozenSet[Tuple[str, Optional[str]]]
        ] = table._table.get_active_partitions()
        allowed_partitions: FrozenSet[
            FrozenSet[Tuple[str, Optional[str]]]
        ] = table._table.get_active_partitions(partition_filters)
        partition_values = pa.RecordBatch.from_arrays(
            [
                batch.column(column_name)
                for column_name in table.metadata().partition_columns
            ],
            table.metadata().partition_columns,
        )
        partition_values = batch_distinct(partition_values)
        for i in range(partition_values.num_rows):
            # Map will maintain order of partition_columns
            partition_map = {
                column_name: encode_partition_value(
                    batch.column(column_name)[i].as_py()
                )
                for column_name in table.metadata().partition_columns
            }
            partition = frozenset(partition_map.items())
            if (
                partition not in allowed_partitions
                and partition in existed_partitions
            ):
                partition_repr = " ".join(
                    f"{key}={value}" for key, value in partition_map.items()
                )
                raise ValueError(
                    f"Data should be aligned with partitioning. "
                    f"Data contained values for partition {partition_repr}"
                )

    def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
        checker.check_batch(batch)

        if mode == "overwrite" and partition_filters:
            check_data_is_aligned_with_partition_filtering(batch)

        return batch

    if isinstance(data, RecordBatchReader):
        batch_iter = data
    elif isinstance(data, pa.RecordBatch):
        batch_iter = [data]
    elif isinstance(data, pa.Table):
        batch_iter = data.to_batches()
    else:
        batch_iter = data

    data = RecordBatchReader.from_batches(
        schema, (validate_batch(batch) for batch in batch_iter)
    )

ds.write_dataset(
    data,
    base_dir="/",
    basename_template=f"{current_version + 1}-{uuid.uuid4()}-{{i}}.parquet",
    format="parquet",
    partitioning=partitioning,
    # It will not accept a schema if using a RBR
    schema=schema if not isinstance(data, RecordBatchReader) else None,
    file_visitor=visitor,
    existing_data_behavior="overwrite_or_ignore",
    file_options=file_options,
    max_open_files=max_open_files,
    max_rows_per_file=max_rows_per_file,
    min_rows_per_group=min_rows_per_group,
    max_rows_per_group=max_rows_per_group,
    filesystem=filesystem,
    max_partitions=max_partitions,
)

Here is the error message:

320     else:
    321         batch_iter = data
    323     data = RecordBatchReader.from_batches(
--> 324         schema, (validate_batch(batch) for batch in batch_iter)
    325     )
    327 ds.write_dataset(
    328     data,
    329     base_dir="/",
   (...)
    343     max_partitions=max_partitions,
    344 )
    346 if table is None:

TypeError: 'pyarrow._dataset.FileSystemDataset' object is not iterable

@stefnba

stefnba commented Oct 27, 2023

Thanks @stefnba, now it asks about the schema :)

When you pass an iterable to write_deltalake(), you need to provide a schema. Notice the schema=dataset.schema and dataset.to_batches() in the example below.

import pyarrow.dataset as ds
from deltalake.writer import write_deltalake
from trident_token_library_wrapper import PyTridentTokenLibrary

aadToken = PyTridentTokenLibrary.get_access_token("storage")

for tbl in ["nation", "region", "customer", "supplier", "orders", "part", "lineitem", "partsupp"]:
    print(tbl)

    dataset = ds.dataset(f"/lakehouse/default/Files/{sf}/{tbl}", format="parquet")

    write_deltalake(
        f"abfss://[email protected]/TPCH_DuckDB100.Lakehouse/Tables/{tbl}",
        data=dataset.to_batches(),
        mode="overwrite",
        schema=dataset.schema,
        overwrite_schema=True,
        max_rows_per_file=250000000,
        min_rows_per_group=8000000,
        max_rows_per_group=16000000,
        storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"},
    )

@ion-elgreco
Collaborator

ion-elgreco commented Oct 27, 2023

When the table exists, it validates the batches, and since we didn't check for a pyarrow dataset it would try to iterate over it, which is not supported. This adds explicit support for pyarrow datasets: #1780
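
For reference, a minimal sketch (not necessarily the actual change in #1780) of how the writer's input dispatch could handle a pyarrow dataset explicitly instead of falling through to the generic iterable branch; to_batch_iter is a hypothetical helper:

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import RecordBatchReader

def to_batch_iter(data):
    # Hypothetical helper: normalize supported inputs into an iterable of RecordBatch.
    if isinstance(data, RecordBatchReader):
        return data
    if isinstance(data, pa.RecordBatch):
        return [data]
    if isinstance(data, pa.Table):
        return data.to_batches()
    if isinstance(data, ds.Dataset):
        # A Dataset is not iterable itself, but it can be scanned lazily into batches.
        return data.to_batches()
    return data  # assume it is already an iterable of RecordBatch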

@djouallah
Author

@ion-elgreco @stefnba a question: delta-rs can read a Delta table from OneLake using storage_options. Do you know whether this could be ported upstream to pyarrow dataset? I have a case where I want to read some parquet files from another URL.

@ion-elgreco
Collaborator

@ion-elgreco @stefnba a question: delta-rs can read a Delta table from OneLake using storage_options. Do you know whether this could be ported upstream to pyarrow dataset? I have a case where I want to read some parquet files from another URL.

I don't get the question. Doesn't fsspec support OneLake? If it doesn't, you need to create an issue there and at the adlfs repo.
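
As a side note, pyarrow datasets can read remote files through any fsspec-compatible filesystem, so something along these lines might work if adlfs can authenticate against your storage (the account name, container path, and credential below are placeholders):

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

# Placeholder connection details; adjust to whatever adlfs accepts for your storage account.
fs = AzureBlobFileSystem(account_name="<account>", credential="<token>")
dataset = ds.dataset("<container>/path/to/parquet", filesystem=fs, format="parquet")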
