Skip to content

Commit

Permalink
Merge branch 'master' into dml-vuln-updates-1
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Mar 7, 2024
2 parents a35c38d + c4a4532 commit beeed74
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ class RedshiftConfig(

use_lineage_v2: bool = Field(
default=False,
description="Whether to use the new SQL-based lineage and usage collector.",
description="Whether to use the new SQL-based lineage collector.",
)
lineage_v2_generate_queries: bool = Field(
default=True,
description="Whether to generate queries entities for the new SQL-based lineage collector.",
)

include_table_lineage: bool = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ def __init__(
platform_instance=self.config.platform_instance,
env=self.config.env,
generate_lineage=True,
generate_queries=True,
generate_usage_statistics=True,
generate_operations=True,
generate_queries=self.config.lineage_v2_generate_queries,
generate_usage_statistics=False,
generate_operations=False,
usage_config=self.config,
graph=self.context.graph,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
LINEAGE_EXTRACTION,
METADATA_EXTRACTION,
PROFILING,
USAGE_EXTRACTION_INGESTION,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import SubTypes, TimeStamp
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
Expand Down Expand Up @@ -429,12 +430,13 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
)

self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION)
yield from self.extract_lineage_usage_v2(
yield from self.extract_lineage_v2(
connection=connection,
database=database,
lineage_extractor=lineage_extractor,
)

all_tables = self.get_all_tables()
else:
yield from self.process_schemas(connection, database)

Expand All @@ -450,10 +452,11 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
connection=connection, all_tables=all_tables, database=database
)

if self.config.include_usage_statistics:
yield from self.extract_usage(
connection=connection, all_tables=all_tables, database=database
)
self.report.report_ingestion_stage_start(USAGE_EXTRACTION_INGESTION)
if self.config.include_usage_statistics:
yield from self.extract_usage(
connection=connection, all_tables=all_tables, database=database
)

if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start(PROFILING)
Expand Down Expand Up @@ -951,7 +954,7 @@ def extract_lineage(
self.config.start_time, self.config.end_time
)

def extract_lineage_usage_v2(
def extract_lineage_v2(
self,
connection: redshift_connector.Connection,
database: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
"policy_id": "PREFIXdatahub_usage_event_policy",
"description": "Datahub Usage Event Policy",
"default_state": "Rollover",
"schema_version": 3,
"schema_version": 4,
"states": [
{
"name": "Rollover",
"actions": [
{
"rollover": {
"min_size": "5gb",
"min_index_age": "1d"
"min_size": "5gb"
}
}
],
Expand Down

0 comments on commit beeed74

Please sign in to comment.