Merge data freezes indefinitely #1920

Closed
edgBR opened this issue Nov 29, 2023 · 13 comments
Labels
bug Something isn't working

Comments

@edgBR

edgBR commented Nov 29, 2023

Environment

[tool.poetry]
name = "code"
version = "0.1.0"
description = ""
authors = ["Edgar Bahilo <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.10"
black = "^23.11.0"
ruff = "^0.1.6"
pytest = "^7.4.3"
pre-commit = "^3.5.0"
deltalake = "^0.13.0"
polars = "^0.19.17"
azure-identity = "^1.15.0"
azure-storage-blob = "^12.19.0"
pandas = "^2.1.3"
ipdb = "^0.13.13"
fsspec = "^2023.10.0"
adlfs = "^2023.10.0"
azure-keyvault-secrets = "^4.7.0"
pyarrow = "^14.0.1"
python-dotenv = "^1.0.0"
azure-storage-file-datalake = "^12.14.0"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.black]
skip-string-normalization = true
line-length = 120

[tool.ruff]
# Same as Black.
line-length = 120

exclude = [
    "jupyter_notebook_config.py",
    ".bzr",
    ".direnv",
    ".eggs",
    ".git",
    ".git-rewrite",
    ".hg",
    ".mypy_cache",
    ".nox",
    ".pants.d",
    ".pytype",
    ".ruff_cache",
    ".svn",
    ".tox",
    ".venv",
    "__pypackages__",
    "_build",
    "buck-out",
    "build",
    "dist",
    "node_modules",
    "venv"]

select = [
    "E",  # pycodestyle errors (settings from FastAPI, thanks, @tiangolo!)
    "W",  # pycodestyle warnings
    "F",  # pyflakes
    "I",  # isort
    "C",  # flake8-comprehensions
    "B",  # flake8-bugbear
]
ignore = [
    "E501",  # line too long, handled by black
    "C901",  # too complex
]

[tool.ruff.isort]
order-by-type = true
relative-imports-order = "closest-to-furthest"
extra-standard-library = ["typing"]
section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"]
known-first-party = []

Delta-rs version:

0.13.0

Binding:
Python bindings

Environment:


Bug

What happened:

Hi,

I'm trying to ramp up the team's knowledge of transactional data lakes. I have used Delta before with PySpark with more or less decent success, but in my current company we do not yet have a use case for Spark (we are not processing even a TB of data in our projects), and Polars/DuckDB seems like a good compromise. I have created the demo code below, which does the following:

  • Unzip CSV data from a set of URLs.
  • Convert the CSV to .parquet and send it to a landing zone.
  • Read the parquet data from the landing zone.
  • Convert the parquet to delta and write it in append-only mode.
  • Make upserts into a silver table.

The code is as follows:

import logging
import os
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile

import polars as pl
import requests
from azure.identity import AzureCliCredential
from azure.storage.filedatalake import DataLakeServiceClient
from deltalake import DeltaTable
from dotenv import load_dotenv

load_dotenv('../.env')
# Set the logging level for all azure-* libraries
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('azure')
logger.setLevel(logging.ERROR)
logger_normal = logging.getLogger(__name__)

DOWNLOAD_URIS = [
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q1.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q2.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q3.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2020_Q1.zip",
]

LANDING_ZONE_PATH = os.getenv('LANDING_ZONE_PATH')
ACCOUNT_NAME = os.getenv('STORAGE_ACCOUNT_NAME')
BRONZE_CONTAINER = os.getenv('STAGING_PATH')
SILVER_CONTAINER = os.getenv('HISTORICAL_PATH')
GOLD_CONTAINER = os.getenv('DW_PATH')


class ETLPipeline:
    """
    Mock class that simulates a modulith to process from landing to bronze, silver and gold.
    It takes the assumption that we process one file at a time for simplification (mostly because
    I did not want to write asyncio calls and deal with the connections in the ADLSGenClient)
    """

    def __init__(self) -> None:
        self.adlsgen2_client = DataLakeServiceClient(
            account_url=f"https://{ACCOUNT_NAME}.dfs.core.windows.net/", credential=AzureCliCredential()
        )
        self.bronze_client = self.adlsgen2_client.get_directory_client(file_system=LANDING_ZONE_PATH, directory='/')

    def upload_to_landing(self, uri: str):
        """_summary_

        Args:
            uri (str): _description_
        """

        try:
            response = requests.get(url=uri)
            response.raise_for_status()
            with ZipFile(BytesIO(response.content), 'r') as zip_ref:
                # Assuming there is only one file in the zip archive
                file_name = zip_ref.namelist()[0]
                # Read the CSV file into a polars dataframe
                raw_df = pl.read_csv(zip_ref.open(file_name).read())

                # Save the DataFrame as a parquet file in blob
                file_name = f"{file_name.lower().split('.csv')[0]}.parquet"
                path_name = f"../data/input/{file_name}"
                raw_df.write_parquet(path_name, use_pyarrow=True, compression='lz4')

                file_client = self.bronze_client.get_file_client(file_name)

                with open(file=path_name, mode="rb") as data:
                    file_client.upload_data(data, overwrite=True)
                logger_normal.info(f"{file_name} uploaded to adlsgen2")
                self.file_name = file_name
        except requests.exceptions.HTTPError as e:
            logger_normal.error(f"Failed to download {uri}. HTTP error occurred: {e}")
        except Exception as e:
            logger_normal.error(f"An error occurred processing {uri}: {e}")

    def raw_to_bronze(self):
        try:
            storage_options_raw = {"account_name": ACCOUNT_NAME, "anon": False}
            storage_options_raw_delta = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
            logger_normal.info(f"Reading {self.file_name}")
            df = pl.read_parquet(
                source=f'abfs://{LANDING_ZONE_PATH}/{self.file_name}', storage_options=storage_options_raw
            )
            df = df.with_columns(insertion_date=datetime.now())
            # df = df.with_columns([(pl.col("start_time").str.to_datetime().dt.strftime("%Y-%m").alias("monthdate"))])
            logger_normal.info(f"Converting {self.file_name} to delta")
            df.write_delta(
                target=f'abfs://{BRONZE_CONTAINER}/',
                mode='append',
                storage_options=storage_options_raw_delta
                # delta_write_options={"partition_by": ['monthdate']}
            )
            # bronze_df = DeltaTable(
            #         table_uri=f'abfs://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta)
            # bronze_df.optimize.z_order(['trip_id'])
        except Exception as e:
            logger_normal.error(e)
            logger_normal.error(f"Failed to conver to delta {self.file_name}")

    def bronze_to_silver(self):
        try:
            storage_options_raw_delta = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
            # source
            bronze_df = pl.read_delta(source=f'abfs://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta)
            # target
            silver_check = self._table_checker(container=SILVER_CONTAINER, options=storage_options_raw_delta)
            if silver_check:
                bronze_df = DeltaTable(
                    table_uri=f'abfs://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta
                ).to_pyarrow_table()
                logger_normal.info("Merging new data into silver")
                silver_df = DeltaTable(
                    table_uri=f'abfs://{SILVER_CONTAINER}/', storage_options=storage_options_raw_delta
                )
                (
                    silver_df.merge(
                        source=bronze_df, predicate="s.trip_id = t.trip_id", source_alias="s", target_alias="t"
                    )
                    .when_matched_update_all()
                    .when_not_matched_insert_all()
                    .execute()
                )
                silver_df.optimize.z_order(['trip_id'])
            else:
                logger_normal.info("Because silver table is empty we save the first bronze file as silver")
                bronze_df.write_delta(
                    target=f'abfs://{SILVER_CONTAINER}/', mode='overwrite', storage_options=storage_options_raw_delta
                )

        except Exception as e:
            logger_normal.error(e)
            logger_normal.error(f"Failed to merge {self.file_name}")

    def silver_to_gold(self):
        return True

    def _table_checker(self, container, options):
        try:
            delta_table = DeltaTable(table_uri=f"abfs://{container}/", storage_options=options)
            logger_normal.info(f"Delta table version is {delta_table.version()}")
            table_exist = True
            logger_normal.info(f"Delta Table Exists in {container}")
        except Exception as e:
            logger_normal.error(e)
            table_exist = False
        return table_exist


if __name__ == "__main__":
    etl_workflow = ETLPipeline()
    for uri in DOWNLOAD_URIS:
        etl_workflow.upload_to_landing(uri=uri)
        etl_workflow.raw_to_bronze()
        etl_workflow.bronze_to_silver()

When running the code:

(screenshot)

The merge does not seem to work.

I am aware that I am not using bloom filters or partitions, but I am running this on a pretty beefy machine (112 GB of RAM and 16 cores). Is this normal?

When looking at the storage account I am only able to see the first file:

(screenshot)

And no additional commits seem to be landing.
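
For completeness, this is roughly how I am checking what has actually been committed to the bronze table (a minimal sketch; the account and container names are placeholders for the values in my .env file):

from deltalake import DeltaTable

# Placeholders: substitute your own storage account and container names.
ACCOUNT_NAME = "<storage-account>"
BRONZE_CONTAINER = "<bronze-container>"

storage_options = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
dt = DeltaTable(table_uri=f"abfs://{BRONZE_CONTAINER}/", storage_options=storage_options)

print(dt.version())                  # latest committed table version
print(len(dt.files()))               # number of data files referenced by that version
for commit in dt.history(limit=5):   # metadata of recent commits (operation, timestamp, parameters)
    print(commit)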

What you expected to happen:

I would expect either a warning or something that indicates why this is working so slowly.

BR
E

edgBR added the bug label Nov 29, 2023
@ion-elgreco
Collaborator

@edgBR are you able to build and compile the latest Python bindings from source to see if that will make it run?

In the main branch there are some improvements to the merge operation, which you will be able to use in the next Python release.

@edgBR
Author

edgBR commented Nov 29, 2023

Hi @ion-elgreco

Where can I find the instructions for compilation?

Do I need rust installed?

Also would it be possible for you to reproduce locally?

@ion-elgreco
Collaborator

@edgBR in the CONTRIBUTING.md you should be able to see some instructions on how to compile it.

@edgBR
Author

edgBR commented Dec 5, 2023

Hi @ion-elgreco

I have cloned the repo and tried to install the version based on bca00ae (the latest commit).

Unfortunately I am not able to build it properly.

I am getting the following error:

error: could not compile `arrow-data` (lib)

Caused by:
  process didn't exit successfully: `/home/azureuser/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/bin/rustc --crate-name arrow_data --edition=2021 /home/azureuser/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-48.0.1/src/lib.rs --error-format=json --json=diagnostic-rendered-ansi,artifacts,future-incompat --diagnostic-width=225 --crate-type lib --emit=dep-info,metadata,link -C embed-bitcode=no -C debuginfo=2 --cfg 'feature="ffi"' -C metadata=3bd98a31750d62d1 -C extra-filename=-3bd98a31750d62d1 --out-dir /mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps -L dependency=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps --extern arrow_buffer=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libarrow_buffer-fa7d385f97e94f64.rmeta --extern arrow_schema=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libarrow_schema-75a9e732fa8d2b13.rmeta --extern half=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libhalf-c4c9d4c9e1f59e7e.rmeta --extern num=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libnum-bde8b76108b30fd4.rmeta --cap-lints allow` (signal: 7, SIGBUS: access to undefined memory)

Is there a way you can try to reproduce?

I will now try to run the same code but using local storage instead of ADLS Gen2 to see if the problem is also reproducible there.

BR
E

@edgBR
Author

edgBR commented Dec 5, 2023

Hi @ion-elgreco

I have reproduced locally and unfortunately the merge still freezes:

import logging
import os
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile
import pdb

import polars as pl
from polars.io.delta import _convert_pa_schema_to_delta
import requests
from azure.identity import AzureCliCredential
from azure.storage.filedatalake import DataLakeServiceClient
from deltalake import DeltaTable, write_deltalake
from dotenv import load_dotenv

load_dotenv('../.env')
# Set the logging level for all azure-* libraries
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('azure')
logger.setLevel(logging.ERROR)
logger_normal = logging.getLogger(__name__)

DOWNLOAD_URIS = [
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q1.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q2.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q3.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2020_Q1.zip",
]

LANDING_ZONE_PATH = os.getenv('LANDING_ZONE_PATH')
ACCOUNT_NAME = os.getenv('STORAGE_ACCOUNT_NAME')
BRONZE_CONTAINER = os.getenv('STAGING_PATH')
SILVER_CONTAINER = os.getenv('HISTORICAL_PATH')
GOLD_CONTAINER = os.getenv('DW_PATH')


class ETLPipeline:
    """
    Mock class that simulates a modulith to process from landing to bronze, silver and gold.
    It takes the assumption that we process one file at a time for simplification (mostly because
    I did not want to write asyncio calls and deal with the connections in the ADLSGenClient)
    """

    def __init__(self) -> None:
        self.adlsgen2_client = DataLakeServiceClient(
            account_url=f"https://{ACCOUNT_NAME}.dfs.core.windows.net/", credential=AzureCliCredential()
        )
        self.bronze_client = self.adlsgen2_client.get_directory_client(file_system=LANDING_ZONE_PATH, directory='/')

    def upload_to_landing(self, uri: str):
        """_summary_

        Args:
            uri (str): _description_
        """

        try:
            response = requests.get(url=uri)
            response.raise_for_status()
            with ZipFile(BytesIO(response.content), 'r') as zip_ref:
                # Assuming there is only one file in the zip archive
                file_name = zip_ref.namelist()[0]
                # Read the CSV file into a polars dataframe
                raw_df = pl.read_csv(zip_ref.open(file_name).read())

                # Save the DataFrame as a parquet file in blob
                file_name = f"{file_name.lower().split('.csv')[0]}.parquet"
                path_name = f"../data/input/{file_name}"
                raw_df.write_parquet(path_name, use_pyarrow=True, compression='lz4')

                # file_client = self.bronze_client.get_file_client(file_name)

                # with open(file=path_name, mode="rb") as data:
                #     file_client.upload_data(data, overwrite=True)
                # logger_normal.info(f"{file_name} uploaded to adlsgen2")
                self.file_name = file_name
        except requests.exceptions.HTTPError as e:
            logger_normal.error(f"Failed to download {uri}. HTTP error occurred: {e}")
        except Exception as e:
            logger_normal.error(f"An error occurred processing {uri}: {e}")

    def raw_to_bronze(self):
        try:
            # storage_options_raw = {"account_name": ACCOUNT_NAME, "anon": False}
            # storage_options_raw_delta = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
            logger_normal.info(f"Reading {self.file_name}")
            # df = pl.read_parquet(
            #     source=f'abfss://{LANDING_ZONE_PATH}/{self.file_name}', storage_options=storage_options_raw
            # )
            df = pl.read_parquet(
                source=f'../data/input/{self.file_name}'
            )
            #df = df.with_columns(insertion_date=datetime.now())
            #df = df.with_columns([(pl.col("start_time").str.to_datetime().dt.strftime("%Y-%m").alias("monthdate"))])
            logger_normal.info(f"Converting {self.file_name} to delta")
            # df.write_delta(
            #     target=f'abfss://{BRONZE_CONTAINER}/',
            #     mode='append',
            #     storage_options=storage_options_raw_delta
            #     # delta_write_options={"partition_by": ['monthdate']}
            # )
            df.write_delta(target="/tmp/bronze", 
                           mode='append')
            # bronze_df = DeltaTable(
            #         table_uri=f'abfs://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta)
            # bronze_df.optimize.z_order(['trip_id'])
            # bronze_df = DeltaTable(
            #         table_uri="tmp/bronze")
            # bronze_df.optimize.z_order(['trip_id'])
        except Exception as e:
            logger_normal.error(e)
            logger_normal.error(f"Failed to conver to delta {self.file_name}")

    def bronze_to_silver(self):
        try:
            #storage_options_raw_delta = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
            # source
            #bronze_df = pl.read_delta(source=f'abfss://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta)
            bronze_df = DeltaTable(
                    table_uri="/tmp/bronze"
                ).to_pyarrow_table()
            # target
            #silver_check = self._table_checker(container=SILVER_CONTAINER, options=storage_options_raw_delta)
            silver_check = self._table_checker_local(container=SILVER_CONTAINER)
            if silver_check:
                # bronze_df = DeltaTable(
                #     table_uri=f'abfss://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta
                # ).to_pyarrow_table()

                logger_normal.info("Merging new data into silver")
                silver_df = DeltaTable(
                    table_uri="/tmp/silver"
                )
                (
                    silver_df.merge(
                        source=bronze_df, predicate="s.trip_id = t.trip_id", source_alias="s", target_alias="t"
                    )
                    .when_matched_update_all()
                    .when_not_matched_insert_all()
                    .execute()
                )
                logger_normal.info("Optimizing by Z order")
                # silver_df.optimize.z_order(['trip_id'])
            else:
                logger_normal.info("Because silver table is empty we save the first bronze file as silver")
                # bronze_df.write_delta(
                #     target=f'abfss://{SILVER_CONTAINER}/', mode='append', storage_options=storage_options_raw_delta
                # )
                write_deltalake(
                    table_or_uri="/tmp/silver",
                    data=bronze_df,
                    mode='append',
                )
        except Exception as e:
            logger_normal.error(e)
            logger_normal.error(f"Failed to merge {self.file_name}")

    def silver_to_gold(self):
        return True

    def _table_checker(self, container, options):
        try:
            delta_table = DeltaTable(table_uri=f"abfss://{container}/", storage_options=options)
            logger_normal.info(f"Delta table version is {delta_table.version()}")
            table_exist = True
            logger_normal.info(f"Delta Table Exists in {container}")
        except Exception as e:
            logger_normal.error(e)
            table_exist = False
        return table_exist
    
    def _table_checker_local(self, container):
        try:
            delta_table = DeltaTable(table_uri=f"/tmp/{container}")
            logger_normal.info(f"Delta table version is {delta_table.version()}")
            table_exist = True
            logger_normal.info(f"Delta Table Exists in {container}")
        except Exception as e:
            logger_normal.error(e)
            table_exist = False
        return table_exist


if __name__ == "__main__":
    etl_workflow = ETLPipeline()
    for uri in DOWNLOAD_URIS:
        etl_workflow.upload_to_landing(uri=uri)
        etl_workflow.raw_to_bronze()
        etl_workflow.bronze_to_silver()

@ion-elgreco
Collaborator

@edgBR that's unfortunate, can you make a minimal reproducible example with fake data?

@edgBR
Author

edgBR commented Dec 5, 2023

Hi @ion-elgreco

I am now using the examples from here: https://delta.io/blog/2023-10-22-delta-rs-python-v0.12.0/ and running:

from deltalake import DeltaTable, write_deltalake
from datetime import datetime
import polars as pl
from polars.io.delta import _convert_pa_schema_to_delta


def execute():

    df = pl.DataFrame(
        {
            "sales_order_id": ["1000", "1001", "1002", "1003"],
            "product": ["bike", "scooter", "car", "motorcycle"],
            "order_date": [
                datetime(2023, 1, 1),
                datetime(2023, 1, 5),
                datetime(2023, 1, 10),
                datetime(2023, 2, 1),
            ],
            "sales_price": [120.25, 2400, 32000, 9000],
            "paid_by_customer": [True, False, False, True],
        }
    )
    print(df)

    df.write_delta("sales_orders", mode="append")

    new_data = pl.DataFrame(
        {
            "sales_order_id": ["1002", "1004"],
            "product": ["car", "car"],
            "order_date": [datetime(2023, 1, 10), datetime(2023, 2, 5)],
            "sales_price": [30000.0, 40000.0],
            "paid_by_customer": [True, True],
        }
    )

    dt = DeltaTable("sales_orders")
    source = new_data.to_arrow()
    delta_schema = _convert_pa_schema_to_delta(source.schema)
    source = source.cast(delta_schema)

    (
        dt.merge(
            source=source,
            predicate="s.sales_order_id = t.sales_order_id",
            source_alias="s",
            target_alias="t",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )


if __name__ == "__main__":
    execute()

The error is now:

Traceback (most recent call last):
  File "/anaconda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/anaconda/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/code/minimal.py", line 56, in <module>
    execute()
  File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/code/minimal.py", line 25, in execute
    df.write_delta("sales_orders", mode="append")
  File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/.venv/lib/python3.10/site-packages/polars/dataframe/frame.py", line 3655, in write_delta
    write_deltalake(
  File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/.venv/lib/python3.10/site-packages/deltalake/writer.py", line 372, in write_deltalake
    _write_new_deltalake(
OSError: Generic DeltaLocalObjectStore error: Operation not supported (os error 95)

@ion-elgreco
Collaborator

ion-elgreco commented Dec 5, 2023

@edgBR where are you writing to? It seems like mounted storage. Ah, it is mounted indeed, looking at your file path, and it also seems to be Azure ML, right? :)

So writing to mounted storage is not supported. See here: #1765

Someone has a fix open to allow these illegal write operations; however, I would still suggest writing directly to the cloud storage rather than to the mounted storage.

Did you also run the merge operation against mounted storage?
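
For example, something along these lines should write straight to ADLS Gen2 instead of going through the AML mount (a rough sketch; the account and container names are placeholders, and it reuses the Azure CLI credentials from your setup):

import polars as pl

# Placeholders: substitute your own storage account and container names.
ACCOUNT_NAME = "<storage-account>"
SILVER_CONTAINER = "<silver-container>"

storage_options = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}

df = pl.DataFrame({"trip_id": [1, 2], "duration": [300, 450]})

# Write to the abfs:// URI directly rather than to a locally mounted path.
df.write_delta(
    target=f"abfs://{SILVER_CONTAINER}/",
    mode="append",
    storage_options=storage_options,
)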

@edgBR
Author

edgBR commented Dec 5, 2023

Hi,

As I am using AML (Azure Machine Learning), the folders are backed by something similar to NFS storage.

When prefixing the paths with /tmp/, I now get another error:

    metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Execution error: Fail to build join indices in NestedLoopJoinExec, error:Arrow error: Invalid argument error: Invalid comparison operation: LargeUtf8 == Utf8
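
In case it helps, a workaround I was considering for 0.13 (just a sketch, assuming the mismatch comes from large_string columns on one side and that bronze and silver share the same column layout) is to cast the source to the target table's Arrow schema before merging:

from deltalake import DeltaTable

# Hypothetical workaround sketch for deltalake 0.13: cast the source so that
# any large_string columns become plain string columns matching the target.
source = DeltaTable("/tmp/bronze").to_pyarrow_table()
target = DeltaTable("/tmp/silver")
source = source.cast(target.schema().to_pyarrow())

(
    target.merge(
        source=source,
        predicate="s.trip_id = t.trip_id",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)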

@ion-elgreco
Collaborator

@edgBR with which deltalake version? With version 0.14, I can run this fine.

@edgBR
Author

edgBR commented Dec 5, 2023

Hi @ion-elgreco

As mentioned in the .toml, I was using 0.13.

I have upgraded to 0.14 and this seems to work. Just as a side note, version 0.14 was published 7 hours ago on PyPI and it is not yet in the GitHub releases.

(screenshot)

Now this seems to work with local storage. I will update tomorrow about the ADLS Gen2 tests.
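
Just to be sure the environment really picked up the new release, I am checking the installed versions like this (a trivial sketch):

from importlib.metadata import version

# Confirm which releases the virtual environment resolved after the upgrade.
print(version("deltalake"))  # should now report 0.14.x
print(version("polars"))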

@ion-elgreco
Collaborator

Yes, there were some build failures with 0.14, so not everything got released to PyPI and the docs are not updated yet. I am looking into that now.

@edgBR
Author

edgBR commented Dec 5, 2023

Hi @ion-elgreco

It seems everything is working now:

(screenshot)

I think we can close this now.
