diff --git a/metadata-ingestion/source_docs/redshift.md b/metadata-ingestion/source_docs/redshift.md index 24c750315fa7a..9b0a0717d14e2 100644 --- a/metadata-ingestion/source_docs/redshift.md +++ b/metadata-ingestion/source_docs/redshift.md @@ -264,6 +264,10 @@ By default, we extract usage stats for the last day, with the recommendation tha | `user_email_pattern.allow` | | * | List of regex patterns for user emails to include in usage. | | `user_email_pattern.deny` | | | List of regex patterns for user emails to exclude from usage. | | `user_email_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. | +| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. | +| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. | +| `schema_pattern.allow` | | | List of regex patterns for schemas to include in ingestion. | +| `schema_pattern.deny` | | | List of regex patterns for schemas to exclude from ingestion. ## Questions diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py index e7dd85f2e4b85..fe06b434d966a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py @@ -1,9 +1,8 @@ import collections import dataclasses import logging -from dataclasses import dataclass from datetime import datetime -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Set, Union from dateutil import parser from pydantic import Field @@ -90,19 +89,21 @@ def get_sql_alchemy_url(self): return super().get_sql_alchemy_url() -@dataclass -class RedshiftUsageReport(SourceReport): - pass +@dataclasses.dataclass +class RedshiftUsageSourceReport(SourceReport): + filtered: Set[str] = dataclasses.field(default_factory=set) + + def report_dropped(self, key: str) -> None: + self.filtered.add(key) -@dataclasses.dataclass class RedshiftUsageSource(Source): def __init__(self, config: RedshiftUsageConfig, ctx: PipelineContext): self.config: RedshiftUsageConfig = config - self.report: RedshiftUsageReport = RedshiftUsageReport() + self.report: RedshiftUsageSourceReport = RedshiftUsageSourceReport() @classmethod - def create(cls, config_dict, ctx): + def create(cls, config_dict: Dict, ctx: PipelineContext) -> "RedshiftUsageSource": config = RedshiftUsageConfig.parse_obj(config_dict) return cls(config, ctx) @@ -233,7 +234,20 @@ def _get_redshift_history( event_dict["endtime"] = event_dict.get("endtime").__str__() logger.debug(f"event_dict: {event_dict}") - events.append(event_dict) + # filter based on schema and table pattern + if ( + event_dict["schema"] + and self.config.schema_pattern.allowed(event_dict["schema"]) + and event_dict["table"] + and self.config.table_pattern.allowed(event_dict["table"]) + ): + events.append(event_dict) + else: + full_table_name: str = ( + f"{row['database']}.{row['schema']}.{row['table']}" + ) + logger.debug(f"Filtering out {full_table_name}") + self.report.report_dropped(full_table_name) if events: return events @@ -364,7 +378,7 @@ def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit: self.config.format_sql_queries, ) - def get_report(self) -> SourceReport: + def get_report(self) -> RedshiftUsageSourceReport: return self.report def close(self) -> None: diff --git a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json new file mode 100644 index 0000000000000..6199f60141427 --- /dev/null +++ b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_filtered_golden.json @@ -0,0 +1,59 @@ +[ +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-redshift-usage", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"DELETE\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-redshift-usage", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-redshift-usage", + "registryName": null, + "registryVersion": null, + "properties": null + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json index cea6db675569b..835d68b90c0c6 100644 --- a/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json +++ b/metadata-ingestion/tests/integration/redshift-usage/redshift_usages_golden.json @@ -37,6 +37,25 @@ "properties": null } }, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"INSERT\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-redshift-usage", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, { "auditHeader": null, "entityType": "dataset", @@ -75,6 +94,25 @@ "properties": null } }, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"DELETE\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-redshift-usage", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, { "auditHeader": null, "entityType": "dataset", @@ -112,5 +150,24 @@ "registryVersion": null, "properties": null } +}, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select cost from orders\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:real_shirshanka\", \"count\": 1, \"userEmail\": \"real_shirshanka@acryl.io\"}], \"fieldCounts\": []}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-redshift-usage", + "registryName": null, + "registryVersion": null, + "properties": null + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py index 6f9c9fac28d83..6a1e2ed1ca3e1 100644 --- a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py +++ b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py @@ -82,6 +82,53 @@ def test_redshift_usage_source(pytestconfig, tmp_path): ) +@freeze_time(FROZEN_TIME) +def test_redshift_usage_filtering(pytestconfig, tmp_path): + + test_resources_dir = pathlib.Path( + pytestconfig.rootpath / "tests/integration/redshift-usage" + ) + + with patch( + "datahub.ingestion.source.usage.redshift_usage.Engine.execute" + ) as mock_event_history: + access_events = load_access_events(test_resources_dir) + mock_event_history.return_value = access_events + + # Run ingestion + pipeline = Pipeline.create( + { + "run_id": "test-redshift-usage", + "source": { + "type": "redshift-usage", + "config": { + "host_port": "xxxxx", + "database": "xxxxx", + "username": "xxxxx", + "password": "xxxxx", + "email_domain": "acryl.io", + "include_views": True, + "include_tables": True, + "schema_pattern": {"allow": ["public"]}, + "table_pattern": {"deny": ["orders"]}, + }, + }, + "sink": { + "type": "file", + "config": {"filename": f"{tmp_path}/redshift_usages.json"}, + }, + }, + ) + pipeline.run() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig=pytestconfig, + output_path=tmp_path / "redshift_usages.json", + golden_path=test_resources_dir / "redshift_usages_filtered_golden.json", + ) + + def load_access_events(test_resources_dir): access_events_history_file = test_resources_dir / "usage_events_history.json" with access_events_history_file.open() as access_events_json: diff --git a/metadata-ingestion/tests/integration/redshift-usage/usage_events_history.json b/metadata-ingestion/tests/integration/redshift-usage/usage_events_history.json index 71c577bc3afd0..f6cf8ac31837b 100644 --- a/metadata-ingestion/tests/integration/redshift-usage/usage_events_history.json +++ b/metadata-ingestion/tests/integration/redshift-usage/usage_events_history.json @@ -58,5 +58,17 @@ "table": "category", "starttime": "2021-09-14 00:00:00", "endtime": "2021-09-15 00:00:00" + }, + { + "userid": 5, + "query": 293103, + "usename": "real_shirshanka@acryl.io", + "tbl": 101589, + "querytxt": "select cost from orders", + "database": "dev", + "schema": "public", + "table": "orders", + "starttime": "2021-09-14 00:00:00", + "endtime": "2021-09-15 00:00:00" } ]