fix(ingestion/iceberg): Improvements to iceberg source #11987

Merged 6 commits on Nov 28, 2024
8 changes: 8 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -317,5 +317,13 @@
"displayName": "CassandraDB",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
+ },
+ {
+ "urn": "urn:li:dataPlatform:iceberg",
+ "name": "iceberg",
+ "displayName": "Iceberg",
+ "description": "Ingest databases and tables from any Iceberg catalog implementation",
+ "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg",
+ "recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n"
}
]
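
For readers who want to see the new Iceberg recipe without the JSON escaping, the same settings can be written as a Python dictionary. This is only a sketch: the placeholder values (`uri`, `access-key`, and so on) are copied from the recipe string above, and `single_catalog_name` is a hypothetical helper added for illustration, not part of DataHub.

```python
from typing import Any, Dict

# The recipe from sources.json rewritten as a plain dict (placeholder values kept as-is).
recipe: Dict[str, Any] = {
    "source": {
        "type": "iceberg",
        "config": {
            "env": "dev",
            # Each thread opens its own connections to fetch manifest files,
            # so this value has to fit within the open-file limit (ulimit).
            "processing_threads": 1,
            # A single catalog definition, keyed by the catalog name.
            "catalog": {
                "demo": {
                    "type": "rest",  # other types are available
                    "uri": "uri",
                    "s3.access-key-id": "access-key",
                    "s3.secret-access-key": "secret-access-key",
                    "s3.region": "aws-region",
                }
            },
            "profiling": {"enabled": False},
        },
    }
}


def single_catalog_name(recipe: Dict[str, Any]) -> str:
    """Hypothetical helper: return the catalog name, insisting on exactly one entry."""
    catalogs = recipe["source"]["config"]["catalog"]
    if len(catalogs) != 1:
        raise ValueError(f"expected exactly one catalog, found {len(catalogs)}")
    return next(iter(catalogs))


print(single_catalog_name(recipe))  # prints "demo"
```

DataHub's programmatic pipeline API should accept a dictionary of roughly this shape together with a sink section, but that is an assumption here; check the ingestion documentation before relying on it.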
6 changes: 4 additions & 2 deletions metadata-ingestion/docs/sources/iceberg/iceberg.md
@@ -18,6 +18,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce

## Troubleshooting

- ### [Common Issue]
+ ### Exceptions while increasing `processing_threads`

- [Provide description of common issues with this integration and steps to resolve]
+ Each processing thread will open several files/sockets to download manifest files from blob storage. If you experience
+ exceptions appearing when increasing `processing_threads` configuration parameter, try to increase limit of open
+ files (i.e. using `ulimit` in Linux).
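
On Linux and other Unix-like systems, the open-file limit mentioned in this troubleshooting note can also be inspected and raised from Python with the standard `resource` module. The sketch below is an illustration and not part of the Iceberg source: `raise_open_file_limit` and the target of 4096 are hypothetical, and an unprivileged process can only raise the soft limit up to the hard limit.

```python
import resource


def raise_open_file_limit(target: int) -> int:
    """Raise the soft open-file limit towards `target`; return the resulting soft limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return soft  # already unlimited, nothing to do
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)  # cannot exceed the hard limit without privileges
    if target <= soft:
        return soft  # never lower the current limit
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    except (ValueError, OSError):
        return soft  # the platform refused the change; keep the current limit
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]


if __name__ == "__main__":
    # 4096 is an arbitrary example value, not a recommendation from the PR.
    print("soft open-file limit:", raise_open_file_limit(4096))
```

Calling something like this before starting ingestion has a similar effect to `ulimit -n` in the launching shell, but only for the current process and its children.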
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
@@ -249,7 +249,8 @@

iceberg_common = {
# Iceberg Python SDK
- "pyiceberg>=0.4,<0.7",
+ # Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency
+ "pyiceberg>=0.4.0",
Contributor

Doesn't this require 0.4.0+ version, which means a newer version is fine as well?

Collaborator Author

I don't understand the question

Contributor

ok, we discussed it and it is fine

}
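
The review exchange above turns on which versions each specifier admits. As a rough illustration, the sketch below compares the old and new constraints with a simplified, tuple-based check. `parse` and `allowed` are hypothetical helpers that handle only plain dotted numeric versions; they are not the full PEP 440 matching that pip performs, and the candidate versions are just example strings.

```python
from typing import Optional, Tuple

Version = Tuple[int, ...]


def parse(version: str) -> Version:
    """Parse a plain dotted version such as '0.4.0' into ints, padded to three parts."""
    parts = tuple(int(p) for p in version.split("."))
    return parts + (0,) * (3 - len(parts))


def allowed(version: str, lower: str, upper: Optional[str] = None) -> bool:
    """True if lower <= version, and version < upper when an upper bound is given."""
    v = parse(version)
    if v < parse(lower):
        return False
    return upper is None or v < parse(upper)


for candidate in ["0.3.0", "0.4.0", "0.6.1", "0.8.0"]:
    old = allowed(candidate, "0.4", "0.7")  # mirrors "pyiceberg>=0.4,<0.7"
    new = allowed(candidate, "0.4.0")       # mirrors "pyiceberg>=0.4.0"
    print(f"{candidate}: old={old} new={new}")
```

The new specifier has no upper bound, so under this simplified check a version such as 0.8.0 satisfies it while the old `<0.7` cap would have rejected it.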

mssql_common = {
17 changes: 12 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -9,6 +9,7 @@
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
+ NoSuchTableError,
)
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
from pyiceberg.table import Table
@@ -104,7 +105,7 @@
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default.")
@capability(
SourceCapability.OWNERSHIP,
- "Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership.",
+ "Automatically ingests ownership information from table properties based on `user_ownership_property` and `group_ownership_property`",
)
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class IcebergSource(StatefulIngestionSourceBase):
@@ -192,9 +193,7 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
table = thread_local.local_catalog.load_table(dataset_path)
time_taken = timer.elapsed_seconds()
self.report.report_table_load_time(time_taken)
- LOGGER.debug(
- f"Loaded table: {table.identifier}, time taken: {time_taken}"
- )
+ LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
yield from self._create_iceberg_workunit(dataset_name, table)
except NoSuchPropertyException as e:
self.report.report_warning(
@@ -206,12 +205,20 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
)
except NoSuchIcebergTableError as e:
self.report.report_warning(
- "no-iceberg-table",
+ "not-an-iceberg-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
)
+ except NoSuchTableError as e:
+ self.report.report_warning(
+ "no-such-table",
+ f"Failed to create workunit for {dataset_name}. {e}",
+ )
+ LOGGER.warning(
+ f"NoSuchTableError while processing table {dataset_path}, skipping it.",
+ )
except Exception as e:
self.report.report_failure("general", f"Failed to create workunit: {e}")
LOGGER.exception(
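
The order of the `except` clauses in this hunk matters if, as pyiceberg's exception module appears to define it, `NoSuchIcebergTableError` is a subclass of `NoSuchTableError`: the more specific handler has to come first, or it would never run. The sketch below uses stand-in exception classes (deliberately not imported from pyiceberg) and a hypothetical `classify` function to show that dispatch, reusing the report keys from this diff.

```python
# Stand-in classes mirroring the assumed pyiceberg hierarchy; not the real imports.
class NoSuchTableError(Exception):
    pass


class NoSuchIcebergTableError(NoSuchTableError):
    pass


def classify(error: Exception) -> str:
    """Hypothetical helper: map an error to the report key used by the handlers."""
    try:
        raise error
    except NoSuchIcebergTableError:
        return "not-an-iceberg-table"  # most specific handler first
    except NoSuchTableError:
        return "no-such-table"
    except Exception:
        return "general"  # reported as a failure rather than a warning


for err in (NoSuchIcebergTableError("x"), NoSuchTableError("y"), RuntimeError("z")):
    print(type(err).__name__, "->", classify(err))
```

Swapping the first two handlers would route every `NoSuchIcebergTableError` to `"no-such-table"`, which is why the new clause is added after the existing one rather than before it.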