Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): schema, table filtering for redshift-usage #4396

Merged
merged 13 commits into from
Apr 2, 2022
4 changes: 4 additions & 0 deletions metadata-ingestion/source_docs/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ By default, we extract usage stats for the last day, with the recommendation tha
| `user_email_pattern.allow` | | * | List of regex patterns for user emails to include in usage. |
| `user_email_pattern.deny` | | | List of regex patterns for user emails to exclude from usage. |
| `user_email_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `schema_pattern.allow` | | | List of regex patterns for schemas to include in ingestion. |
| `schema_pattern.deny` | | | List of regex patterns for schemas to exclude from ingestion. |

## Questions

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import collections
import dataclasses
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Set, Union

from dateutil import parser
from pydantic import Field
Expand Down Expand Up @@ -90,19 +89,21 @@ def get_sql_alchemy_url(self):
return super().get_sql_alchemy_url()


@dataclass
class RedshiftUsageReport(SourceReport):
pass
@dataclasses.dataclass
class RedshiftUsageSourceReport(SourceReport):
    """Source report that additionally records the keys dropped by filtering."""

    # Keys handed to report_dropped(); stored in a set, so a key that is
    # reported more than once is kept only once.
    filtered: Set[str] = dataclasses.field(default_factory=set)

    def report_dropped(self, key: str) -> None:
        """Record *key* in the ``filtered`` set."""
        self.filtered.add(key)


@dataclasses.dataclass
class RedshiftUsageSource(Source):
def __init__(self, config: RedshiftUsageConfig, ctx: PipelineContext):
self.config: RedshiftUsageConfig = config
self.report: RedshiftUsageReport = RedshiftUsageReport()
self.report: RedshiftUsageSourceReport = RedshiftUsageSourceReport()

@classmethod
def create(cls, config_dict, ctx):
def create(cls, config_dict: Dict, ctx: PipelineContext) -> "RedshiftUsageSource":
config = RedshiftUsageConfig.parse_obj(config_dict)
return cls(config, ctx)

Expand Down Expand Up @@ -233,7 +234,20 @@ def _get_redshift_history(
event_dict["endtime"] = event_dict.get("endtime").__str__()

logger.debug(f"event_dict: {event_dict}")
events.append(event_dict)
# filter based on schema and table pattern
if (
event_dict["schema"]
and self.config.schema_pattern.allowed(event_dict["schema"])
and event_dict["table"]
and self.config.table_pattern.allowed(event_dict["table"])
):
events.append(event_dict)
else:
full_table_name: str = (
f"{row['database']}.{row['schema']}.{row['table']}"
)
logger.debug(f"Filtering out {full_table_name}")
self.report.report_dropped(full_table_name)

if events:
return events
Expand Down Expand Up @@ -364,7 +378,7 @@ def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
self.config.format_sql_queries,
)

def get_report(self) -> SourceReport:
def get_report(self) -> RedshiftUsageSourceReport:
return self.report

def close(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
[
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"[email protected]\"}], \"fieldCounts\": []}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"INSERT\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
Expand Down Expand Up @@ -75,6 +94,25 @@
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
Expand Down Expand Up @@ -112,5 +150,24 @@
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select cost from orders\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:real_shirshanka\", \"count\": 1, \"userEmail\": \"[email protected]\"}], \"fieldCounts\": []}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,53 @@ def test_redshift_usage_source(pytestconfig, tmp_path):
)


@freeze_time(FROZEN_TIME)
def test_redshift_usage_filtering(pytestconfig, tmp_path):
    """Check redshift-usage output when schema and table patterns are set.

    ``Engine.execute`` is patched to return the events loaded by
    ``load_access_events``. The pipeline runs with ``schema_pattern``
    allowing ``public`` and ``table_pattern`` denying ``orders``, and the
    file written by the sink is compared against
    ``redshift_usages_filtered_golden.json``.
    """
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )

    # Source settings: connection values are placeholders because the engine
    # call is patched below; the two *_pattern entries are what is under test.
    source_config = {
        "host_port": "xxxxx",
        "database": "xxxxx",
        "username": "xxxxx",
        "password": "xxxxx",
        "email_domain": "acryl.io",
        "include_views": True,
        "include_tables": True,
        "schema_pattern": {"allow": ["public"]},
        "table_pattern": {"deny": ["orders"]},
    }
    sink_config = {"filename": f"{tmp_path}/redshift_usages.json"}
    recipe = {
        "run_id": "test-redshift-usage",
        "source": {"type": "redshift-usage", "config": source_config},
        "sink": {"type": "file", "config": sink_config},
    }

    with patch(
        "datahub.ingestion.source.usage.redshift_usage.Engine.execute"
    ) as mock_event_history:
        # Every patched execute() call hands back the canned access events.
        mock_event_history.return_value = load_access_events(test_resources_dir)

        # Run ingestion
        pipeline = Pipeline.create(recipe)
        pipeline.run()
        pipeline.raise_from_status()

    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_filtered_golden.json",
    )


def load_access_events(test_resources_dir):
access_events_history_file = test_resources_dir / "usage_events_history.json"
with access_events_history_file.open() as access_events_json:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,17 @@
"table": "category",
"starttime": "2021-09-14 00:00:00",
"endtime": "2021-09-15 00:00:00"
},
{
"userid": 5,
"query": 293103,
"usename": "[email protected]",
"tbl": 101589,
"querytxt": "select cost from orders",
"database": "dev",
"schema": "public",
"table": "orders",
"starttime": "2021-09-14 00:00:00",
"endtime": "2021-09-15 00:00:00"
}
]