feat: adding support for glue #2319

Merged · 1 commit · Apr 4, 2021
19 changes: 19 additions & 0 deletions metadata-ingestion/README.md
@@ -89,6 +89,7 @@ We use a plugin architecture so that you can install only the dependencies you actually need.
| console | _included by default_ | Console sink |
| athena | `pip install -e '.[athena]'` | AWS Athena source |
| bigquery | `pip install -e '.[bigquery]'` | BigQuery source |
| glue | `pip install -e '.[glue]'` | AWS Glue source |
| hive | `pip install -e '.[hive]'` | Hive source |
| mssql | `pip install -e '.[mssql]'` | SQL Server source |
| mysql | `pip install -e '.[mysql]'` | MySQL source |
@@ -355,6 +356,24 @@ source:
# table_pattern/schema_pattern is same as above
```

### AWS Glue `glue`
Extracts:
- List of tables
- Column types associated with each table
- Table metadata, such as owner, description and parameters
```yml
source:
  type: glue
  config:
    aws_region: aws_region_name # e.g. "eu-west-1"
    env: environment used for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP" # Optional, defaults to "PROD".
    databases: list of databases to process # Optional. If not specified, all databases will be processed.
    aws_access_key_id: # Optional. If not specified, credentials are picked up according to boto3 rules.
    # See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
    aws_secret_access_key: # Optional.
    aws_session_token: # Optional.
```

Contributor review comment (on the `databases` option):
Would like to standardize on the allow-deny-list pattern for these across all the sources,
e.g. schema_pattern and table_pattern in SQL-based sources such as
https://datahubproject.io/docs/metadata-ingestion#mysql-metadata-mysql
Not a show-stopper, and can be done in a follow-on PR.
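The allow/deny filtering suggested in the comment above is not part of this PR. As a rough illustration only, here is a minimal, self-contained sketch of how a `table_pattern`-style filter could work; the class and field names are hypothetical and do not reflect DataHub's actual implementation:

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass
class AllowDenyPattern:
    """Hypothetical allow/deny filter; not DataHub's actual implementation."""

    allow: List[str] = field(default_factory=lambda: [".*"])  # regexes to keep
    deny: List[str] = field(default_factory=list)  # regexes to drop

    def allowed(self, name: str) -> bool:
        # A name is kept only if it matches some allow pattern and no deny pattern.
        if any(re.match(p, name) for p in self.deny):
            return False
        return any(re.match(p, name) for p in self.allow)


# Example: keep everything in the "sales" database except staging tables.
pattern = AllowDenyPattern(allow=[r"sales\..*"], deny=[r".*\.staging_.*"])
print(pattern.allowed("sales.orders"))          # True
print(pattern.allowed("sales.staging_orders"))  # False
```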

### Druid `druid`

Extracts:
9 changes: 9 additions & 0 deletions metadata-ingestion/examples/recipes/glue_to_datahub.yml
@@ -0,0 +1,9 @@
source:
  type: glue
  config:
    aws_region: "us-east-1"

sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'
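This recipe sets no explicit keys, so the source relies on boto3's default credential chain. As an optional sanity check before running it, a small boto3 snippet (using the same region as the example recipe) can confirm which Glue databases those credentials can actually see:

```python
import boto3

# Uses the same credential resolution the source falls back to when no keys are
# configured: environment variables, shared credentials file, IAM role, etc.
glue = boto3.client("glue", region_name="us-east-1")

databases = []
kwargs = {}
while True:
    resp = glue.get_databases(**kwargs)
    databases += [db["Name"] for db in resp["DatabaseList"]]
    if "NextToken" in resp:
        kwargs["NextToken"] = resp["NextToken"]
    else:
        break

print(databases)
```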
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
@@ -82,6 +82,7 @@ def get_long_description():
    # Sink plugins.
    "datahub-kafka": kafka_common,
    "datahub-rest": {"requests>=2.25.1"},
    "glue": {"boto3"},
@shirshanka (Contributor) commented on Apr 4, 2021:
nit: Should be moved up to the `# Source plugins.` area.
}

dev_requirements = {
@@ -97,6 +98,8 @@ def get_long_description():
    "pytest-docker",
    "sqlalchemy-stubs",
    "deepdiff",
    "freezegun",
    "botocore",
    # Also add the plugins which are used for tests.
    *list(
        dependency
@@ -108,6 +111,7 @@ def get_long_description():
            "ldap",
            "datahub-kafka",
            "datahub-rest",
            "glue",
        ]
        for dependency in plugins[plugin]
    ),
@@ -159,6 +163,7 @@ def get_long_description():
"bigquery = datahub.ingestion.source.bigquery:BigQuerySource",
"dbt = datahub.ingestion.source.dbt:DBTSource",
"druid = datahub.ingestion.source.druid:DruidSource",
"glue = datahub.ingestion.source.glue:GlueSource",
"hive = datahub.ingestion.source.hive:HiveSource",
"kafka = datahub.ingestion.source.kafka:KafkaSource",
"ldap = datahub.ingestion.source.ldap:LDAPSource",
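The `entry_points` additions above are what let the ingestion framework resolve a source plugin class from the `type` string in a recipe. A rough sketch of how such a lookup typically works with `pkg_resources`; the group name `datahub.ingestion.source.plugins` is an assumption here, since it is not visible in this hunk:

```python
import pkg_resources


def load_source_class(name: str):
    # Iterate over everything registered under the (assumed) entry-point group
    # and return the class for the requested source, e.g. GlueSource for "glue".
    for ep in pkg_resources.iter_entry_points("datahub.ingestion.source.plugins"):
        if ep.name == name:
            return ep.load()
    raise KeyError(f"no ingestion source registered under {name!r}")
```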
268 changes: 268 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/glue.py
@@ -0,0 +1,268 @@
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

import boto3

from datahub.configuration import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    BooleanTypeClass,
    BytesTypeClass,
    DateTypeClass,
    MySqlDDL,
    NullTypeClass,
    NumberTypeClass,
    SchemaField,
    SchemaFieldDataType,
    SchemaMetadata,
    StringTypeClass,
    TimeTypeClass,
    UnionTypeClass,
)
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetPropertiesClass,
    MapTypeClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)


class GlueSourceConfig(ConfigModel):
    env: str = "PROD"
    databases: Optional[List[str]] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    aws_region: str

    @property
    def glue_client(self):
        # Use explicit credentials when they are fully specified in the config;
        # otherwise fall back to boto3's default credential resolution.
        if (
            self.aws_access_key_id
            and self.aws_secret_access_key
            and self.aws_session_token
        ):
            return boto3.client(
                "glue",
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                aws_session_token=self.aws_session_token,
                region_name=self.aws_region,
            )
        elif self.aws_access_key_id and self.aws_secret_access_key:
            return boto3.client(
                "glue",
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                region_name=self.aws_region,
            )
        else:
            return boto3.client("glue", region_name=self.aws_region)


@dataclass
class GlueSourceReport(SourceReport):
    tables_scanned: int = 0

    def report_table_scanned(self) -> None:
        self.tables_scanned += 1


class GlueSource(Source):
    source_config: GlueSourceConfig
    report: GlueSourceReport

    def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.source_config = config
        self.report = GlueSourceReport()
        self.glue_client = config.glue_client
        self.env = config.env
        self.databases = config.databases

    @classmethod
    def create(cls, config_dict, ctx):
        config = GlueSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        def get_all_tables(database_names: Optional[List[str]]):
            def get_tables_from_database(database_name: str, tables: List):
                # Page through get_tables using NextToken, appending into `tables`.
                kwargs = {"DatabaseName": database_name}
                while True:
                    data = self.glue_client.get_tables(**kwargs)
                    tables += data["TableList"]
                    if "NextToken" in data:
                        kwargs["NextToken"] = data["NextToken"]
                    else:
                        break
                return tables

            def get_tables_from_all_databases():
                # search_tables covers every database visible to the credentials.
                tables = []
                kwargs: Dict = {}
                while True:
                    data = self.glue_client.search_tables(**kwargs)
                    tables += data["TableList"]
                    if "NextToken" in data:
                        kwargs["NextToken"] = data["NextToken"]
                    else:
                        break
                return tables

            if database_names:
                all_tables: List = []
                for database in database_names:
                    # get_tables_from_database appends into all_tables in place,
                    # so its return value must not be added to the list again.
                    get_tables_from_database(database, all_tables)
            else:
                all_tables = get_tables_from_all_databases()
            return all_tables

        tables = get_all_tables(self.databases)

        for table in tables:
            table_name = table["Name"]
            database_name = table["DatabaseName"]
            full_table_name = f"{database_name}.{table_name}"

            self.report.report_table_scanned()
            mce = self._extract_record(table, full_table_name)
            workunit = MetadataWorkUnit(id=f"glue-{full_table_name}", mce=mce)
            self.report.report_workunit(workunit)
            yield workunit

    def _extract_record(self, table: Dict, table_name: str) -> MetadataChangeEvent:
        def get_owner(time) -> OwnershipClass:
            owner = table.get("Owner")
            if owner:
                owners = [
                    OwnerClass(
                        owner=f"urn:li:corpuser:{owner}",
                        type=OwnershipTypeClass.DATAOWNER,
                    )
                ]
            else:
                owners = []
            return OwnershipClass(
                owners=owners,
                lastModified=AuditStampClass(
                    time=time,
                    actor="urn:li:corpuser:datahub",
                ),
            )

        def get_dataset_properties() -> DatasetPropertiesClass:
            # Flatten table parameters and StorageDescriptor entries (other than
            # the columns) into the dataset's custom properties.
            return DatasetPropertiesClass(
                description=table.get("Description"),
                customProperties={
                    **table.get("Parameters", {}),
                    **{
                        k: str(v)
                        for k, v in table["StorageDescriptor"].items()
                        if k not in ["Columns", "Parameters"]
                    },
                },
                uri=table.get("Location"),
                tags=[],
            )

        def get_schema_metadata(glue_source: GlueSource):
            schema = table["StorageDescriptor"]["Columns"]
            fields: List[SchemaField] = []
            for field in schema:
                schema_field = SchemaField(
                    fieldPath=field["Name"],
                    nativeDataType=field["Type"],
                    type=get_column_type(
                        glue_source, field["Type"], table_name, field["Name"]
                    ),
                    description=field.get("Comment"),
                    recursive=False,
                    nullable=True,
                )
                fields.append(schema_field)
            return SchemaMetadata(
                schemaName=table_name,
                version=0,
                fields=fields,
                platform="urn:li:dataPlatform:glue",
                created=AuditStamp(time=sys_time, actor="urn:li:corpuser:etl"),
                lastModified=AuditStamp(time=sys_time, actor="urn:li:corpuser:etl"),
                hash="",
                platformSchema=MySqlDDL(tableSchema=""),
            )

        sys_time = int(time.time() * 1000)
        metadata_record = MetadataChangeEvent()
        dataset_snapshot = DatasetSnapshot(
            urn=f"urn:li:dataset:(urn:li:dataPlatform:glue,{table_name},{self.env})",
            aspects=[],
        )

        # Assemble the snapshot aspects: status, ownership, properties, and schema.
        dataset_snapshot.aspects.append(Status(removed=False))
        dataset_snapshot.aspects.append(get_owner(sys_time))
        dataset_snapshot.aspects.append(get_dataset_properties())
        dataset_snapshot.aspects.append(get_schema_metadata(self))

        metadata_record.proposedSnapshot = dataset_snapshot

        return metadata_record

    def get_report(self):
        return self.report


def get_column_type(
    glue_source: GlueSource, field_type: str, table_name: str, field_name: str
) -> SchemaFieldDataType:
    field_type_mapping = {
        "array": ArrayTypeClass,
        "bigint": NumberTypeClass,
        "binary": BytesTypeClass,
        "boolean": BooleanTypeClass,
        "char": StringTypeClass,
        "date": DateTypeClass,
        "decimal": NumberTypeClass,
        "double": NumberTypeClass,
        "float": NumberTypeClass,
        "int": NumberTypeClass,
        "integer": NumberTypeClass,
        "interval": TimeTypeClass,
        "long": NumberTypeClass,
        "map": MapTypeClass,
        "null": NullTypeClass,
        "set": ArrayTypeClass,
        "smallint": NumberTypeClass,
        "string": StringTypeClass,
        "struct": MapTypeClass,
        "timestamp": TimeTypeClass,
        "tinyint": NumberTypeClass,
        "union": UnionTypeClass,
        "varchar": StringTypeClass,
    }

    # Exact matches first; parameterized complex types such as "array<int>" or
    # "map<string,int>" are handled by prefix, and anything unrecognised falls
    # back to a string type with a warning in the report.
    if field_type in field_type_mapping:
        type_class = field_type_mapping[field_type]
    elif field_type.startswith("array"):
        type_class = ArrayTypeClass
    elif field_type.startswith("map") or field_type.startswith("struct"):
        type_class = MapTypeClass
    elif field_type.startswith("set"):
        type_class = ArrayTypeClass
    else:
        glue_source.report.report_warning(
            field_type,
            f"The type '{field_type}' is not recognised for field '{field_name}' in table '{table_name}', setting as StringTypeClass.",
        )
        type_class = StringTypeClass
    data_type = SchemaFieldDataType(type=type_class())
    return data_type
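For reference, a minimal sketch of constructing the new source directly and iterating its work units, assuming `PipelineContext` can be built from just a `run_id` (its full signature is not shown in this PR) and that AWS credentials resolve through boto3's default chain:

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.glue import GlueSource

# Region is an example; databases and keys are omitted so boto3's default
# credential chain is used, as described in the README section above.
source = GlueSource.create(
    {"aws_region": "us-east-1", "env": "PROD"},
    PipelineContext(run_id="glue-smoke-test"),  # assumption: run_id is enough here
)

for wu in source.get_workunits():
    # Each work unit wraps one MetadataChangeEvent for a single Glue table.
    print(wu.id)

print(source.get_report())
```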