
Logstore issues on AWS Lambda #2410

Closed
timon-schmelzer-gcx opened this issue Apr 12, 2024 · 5 comments
Labels
bug Something isn't working

Comments

@timon-schmelzer-gcx

Environment

Delta-rs version:
0.16.4

Binding:
python

Environment:

  • Cloud provider:
    • AWS, Lambda function, Layer AWSSDKPandas-Python39 + custom layer containing delta-rs

Bug

What happened:
I am currently working on a simple AWS data pipeline, mainly built on AWS Lambda functions + delta-rs. There are two steps:

  1. Take data that has been added to an S3 bucket, convert it to pyarrow, and append it to a delta table that also lives in S3. This lambda is called every few minutes.
    a. One special case: if the table does not exist yet, create it and set the table parameters correctly.
  2. To keep ingestion and read speeds high, I want to perform a few cleanup tasks once in a while (e.g. every 24h). The cleanup consists of OPTIMIZE, VACUUM, checkpointing, and metadata removal.

As explained here, we also configured a DynamoDB table as the logstore. The corresponding environment variables are set directly for both lambda functions.
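For reference, this is roughly how the logstore-related environment variables are wired up (a sketch; the variable names follow the delta-rs S3 storage documentation, and the DynamoDB table name below is a placeholder, not taken from the issue):

```python
import os

# Sketch: enable the DynamoDB-backed locking provider for concurrent S3 writes.
# "delta_log" is a placeholder table name (also the delta-rs default).
os.environ["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
os.environ["DELTA_DYNAMO_TABLE_NAME"] = "delta_log"
```

In a Lambda deployment these would normally be set as function environment variables rather than in code.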

Here is the code:

# 1.
import io
import json
import os
import urllib.parse

import boto3
import pandas as pd
import pyarrow as pa
from deltalake.writer import write_deltalake

print("Loading function")

s3 = boto3.client("s3")

my_schema = pa.schema(
    [
        ("systemname", pa.string()), 
        ...
    ]
)


def folder_exists(s3_bucket: str, s3_path: str) -> bool:
    """
    Return True if the given path exists in the given bucket.
    """
    s3_path = s3_path.rstrip("/")
    resp = s3.list_objects(Bucket=s3_bucket, Prefix=s3_path, Delimiter="/", MaxKeys=1)
    return "CommonPrefixes" in resp


def create_or_append_delta(data: pd.DataFrame, s3_bucket: str, s3_path: str, schema: pa.Schema):
    """Create new delta table with reasonable configuration.

    If delta table already exists, appends data to it.
    """
    write_config = {
        "mode": "append",
    }
    if not folder_exists(s3_bucket, s3_path):
        print("Writing new table")
        write_config = {
            "mode": "overwrite",
            "configuration": {"delta.logRetentionDuration": "interval 0 hour"},
        }
    write_deltalake(s3_path, data, schema=schema, **write_config)


def process_data_bronze(data_stream: io.StringIO, column_names: list[str]) -> pd.DataFrame:
    """Process data stream and return a pandas DataFrame."""
    ...



def lambda_handler(event, context):
    # Define Bucket and Delta Table name
    s3_bucket = os.environ["BRONZE_TABLE_BUCKET_NAME"]
    delta_table_name = os.environ["BRONZE_TABLE_NAME"]
    s3_path = f"s3://{s3_bucket}/v1/{delta_table_name}/"

    # Get the object from the event and show its content type
    raw_data_bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    raw_data_key = urllib.parse.unquote_plus(
        event["Records"][0]["s3"]["object"]["key"], encoding="utf-8"
    )

    try:
        # Process incoming data - create dataframe and adjust types
        response = s3.get_object(Bucket=raw_data_bucket_name, Key=raw_data_key)
        data = response["Body"].read().decode("utf-8")
    except Exception as e:
        print(e)
        print(
            "Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.".format(
                raw_data_key, raw_data_bucket_name
            )
        )
        raise e

    print("Processing incoming data.")
    df = process_data_bronze(data_stream=data, column_names=my_schema.names)
    print("Finished processing. Now writing to s3 bucket.")
    create_or_append_delta(df, s3_bucket=s3_bucket, s3_path=s3_path, schema=my_schema)
    print("Finished writing data to s3 bucket.")
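The event parsing at the top of the handler can be exercised locally with a minimal, hand-written S3 notification payload (the bucket and key values below are invented for illustration):

```python
import urllib.parse

# Minimal stand-in for the S3 put-event the Lambda receives; values are made up.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "raw-bucket"},
                "object": {"key": "incoming/file+name.csv"},
            }
        }
    ]
}

bucket = sample_event["Records"][0]["s3"]["bucket"]["name"]
# S3 URL-encodes object keys in event payloads; unquote_plus also maps "+" to " ".
key = urllib.parse.unquote_plus(
    sample_event["Records"][0]["s3"]["object"]["key"], encoding="utf-8"
)
print(bucket, key)  # raw-bucket incoming/file name.csv
```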

Optimize code:

# 2.
from deltalake import DeltaTable
import os


def table_cleanup(delta_table: DeltaTable):
    """Perform different delta table cleanup tasks.

    Should be called every few hundred "append" operations to ensure a constant read/write
    performance.
    """
    # Merge small files into bigger ones for faster query performance.
    res_optimize = delta_table.optimize.compact()
    print("OPTIMIZE:", res_optimize)

    # Remove outdated files as they are not required any more. Note that vacuum never removes
    # the most recent files, even if retention_hours is set to 0.
    res_vacuum = delta_table.vacuum(
        retention_hours=0, enforce_retention_duration=False, dry_run=False
    )
    print("VACUUM:", res_vacuum)

    # Create a checkpoint, summarizing all previous commits to the delta log. For spark delta,
    # this is automatically done every 10 commits. However, deltars is still working on such
    # features. See also: https://github.com/delta-io/delta-rs/issues/913
    res_checkpoint = delta_table.create_checkpoint()
    print("CHECKPOINT:", res_checkpoint)

    # Remove outdated commits, which are stored as JSON files inside the _delta_log folder. In
    # contrast to the `vacuum` method, the retention_hours cannot be set directly in the call
    # but when creating the table. This can be done as follows:
    # write_deltalake(..., configuration={"delta.logRetentionDuration": "interval 10 second"})
    # Note that the interval duration has to be given in singular time intervals, i.e. "second"
    # instead of "seconds". As for the vacuum command, this command does not remove the most
    # recent commits. Make sure to call `create_checkpoint` before!
    # See also: https://github.com/delta-io/delta-rs/issues/2180
    res_cleanup_metadata = delta_table.cleanup_metadata()
    print("CLEANUP_METADATA:", res_cleanup_metadata)


def lambda_handler(event, context):
    bronze_table_bucket_name = os.environ["BRONZE_TABLE_BUCKET_NAME"]
    bronze_table_name = os.environ["BRONZE_TABLE_NAME"]
    bronze_delta_table_path = f"s3://{bronze_table_bucket_name}/v1/{bronze_table_name}/"

    print("Cleanup bronze table.")
    bronze_delta_table = DeltaTable(bronze_delta_table_path)
    print("FILES:", bronze_delta_table.file_uris())
    table_cleanup(bronze_delta_table)

    print("Cleanup tasks executed successfully.")
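As an aside, the singular-unit requirement for `delta.logRetentionDuration` mentioned in the comments above is easy to get wrong, so a sanity check could help. This is a hypothetical helper of my own, not part of deltalake, and it only covers the singular-unit form described in the comment:

```python
import re

# Hypothetical helper (not part of deltalake): validate "interval N unit"
# strings with singular units, as described in the table_cleanup comments.
_INTERVAL_RE = re.compile(
    r"interval\s+\d+\s+(nanosecond|microsecond|millisecond|second|minute|hour|day|week)"
)

def is_valid_log_retention(value: str) -> bool:
    """Return True for strings like 'interval 0 hour', False for plural units."""
    return _INTERVAL_RE.fullmatch(value.strip()) is not None

print(is_valid_log_retention("interval 0 hour"))      # True
print(is_valid_log_retention("interval 10 seconds"))  # False: plural unit
```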

What you expected to happen:
I would expect the OPTIMIZE to merge multiple small files into bigger ones and the VACUUM to remove the now-outdated small files. Instead, the OPTIMIZE code does nothing and the VACUUM removes every file besides the most recent one! As written in the comment, I understood that VACUUM will not remove parts of the table that are currently in use, even if you set retention_hours=0.

How to reproduce it:
I suspect a logstore issue here, as the optimize/vacuum code works fine locally.

More details:
This is the situation before the second lambda function is called:
(screenshot of the table's S3 contents, taken 2024-04-12 10:43)

Output of the second lambda function:

Function Logs
START RequestId: xxx Version: $LATEST
Cleanup bronze table.
FILES: ['s3://xxx/v1/xxx/1-057c4485-902f-4dde-8ed0-1a22a392cbf8-0.parquet']
OPTIMIZE: {'numFilesAdded': 0, 'numFilesRemoved': 0, 'filesAdded': '{"avg":0.0,"max":0,"min":0,"totalFiles":0,"totalSize":0}', 'filesRemoved': '{"avg":0.0,"max":0,"min":0,"totalFiles":0,"totalSize":0}', 'partitionsOptimized': 0, 'numBatches': 0, 'totalConsideredFiles': 1, 'totalFilesSkipped': 1, 'preserveInsertionOrder': True}
[2024-04-12T08:44:16Z WARN  deltalake_aws::logstore] LockClientError::VersionAlreadyExists(2)
VACUUM: ['0-ec2c2342-c51f-47d2-9d39-b4d6cb15e86d-0.parquet']
CHECKPOINT: None
CLEANUP_METADATA: None
Cleanup tasks executed successfully.
END RequestId: xxx
REPORT RequestId: xxx	Duration: 1551.10 ms	Billed Duration: 1552 ms	Memory Size: 512 MB	Max Memory Used: 201 MB	Init Duration: 2629.44 ms

So even though there are two files available, bronze_delta_table.file_uris() only returns one. Also, the OPTIMIZE only considers one file instead of two. I also ran this code on a table containing hundreds of small files, with the result that all files besides the most recent ones were removed.

There is also a warning about the lock client ([2024-04-12T08:44:16Z WARN deltalake_aws::logstore] LockClientError::VersionAlreadyExists(2)), which I do not really understand. Looking at the DynamoDB table, it seems like the second lambda function created two entries at the same time, but I am not sure why. Could that be the reason for this strange behaviour?

@timon-schmelzer-gcx timon-schmelzer-gcx added the bug Something isn't working label Apr 12, 2024
@ion-elgreco
Collaborator

@timon-schmelzer-gcx check the _delta_log files, and share those contents here

@timon-schmelzer-gcx
Author

Sure, hope this helps!

# content of 0000001.checkpoint.parquet
{"protocol":
    {
        "minReaderVersion": 1.0, 
        "minWriterVersion": 2.0, 
        "writerFeatures": None, 
        "readerFeatures": None
    }
}
{
    "metaData": {
        "id": "26207a48-4cac-481c-a86d-cecae2238bb0",
        "name": None,
        "description": None,
        "schemaString": "{"type":"struct","fields":[{"name":"systemname","type":"string","nullable":true,"metadata":{}},...]}",
        "createdTime": 1712911315257.0,
        "partitionColumns": array([], dtype=object),
        "configuration": [("delta.logRetentionDuration",
            "interval 0 hour")
        ],
        "format": {
            "provider": "parquet",
            "options": []
        }
    }
}
{
    "remove": {
        "path": "0-ec2c2342-c51f-47d2-9d39-b4d6cb15e86d-0.parquet",
        "deletionTimestamp": 1712911432981.0,
        "dataChange": True,
        "extendedFileMetadata": True,
        "size": 3233.0,
        "partitionValues": [],
        "tags": []
    }
}
{
    "add": {
        "path": "1-057c4485-902f-4dde-8ed0-1a22a392cbf8-0.parquet",
        "size": 2926.0,
        "modificationTime": 1712911432899.0,
        "dataChange": False,
        "stats": "{"numRecords": 8, ...}",
        "partitionValues": [],
        "tags": [],
        "deletionVector": None,
        "stats_parsed": {
            "numRecords": 8.0,
            "minValues": {...},
            "maxValues": {...
            },
            "nullCount": {...}
        }
    }
}

# content of 0000001.json
{
    "add": {
        "path": "1-057c4485-902f-4dde-8ed0-1a22a392cbf8-0.parquet",
        "partitionValues": {},
        "size": 2926,
        "modificationTime": 1712911432899,
        "dataChange": true,
        "stats": "{\"numRecords\": 8, \"minValues\": {...}}",
        "tags": null,
        "deletionVector": null,
        "baseRowId": null,
        "defaultRowCommitVersion": null,
        "clusteringProvider": null
    }
}
{
    "remove": {
        "path": "0-ec2c2342-c51f-47d2-9d39-b4d6cb15e86d-0.parquet",
        "dataChange": true,
        "deletionTimestamp": 1712911432981,
        "extendedFileMetadata": true,
        "partitionValues": {},
        "size": 3233
    }
}
{
    "commitInfo": {
        "timestamp": 1712911433141,
        "operation": "WRITE",
        "operationParameters": {
            "partitionBy": "[]",
            "mode": "Overwrite"
        },
        "clientVersion": "delta-rs.0.17.1"
    }
}

# content of 0000002.json
{
    "commitInfo": {
        "timestamp": 1712911456325,
        "operation": "VACUUM START",
        "operationParameters": {
            "defaultRetentionMillis": "604800000",
            "retentionCheckEnabled": "false",
            "specifiedRetentionMillis": "0"
        },
        "operationMetrics": {
            "numFilesToDelete": 1,
            "sizeOfDataToDelete": 3233
        },
        "clientVersion": "delta-rs.0.17.1"
    }
}

# content of 0000003.json
{
    "commitInfo": {
        "timestamp": 1712911456485,
        "operation": "VACUUM END",
        "operationParameters": {
            "status": "COMPLETED"
        },
        "operationMetrics": {
            "numDeletedFiles": 1,
            "numVacuumedDirectories": 0
        },
        "clientVersion": "delta-rs.0.17.1"
    }
}

@ion-elgreco
Collaborator

I don't see anything wrong here. You overwrote the table, so it now also has a remove action, and that file later got vacuumed

@timon-schmelzer-gcx
Author

Wait a second, I do not want to overwrite when adding new data, just when the table does not exist yet. Is the problem that the write config cannot be changed (or is ignored) once the table is created?

def create_or_append_delta(data: pd.DataFrame, s3_bucket: str, s3_path: str, schema: pa.Schema):
    """Create new delta table with reasonable configuration.

    If delta table already exists, appends data to it.
    """
    write_config = {
        "mode": "append",
    }
    if not folder_exists(s3_bucket, s3_path):
        print("Writing new table")
        write_config = {
            "mode": "overwrite",
            "configuration": {"delta.logRetentionDuration": "interval 0 hour"},
        }
    write_deltalake(s3_path, data, schema=schema, **write_config)

@ion-elgreco
Collaborator

@timon-schmelzer-gcx the writer mode is not part of the table config. There is likely something wrong with how you check whether the table exists or not; you need to look into that
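One plausible culprit, consistent with this comment (an assumption on my part, not confirmed in the thread): folder_exists is given the full s3:// URI as the list_objects Prefix, which can never match an object key, so the check always returns False and every write runs in overwrite mode. A sketch of a key-only check; table_exists and the call shape are hypothetical, using boto3's list_objects_v2:

```python
from urllib.parse import urlparse

def table_exists(s3_client, table_uri: str) -> bool:
    """Hypothetical replacement for folder_exists: list_objects_v2 expects a
    key prefix ("v1/table/"), not a full URI ("s3://bucket/v1/table/")."""
    parsed = urlparse(table_uri)          # e.g. s3://bucket/v1/table/
    bucket = parsed.netloc                # "bucket"
    prefix = parsed.path.lstrip("/")      # "v1/table/"
    resp = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return resp.get("KeyCount", 0) > 0
```

With boto3 this would be called as `table_exists(boto3.client("s3"), s3_path)` in place of the original `folder_exists(s3_bucket, s3_path)`.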

@ion-elgreco ion-elgreco closed this as not planned Won't fix, can't repro, duplicate, stale Apr 12, 2024