-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): set pipeline name in system metadata #10190
Conversation
70c90ec
to
9bc8b31
Compare
Also fixes a bug, so now the pipeline name and run ID are set even for aspects generated by transformers.
fix mypy + some tests tweak fix
WalkthroughThis update primarily involves refactoring the way system metadata is handled across various files in the DataHub metadata ingestion module. Key changes include moving the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Pipeline
participant SystemMetadataTransformer
participant AutoHelperTransformer
participant MetadataWorkUnit
User->>Pipeline: Executes pipeline with flags
Pipeline->>SystemMetadataTransformer: Initialize and configure transforms
SystemMetadataTransformer->>AutoHelperTransformer: Apply transformations
AutoHelperTransformer->>MetadataWorkUnit: Convert and handle metadata
MetadataWorkUnit-->>Pipeline: Return processed work units
Pipeline-->>User: Complete execution
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Outside diff range and nitpick comments (4)
metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py (1)
Line range hint
54-65
: Simplify nestedif
statements for clarity and efficiency.The nested
if
statements can be combined for better readability and performance.- if isinstance(workunit.metadata, MetadataChangeEvent): - if len(workunit.metadata.proposedSnapshot.aspects) == 0: - raise AttributeError("every mce must have at least one aspect") + if isinstance(workunit.metadata, MetadataChangeEvent) and len(workunit.metadata.proposedSnapshot.aspects) == 0: + raise AttributeError("every mce must have at least one aspect")metadata-ingestion/src/datahub/ingestion/run/pipeline.py (2)
Line range hint
102-103
: Optimize nested if statements.The nested
if
statements can be combined into a singleif
statement to improve readability and reduce complexity.- if "workunit_id" in record_envelope.metadata: - if isinstance(record_envelope.record, MetadataChangeProposalClass): + if "workunit_id" in record_envelope.metadata and isinstance(record_envelope.record, MetadataChangeProposalClass):
Line range hint
548-550
: Simplify boolean expression.The conditional expression can be simplified using the
bool
function to improve readability.- has_errors: bool = ( - True - if self.source.get_report().failures or self.sink.get_report().failures - else False - ) + has_errors: bool = bool(self.source.get_report().failures or self.sink.get_report().failures)metadata-ingestion/tests/unit/test_bigquery_source.py (1)
Line range hint
726-726
: Optimize dictionary key checks.The static analysis tool suggests using
key in dict
instead ofkey in dict.keys()
for efficiency. This is a valid suggestion to improve the performance of dictionary operations.- if key in dict.keys(): + if key in dict:Also applies to: 800-800
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (25)
- metadata-ingestion/docs/sources/datahub/datahub_recipe.yml (1 hunks)
- metadata-ingestion/src/datahub/cli/check_cli.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/api/workunit.py (2 hunks)
- metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py (4 hunks)
- metadata-ingestion/src/datahub/ingestion/run/pipeline.py (2 hunks)
- metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/transformer/auto_helper_transformer.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/transformer/system_metadata_transformer.py (1 hunks)
- metadata-ingestion/tests/integration/iceberg/docker-compose.yml (1 hunks)
- metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json (4 hunks)
- metadata-ingestion/tests/integration/ldap/ldap_mces_golden_deleted_group_stateful.json (3 hunks)
- metadata-ingestion/tests/integration/ldap/ldap_mces_golden_deleted_stateful.json (3 hunks)
- metadata-ingestion/tests/integration/ldap/ldap_mces_golden_group_stateful.json (5 hunks)
- metadata-ingestion/tests/integration/ldap/ldap_mces_golden_stateful.json (4 hunks)
- metadata-ingestion/tests/integration/looker/golden_looker_mces.json (33 hunks)
- metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json (46 hunks)
- metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json (34 hunks)
- metadata-ingestion/tests/integration/metabase/metabase_mces_golden.json (17 hunks)
- metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json (6 hunks)
- metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py (3 hunks)
- metadata-ingestion/tests/test_helpers/graph_helpers.py (1 hunks)
- metadata-ingestion/tests/unit/glue/glue_deleted_actor_mces_golden.json (17 hunks)
- metadata-ingestion/tests/unit/test_bigquery_source.py (2 hunks)
- metadata-ingestion/tests/unit/test_mlflow_source.py (2 hunks)
- metadata-ingestion/tests/unit/test_usage_common.py (4 hunks)
Files skipped from review due to trivial changes (4)
- metadata-ingestion/tests/integration/ldap/ldap_mces_golden_stateful.json
- metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json
- metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json
- metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py
54-65: Use a single
if
statement instead of nestedif
statementsCombine
if
statements usingand
(SIM102)
metadata-ingestion/src/datahub/ingestion/run/pipeline.py
102-103: Use a single
if
statement instead of nestedif
statements(SIM102)
548-550: Use
bool(...)
instead ofTrue if ... else False
Replace with `bool(...)
(SIM210)
metadata-ingestion/tests/unit/test_bigquery_source.py
726-726: Use
key in dict
instead ofkey in dict.keys()
Remove
.keys()
(SIM118)
800-800: Use
key in dict
instead ofkey in dict.keys()
Remove
.keys()
(SIM118)
Additional comments not posted (68)
metadata-ingestion/docs/sources/datahub/datahub_recipe.yml (1)
29-30
: Configuration Section ImprovedThe new
flags
section centralizes control flags, enhancing readability and manageability of the configuration. Ensure existing configurations are updated to reflect this change.Verification successful
Configuration Section Improved
The new
flags
section centralizes control flags, enhancing readability and manageability of the configuration. Existing configurations have been updated to reflect this change.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that existing configurations are updated. # Test: Search for old flag usage. Expect: No occurrences. rg --type yaml 'extractor_config:\s*\n\s*set_system_metadata'Length of output: 255
Script:
#!/bin/bash # Description: Verify that existing configurations are updated. # Test: Search for old flag usage. Expect: No occurrences. rg --multiline --type yaml 'extractor_config:\s*\n\s*set_system_metadata'Length of output: 75
metadata-ingestion/tests/integration/ldap/ldap_mces_golden_deleted_group_stateful.json (1)
20-22
: System Metadata and Aspect Handling EnhancedThe addition of
lastRunId
andpipelineName
to system metadata in test cases aligns with the PR's objectives to standardize metadata handling. Verify consistency across all test files.Also applies to: 31-33, 37-39, 48-50, 54-56
Verification successful
System Metadata and Aspect Handling Enhanced
The addition of
lastRunId
andpipelineName
to system metadata in test cases aligns with the PR's objectives to standardize metadata handling. The search results confirm that thelastRunId
field is consistently used across the relevant test files.
- File: metadata-ingestion/tests/integration/ldap/ldap_mces_golden_deleted_group_stateful.json
- Lines: 20-22, 31-33, 37-39, 48-50, 54-56
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify consistency of system metadata across all test files. # Test: Search for inconsistencies in system metadata. Expect: Consistent usage. rg --type json 'lastRunId' 'tests/integration'Length of output: 102
Script:
#!/bin/bash # Description: Verify consistency of system metadata across all test files. # Locate the directory containing JSON test files and search for 'lastRunId' fd --type f --extension json --exec rg --type json 'lastRunId' {}Length of output: 352401
metadata-ingestion/tests/integration/iceberg/docker-compose.yml (1)
22-22
: Port Mapping UpdatedThe port change from
8080
to28080
may help avoid conflicts or align with external constraints. Verify that this change does not impact other network configurations.Verification successful
Port Mapping Updated
The port change from
8080
to28080
inmetadata-ingestion/tests/integration/iceberg/docker-compose.yml
does not conflict with other network configurations in the repository.
- Verified in the following file:
metadata-ingestion/tests/integration/iceberg/docker-compose.yml
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the impact of the port change on network configurations. # Test: Search for potential conflicts in network configurations. Expect: No conflicts. rg --type yaml '28080' 'tests/integration'Length of output: 98
Script:
#!/bin/bash # Locate all docker-compose.yml files in the repository fd docker-compose.yml # Search for the port 28080 in all located docker-compose.yml files fd docker-compose.yml --exec rg '28080' {}Length of output: 1370
metadata-ingestion/tests/integration/ldap/ldap_mces_golden_deleted_stateful.json (1)
29-31
: Consistent System Metadata EnhancementsThe introduction of
lastRunId
andpipelineName
in the system metadata is consistent with the overall objective of enhancing metadata management. Ensure these fields are consistently populated across all test configurations.Also applies to: 46-48, 63-65
Verification successful
Consistent System Metadata Enhancements
The introduction of
lastRunId
andpipelineName
in the system metadata is consistent with the overall objective of enhancing metadata management. The verification confirms that thepipelineName
field is consistently populated across various test configurations.
metadata-ingestion/tests/integration/ldap/ldap_mces_golden_deleted_stateful.json
contains"pipelineName": "ldap-test-pipeline"
.Ensure that the
lastRunId
field is also consistently populated across all relevant test configurations.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify consistency of system metadata across all test configurations. # Test: Search for inconsistencies in system metadata. Expect: Consistent usage. rg --type json 'pipelineName' 'tests/integration'Length of output: 105
Script:
#!/bin/bash # Description: Locate the correct directory for integration tests and verify consistency of system metadata. # Step 1: Locate the integration tests directory. fd --type d --name 'integration' # Step 2: If the directory is found, search for the 'pipelineName' field within JSON files in the located directory. if fd --type d --name 'integration' | grep -q 'integration'; then fd --type d --name 'integration' | xargs -I {} rg --type json 'pipelineName' {} else echo "Integration tests directory not found." fiLength of output: 676
Script:
#!/bin/bash # Description: Locate the correct directory for integration tests and verify consistency of system metadata. # Step 1: Locate the integration tests directory. integration_dirs=$(fd --type d 'integration') # Step 2: If the directory is found, search for the 'pipelineName' field within JSON files in the located directory. if [[ -n "$integration_dirs" ]]; then echo "$integration_dirs" | xargs -I {} rg --type json 'pipelineName' {} else echo "Integration tests directory not found." fiLength of output: 301164
metadata-ingestion/src/datahub/ingestion/transformer/system_metadata_transformer.py (1)
12-45
: New Transformer Implementation ReviewedThe implementation of the
SystemMetadataTransformer
and theauto_system_metadata
function is robust and aligns with the PR's objectives. However, thecreate
method raises aNotImplementedError
. It would be beneficial to add documentation or comments explaining why this method should not be used to instantiate the class, to guide future developers.Verification successful
New Transformer Implementation Reviewed
The implementation of the
SystemMetadataTransformer
and theauto_system_metadata
function is robust and aligns with the PR's objectives. However, thecreate
method raises aNotImplementedError
. It would be beneficial to add documentation or comments explaining why this method should not be used to instantiate the class, to guide future developers.
- No occurrences of
SystemMetadataTransformer.create
were found, confirming it is not instantiated directly in the codebase.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that the `SystemMetadataTransformer` is not instantiated directly in other parts of the codebase. # Test: Search for direct instantiations of `SystemMetadataTransformer`. Expect: No occurrences. rg --type python 'SystemMetadataTransformer.create' 'src'Length of output: 86
Script:
#!/bin/bash # Description: Verify that the `SystemMetadataTransformer` is not instantiated directly in other parts of the codebase. # Test: Search for direct instantiations of `SystemMetadataTransformer.create`. Expect: No occurrences. rg --glob '*.py' 'SystemMetadataTransformer.create' srcLength of output: 99
Script:
#!/bin/bash # Description: Verify that the `SystemMetadataTransformer` is not instantiated directly in other parts of the codebase. # Find all Python files and search for direct instantiations of `SystemMetadataTransformer.create`. fd --extension py --exec rg 'SystemMetadataTransformer.create' {}Length of output: 67
metadata-ingestion/tests/integration/ldap/ldap_mces_golden_group_stateful.json (1)
20-22
: Confirm consistent insertion of new system metadata fields.The new fields
runId
,lastRunId
, andpipelineName
have been added consistently across different entries. Ensure these fields are correctly utilized in related tests and functionalities.Also applies to: 43-45, 60-62, 77-79, 94-96
metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py (1)
65-71
: Review new flags for system metadata settings.The new flags
set_system_metadata
andset_system_metadata_pipeline_name
are correctly implemented with clear and informative descriptions. Ensure these flags are appropriately used in the pipeline configurations.metadata-ingestion/tests/unit/test_mlflow_source.py (1)
75-75
: Review changes to metadata access in unit tests.The updates to access metadata directly via
wu.metadata
instead ofwu.get_metadata()["metadata"]
simplify the code and improve readability. Ensure that these changes are reflected in all relevant test cases.Also applies to: 103-103
metadata-ingestion/tests/test_helpers/graph_helpers.py (1)
45-45
: LGTM: Direct metadata access implemented as per new standards.This change aligns with the project's new approach to handling metadata directly, improving code clarity and reducing complexity.
metadata-ingestion/src/datahub/ingestion/api/workunit.py (1)
159-176
: Well-implemented method for constructing MetadataWorkUnit from various metadata types.The method correctly ensures that exactly one metadata type is provided and handles different types appropriately. This robust handling prevents potential data integrity issues.
metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json (1)
Line range hint
153-189
: Updated system metadata fields align with project enhancements.The addition of
lastRunId
andpipelineName
to the system metadata is crucial for better tracking and consistency across tests. These changes align well with the project's objectives to enhance metadata handling.metadata-ingestion/src/datahub/cli/check_cli.py (1)
65-65
: Proper implementation of system metadata control in CLI.The addition of the
set_system_metadata
flag to the pipeline configuration in the CLI provides necessary control over metadata settings, aligning with the project's enhanced metadata handling strategy.metadata-ingestion/tests/unit/test_usage_common.py (1)
Line range hint
193-307
: Consistent and improved metadata access in unit tests.The updates to direct metadata access in unit tests simplify the code and enhance clarity, which is crucial for maintaining and extending the test suite. These changes are well-implemented and align with the project's standards.
metadata-ingestion/tests/integration/metabase/metabase_mces_golden.json (6)
65-66
: Consistent addition of system metadata fieldslastRunId
andpipelineName
.The addition of these fields aligns with the PR's objective to enhance system metadata handling. The values and structure are correctly implemented.
123-124
: Consistent system metadata updates across different entries.The fields
lastRunId
andpipelineName
have been consistently added across various entries, ensuring uniformity in metadata handling.
184-185
: System metadata fields added correctly.The addition of
lastRunId
andpipelineName
is consistent with other entries and aligns with the PR's enhancements to metadata tracking.
238-239
: Proper addition of system metadata in dashboard entries.The fields
lastRunId
andpipelineName
are added to dashboard-related entries, which is crucial for tracking and consistency in metadata management.
292-293
: Uniform system metadata updates across dashboard snapshots.The consistent addition of these fields across different dashboard snapshots ensures that the system metadata is uniformly managed.
346-347
: System metadata fields correctly added to the last dashboard entry.The addition of
lastRunId
andpipelineName
to the system metadata of the dashboard snapshot maintains consistency and aligns with the project's metadata enhancement goals.metadata-ingestion/tests/unit/glue/glue_deleted_actor_mces_golden.json (15)
20-22
: Correct implementation of system metadata fields in Glue dataset entries.The addition of
lastRunId
andpipelineName
to the system metadata is consistent with changes in other files and aligns with the PR's objectives.
37-39
: Uniform system metadata updates across Glue dataset status aspects.The fields
lastRunId
andpipelineName
have been added uniformly across various status aspects of Glue datasets, ensuring consistent metadata management.
54-56
: System metadata fields added correctly to data platform instance aspects.The addition of
lastRunId
andpipelineName
to the system metadata of data platform instance aspects is correctly implemented and consistent with other entries.
73-75
: Consistent addition of system metadata in subTypes aspects.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of Glue datasets, aligning with the overall metadata enhancement strategy.
90-92
: Proper handling of system metadata in browsePathsV2 aspects.The fields
lastRunId
andpipelineName
are correctly added to the system metadata of browsePathsV2 aspects, ensuring uniformity in metadata tracking.
Line range hint
233-247
: Uniform system metadata updates across dataset snapshots.The consistent addition of
lastRunId
andpipelineName
across different dataset snapshots ensures that the system metadata is uniformly managed.
264-266
: System metadata fields added correctly to subTypes aspects of datasets.The addition of
lastRunId
andpipelineName
to the system metadata of subTypes aspects maintains consistency and aligns with the project's metadata enhancement goals.
281-283
: Correct implementation of system metadata fields in container aspects.The addition of
lastRunId
andpipelineName
to the system metadata of container aspects is consistent with changes in other files and aligns with the PR's objectives.
303-305
: Uniform system metadata updates across browsePathsV2 aspects of datasets.The fields
lastRunId
andpipelineName
have been added uniformly across various browsePathsV2 aspects of datasets, ensuring consistent metadata management.
Line range hint
447-461
: System metadata fields added correctly to ownership aspects of datasets.The addition of
lastRunId
andpipelineName
to the system metadata of ownership aspects is correctly implemented and consistent with other entries.
478-480
: Consistent addition of system metadata in subTypes aspects of datasets.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of datasets, aligning with the overall metadata enhancement strategy.
495-497
: Proper handling of system metadata in container aspects of datasets.The fields
lastRunId
andpipelineName
are correctly added to the system metadata of container aspects, ensuring uniformity in metadata tracking.
517-519
: Uniform system metadata updates across browsePathsV2 aspects of datasets.The consistent addition of
lastRunId
andpipelineName
across different browsePathsV2 aspects of datasets ensures that the system metadata is uniformly managed.
534-536
: System metadata fields added correctly to status aspects of datasets.The addition of
lastRunId
andpipelineName
to the system metadata of status aspects maintains consistency and aligns with the project's metadata enhancement goals.
551-553
: Correct implementation of system metadata fields in status aspects of containers.The addition of
lastRunId
andpipelineName
to the system metadata of status aspects of containers is consistent with changes in other files and aligns with the PR's objectives.metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json (31)
20-21
: Consistent addition of system metadata fieldslastRunId
andpipelineName
.The addition of these fields aligns with the PR's objective to enhance system metadata handling. The values and structure are correctly implemented.
37-38
: Consistent system metadata updates across different entries.The fields
lastRunId
andpipelineName
have been consistently added across various entries, ensuring uniformity in metadata handling.
54-55
: System metadata fields added correctly.The addition of
lastRunId
andpipelineName
is consistent with other entries and aligns with the PR's enhancements to metadata tracking.
73-74
: Proper addition of system metadata in dashboard entries.The fields
lastRunId
andpipelineName
are added to dashboard-related entries, which is crucial for tracking and consistency in metadata management.
94-95
: Uniform system metadata updates across dashboard snapshots.The consistent addition of these fields across different dashboard snapshots ensures that the system metadata is uniformly managed.
146-147
: System metadata fields correctly added to the last dashboard entry.The addition of
lastRunId
andpipelineName
to the system metadata of the dashboard snapshot maintains consistency and aligns with the project's metadata enhancement goals.
165-166
: Correct implementation of system metadata fields in Glue dataset entries.The addition of
lastRunId
andpipelineName
to the system metadata is consistent with changes in other files and aligns with the PR's objectives.
193-194
: Uniform system metadata updates across Glue dataset status aspects.The fields
lastRunId
andpipelineName
have been added uniformly across various status aspects of Glue datasets, ensuring consistent metadata management.
242-243
: System metadata fields added correctly to data platform instance aspects.The addition of
lastRunId
andpipelineName
to the system metadata of data platform instance aspects is correctly implemented and consistent with other entries.
259-260
: Consistent addition of system metadata in subTypes aspects.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of Glue datasets, aligning with the overall metadata enhancement strategy.
276-277
: Proper handling of system metadata in browsePathsV2 aspects.The fields
lastRunId
andpipelineName
are correctly added to the system metadata of browsePathsV2 aspects, ensuring uniformity in metadata tracking.
301-302
: Uniform system metadata updates across browsePathsV2 aspects of datasets.The consistent addition of
lastRunId
andpipelineName
across different browsePathsV2 aspects of datasets ensures that the system metadata is uniformly managed.
363-364
: System metadata fields added correctly to ownership aspects of datasets.The addition of
lastRunId
andpipelineName
to the system metadata of ownership aspects is correctly implemented and consistent with other entries.
425-426
: Consistent addition of system metadata in subTypes aspects of datasets.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of datasets, aligning with the overall metadata enhancement strategy.
447-448
: Correct implementation of system metadata fields in container aspects.The addition of
lastRunId
andpipelineName
to the system metadata of container aspects is consistent with changes in other files and aligns with the PR's objectives.
464-465
: Uniform system metadata updates across container status aspects.The fields
lastRunId
andpipelineName
have been added uniformly across various status aspects of containers, ensuring consistent metadata management.
481-482
: System metadata fields added correctly to data platform instance aspects.The addition of
lastRunId
andpipelineName
to the system metadata of data platform instance aspects is correctly implemented and consistent with other entries.
500-501
: Consistent addition of system metadata in subTypes aspects.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of containers, aligning with the overall metadata enhancement strategy.
521-522
: Proper handling of system metadata in browsePathsV2 aspects.The fields
lastRunId
andpipelineName
are correctly added to the system metadata of browsePathsV2 aspects, ensuring uniformity in metadata tracking.
619-620
: Uniform system metadata updates across dataset snapshots.The consistent addition of
lastRunId
andpipelineName
across different dataset snapshots ensures that the system metadata is uniformly managed.
638-639
: System metadata fields added correctly to subTypes aspects of datasets.The addition of
lastRunId
andpipelineName
to the system metadata of subTypes aspects maintains consistency and aligns with the project's metadata enhancement goals.
655-656
: Correct implementation of system metadata fields in Glue dataset entries.The addition of
lastRunId
andpipelineName
to the system metadata is consistent with changes in other files and aligns with the PR's objectives.
672-673
: Uniform system metadata updates across Glue dataset status aspects.The fields
lastRunId
andpipelineName
have been added uniformly across various status aspects of Glue datasets, ensuring consistent metadata management.
697-698
: System metadata fields added correctly to data platform instance aspects.The addition of
lastRunId
andpipelineName
to the system metadata of data platform instance aspects is correctly implemented and consistent with other entries.
718-719
: Consistent addition of system metadata in subTypes aspects.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of Glue datasets, aligning with the overall metadata enhancement strategy.
739-740
: Proper handling of system metadata in browsePathsV2 aspects.The fields
lastRunId
andpipelineName
are correctly added to the system metadata of browsePathsV2 aspects, ensuring uniformity in metadata tracking.
760-761
: Uniform system metadata updates across browsePathsV2 aspects of datasets.The consistent addition of
lastRunId
andpipelineName
across different browsePathsV2 aspects of datasets ensures that the system metadata is uniformly managed.
777-778
: System metadata fields added correctly to ownership aspects of datasets.The addition of
lastRunId
andpipelineName
to the system metadata of ownership aspects is correctly implemented and consistent with other entries.
794-795
: Consistent addition of system metadata in subTypes aspects of datasets.The system metadata fields
lastRunId
andpipelineName
are consistently added across subTypes aspects of datasets, aligning with the overall metadata enhancement strategy.
811-812
: Correct implementation of system metadata fields in container aspects.The addition of
lastRunId
andpipelineName
to the system metadata of container aspects is consistent with changes in other files and aligns with the PR's objectives.
828-829
: Uniform system metadata updates across container status aspects.The fields
lastRunId
andpipelineName
have been added uniformly across various status aspects of containers, ensuring consistent metadata management.metadata-ingestion/src/datahub/ingestion/run/pipeline.py (2)
42-44
: Import of SystemMetadataTransformer is correct and well-placed.This import is necessary for the use of
SystemMetadataTransformer
later in the file. The placement is consistent with other imports, maintaining good organization.
323-325
: Proper addition of SystemMetadataTransformer to the pipeline.The addition of
SystemMetadataTransformer
at the end of the transformers list ensures that system metadata is applied after all other transformations, which is crucial for maintaining data integrity and traceability.metadata-ingestion/tests/integration/looker/golden_looker_mces.json (1)
38-39
: Consistent addition of system metadata across test entities.The addition of
lastRunId
andpipelineName
to thesystemMetadata
section across various test entities is consistent and aligns with the PR objectives to ensure these attributes are set across all relevant aspects.Also applies to: 55-56, 117-118, 155-156, 237-238, 259-260, 276-277, 293-294, 312-313, 355-356, 372-373, 389-390, 408-409, 429-430, 527-528, 546-547, 563-564, 580-581, 605-606, 703-704, 722-723, 739-740, 756-757, 781-782, 802-803, 823-824, 844-845, 861-862, 878-879, 895-896, 912-913, 929-930
class AutoHelperTransformer(Transformer): | ||
"""Converts an auto_* source helper into a transformer. | ||
|
||
Important usage note: this assumes that the auto helper is stateless. The converter | ||
will be called multiple times, once for each batch of records. If the helper | ||
attempts to maintain state or perform some cleanup at the end of the stream, it | ||
will not behave correctly. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
converter: Callable[[Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]], | ||
): | ||
self.converter = converter | ||
|
||
def transform( | ||
self, record_envelopes: Iterable[RecordEnvelope] | ||
) -> Iterable[RecordEnvelope]: | ||
records = list(record_envelopes) | ||
|
||
normal_records = [r for r in records if not isinstance(r.record, ControlRecord)] | ||
control_records = [r for r in records if isinstance(r.record, ControlRecord)] | ||
|
||
yield from self._from_workunits( | ||
self.converter( | ||
self._into_workunits(normal_records), | ||
) | ||
) | ||
|
||
# Pass through control records as-is. Note that this isn't fully correct, since it technically | ||
# reorders the control records relative to the normal records. This is ok since the only control | ||
# record we have marks the end of the stream. | ||
yield from control_records | ||
|
||
@classmethod | ||
def _into_workunits( | ||
cls, | ||
stream: Iterable[ | ||
RecordEnvelope[ | ||
Union[ | ||
MetadataChangeEvent, | ||
MetadataChangeProposal, | ||
MetadataChangeProposalWrapper, | ||
] | ||
] | ||
], | ||
) -> Iterable[MetadataWorkUnit]: | ||
for record in stream: | ||
workunit_id: Optional[str] = record.metadata.get("workunit_id") | ||
metadata = record.record | ||
yield MetadataWorkUnit.from_metadata(metadata, id=workunit_id) | ||
|
||
@classmethod | ||
def _from_workunits( | ||
cls, stream: Iterable[MetadataWorkUnit] | ||
) -> Iterable[RecordEnvelope]: | ||
for workunit in stream: | ||
yield RecordEnvelope( | ||
workunit.metadata, | ||
{ | ||
"workunit_id": workunit.id, | ||
}, | ||
) | ||
|
||
@classmethod | ||
def create(cls, config_dict: dict, ctx: PipelineContext) -> Transformer: | ||
raise NotImplementedError(f"{cls.__name__} cannot be created from config") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review of new AutoHelperTransformer
class.
The AutoHelperTransformer
class is well-implemented with clear separation of responsibilities and good use of list comprehensions for filtering records. However, consider improving the handling of control records to avoid potential issues with record ordering.
- # Pass through control records as-is. Note that this isn't fully correct, since it technically
- # reorders the control records relative to the normal records. This is ok since the only control
- # record we have marks the end of the stream.
+ # TODO: Revisit the handling of control records to ensure correct ordering relative to normal records.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
class AutoHelperTransformer(Transformer): | |
"""Converts an auto_* source helper into a transformer. | |
Important usage note: this assumes that the auto helper is stateless. The converter | |
will be called multiple times, once for each batch of records. If the helper | |
attempts to maintain state or perform some cleanup at the end of the stream, it | |
will not behave correctly. | |
""" | |
def __init__( | |
self, | |
converter: Callable[[Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]], | |
): | |
self.converter = converter | |
def transform( | |
self, record_envelopes: Iterable[RecordEnvelope] | |
) -> Iterable[RecordEnvelope]: | |
records = list(record_envelopes) | |
normal_records = [r for r in records if not isinstance(r.record, ControlRecord)] | |
control_records = [r for r in records if isinstance(r.record, ControlRecord)] | |
yield from self._from_workunits( | |
self.converter( | |
self._into_workunits(normal_records), | |
) | |
) | |
# Pass through control records as-is. Note that this isn't fully correct, since it technically | |
# reorders the control records relative to the normal records. This is ok since the only control | |
# record we have marks the end of the stream. | |
yield from control_records | |
@classmethod | |
def _into_workunits( | |
cls, | |
stream: Iterable[ | |
RecordEnvelope[ | |
Union[ | |
MetadataChangeEvent, | |
MetadataChangeProposal, | |
MetadataChangeProposalWrapper, | |
] | |
] | |
], | |
) -> Iterable[MetadataWorkUnit]: | |
for record in stream: | |
workunit_id: Optional[str] = record.metadata.get("workunit_id") | |
metadata = record.record | |
yield MetadataWorkUnit.from_metadata(metadata, id=workunit_id) | |
@classmethod | |
def _from_workunits( | |
cls, stream: Iterable[MetadataWorkUnit] | |
) -> Iterable[RecordEnvelope]: | |
for workunit in stream: | |
yield RecordEnvelope( | |
workunit.metadata, | |
{ | |
"workunit_id": workunit.id, | |
}, | |
) | |
@classmethod | |
def create(cls, config_dict: dict, ctx: PipelineContext) -> Transformer: | |
raise NotImplementedError(f"{cls.__name__} cannot be created from config") | |
# TODO: Revisit the handling of control records to ensure correct ordering relative to normal records. |
Co-authored-by: david-leifker <[email protected]>
…0190) Co-authored-by: david-leifker <[email protected]>
Also fixes a bug, so now the pipeline name and run ID are set even for
aspects generated by transformers.
Checklist
Summary by CodeRabbit
New Features
SystemMetadataTransformer
for automatic metadata addition in pipelines.AutoHelperTransformer
to convert source helpers into transformers.Refactor
flags
configuration.Bug Fixes
Tests