feat(ingest): capture MSSQL table+column descriptions (datahub-project#4579)

* feat(ingest): capture MSSQL table+column descriptions

Co-authored-by: Ravindra Lanka <[email protected]>
2 people authored and maggiehays committed Aug 1, 2022
1 parent d85391e commit 1095c22
Showing 4 changed files with 116 additions and 11 deletions.
104 changes: 96 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py
@@ -1,11 +1,15 @@
import urllib.parse
from typing import Dict
from typing import Any, Dict, List, Optional, Tuple

import pydantic

# This import verifies that the dependencies are available.
import sqlalchemy_pytds # noqa: F401
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.result import ResultProxy, RowProxy

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
@@ -14,9 +18,8 @@

class SQLServerConfig(BasicSQLAlchemyConfig):
# defaults
host_port = "localhost:1433"
scheme = "mssql+pytds"

host_port: str = "localhost:1433"
scheme: str = "mssql+pytds"
use_odbc: bool = False
uri_args: Dict[str, str] = {}

@@ -28,14 +31,14 @@ def passwords_match(cls, v, values, **kwargs):
raise ValueError("uri_args is not supported when ODBC is disabled")
return v

def get_sql_alchemy_url(self):
def get_sql_alchemy_url(self, uri_opts: Optional[Dict[str, Any]] = None) -> str:
if self.use_odbc:
# Ensure that the import is available.
import pyodbc # noqa: F401

self.scheme = "mssql+pyodbc"

uri = super().get_sql_alchemy_url(uri_opts=None)
uri: str = super().get_sql_alchemy_url(uri_opts=uri_opts)
if self.use_odbc:
uri = f"{uri}?{urllib.parse.urlencode(self.uri_args)}"
return uri
@@ -50,10 +53,95 @@ def get_identifier(self, schema: str, table: str) -> str:


class SQLServerSource(SQLAlchemySource):
def __init__(self, config, ctx):
def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
super().__init__(config, ctx, "mssql")

# Cache the table and column descriptions
self.table_descriptions: Dict[str, str] = {}
self.column_descriptions: Dict[str, str] = {}
for inspector in self.get_inspectors():
db_name: str = self.get_db_name(inspector)
with inspector.engine.connect() as conn:
self._populate_table_descriptions(conn, db_name)
self._populate_column_descriptions(conn, db_name)

def _populate_table_descriptions(self, conn: Connection, db_name: str) -> None:
# see https://stackoverflow.com/questions/5953330/how-do-i-map-the-id-in-sys-extended-properties-to-an-object-name
# also see https://www.mssqltips.com/sqlservertip/5384/working-with-sql-server-extended-properties/
table_metadata: ResultProxy = conn.execute(
"""
SELECT
SCHEMA_NAME(T.SCHEMA_ID) AS schema_name,
T.NAME AS table_name,
EP.VALUE AS table_description
FROM SYS.TABLES AS T
INNER JOIN SYS.EXTENDED_PROPERTIES AS EP
ON EP.MAJOR_ID = T.[OBJECT_ID]
AND EP.MINOR_ID = 0
AND EP.NAME = 'MS_Description'
AND EP.CLASS = 1
"""
)
for row in table_metadata: # type: RowProxy
self.table_descriptions[
f"{db_name}.{row['schema_name']}.{row['table_name']}"
] = row["table_description"]

def _populate_column_descriptions(self, conn: Connection, db_name: str) -> None:
column_metadata: ResultProxy = conn.execute(
"""
SELECT
SCHEMA_NAME(T.SCHEMA_ID) AS schema_name,
T.NAME AS table_name,
C.NAME AS column_name,
EP.VALUE AS column_description
FROM SYS.TABLES AS T
INNER JOIN SYS.ALL_COLUMNS AS C
ON C.OBJECT_ID = T.[OBJECT_ID]
INNER JOIN SYS.EXTENDED_PROPERTIES AS EP
ON EP.MAJOR_ID = T.[OBJECT_ID]
AND EP.MINOR_ID = C.COLUMN_ID
AND EP.NAME = 'MS_Description'
AND EP.CLASS = 1
"""
)
for row in column_metadata: # type: RowProxy
self.column_descriptions[
f"{db_name}.{row['schema_name']}.{row['table_name']}.{row['column_name']}"
] = row["column_description"]

@classmethod
def create(cls, config_dict, ctx):
def create(cls, config_dict: Dict, ctx: PipelineContext) -> "SQLServerSource":
config = SQLServerConfig.parse_obj(config_dict)
return cls(config, ctx)

# override to get table descriptions
def get_table_properties(
self, inspector: Inspector, schema: str, table: str
) -> Tuple[Optional[str], Optional[Dict[str, str]], Optional[str]]:
description, properties, location_urn = super().get_table_properties(
inspector, schema, table
) # type:Tuple[Optional[str], Optional[Dict[str, str]], Optional[str]]
# Update description if available.
db_name: str = self.get_db_name(inspector)
description = self.table_descriptions.get(
f"{db_name}.{schema}.{table}", description
)
return description, properties, location_urn

# override to get column descriptions
def _get_columns(
self, dataset_name: str, inspector: Inspector, schema: str, table: str
) -> List[Dict]:
columns: List[Dict] = super()._get_columns(
dataset_name, inspector, schema, table
)
# Update column description if available.
db_name: str = self.get_db_name(inspector)
for column in columns:
description: Optional[str] = self.column_descriptions.get(
f"{db_name}.{schema}.{table}.{column['name']}",
)
if description:
column["comment"] = description
return columns
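
The new source above caches every MS_Description extended property up front, keyed as db.schema.table and db.schema.table.column, and injects those values when get_table_properties and _get_columns run. A minimal sketch of how the source is typically exercised follows; the connection details, database name, and the console sink are assumptions for illustration, not part of this commit.

# Hypothetical recipe run; host, credentials, and database are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "mssql",
            "config": {
                "host_port": "localhost:1433",  # default from SQLServerConfig
                "database": "DemoData",         # hypothetical database
                "username": "sa",               # hypothetical credentials
                "password": "password",
            },
        },
        # Print events to stdout so the captured descriptions can be inspected.
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()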
metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -268,7 +268,7 @@ class BasicSQLAlchemyConfig(SQLAlchemyConfig):
scheme: Optional[str] = None
sqlalchemy_uri: Optional[str] = None

def get_sql_alchemy_url(self, uri_opts=None):
def get_sql_alchemy_url(self, uri_opts: Optional[Dict[str, Any]] = None) -> str:
if not ((self.host_port and self.scheme) or self.sqlalchemy_uri):
raise ValueError("host_port and schema or connect_uri required.")

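
With uri_opts now threaded through BasicSQLAlchemyConfig.get_sql_alchemy_url, the MSSQL override simply forwards the options and, when use_odbc is enabled, appends uri_args as a query string. The sketch below only illustrates that final step; the driver name and credentials are hypothetical.

# Illustration of the ODBC branch; SQLServerConfig performs the same append.
import urllib.parse

base_uri = "mssql+pyodbc://sa:password@localhost:1433/DemoData"
uri_args = {"driver": "ODBC Driver 17 for SQL Server", "Encrypt": "yes"}

odbc_uri = f"{base_uri}?{urllib.parse.urlencode(uri_args)}"
print(odbc_uri)
# mssql+pyodbc://sa:password@localhost:1433/DemoData?driver=ODBC+Driver+17+for+SQL+Server&Encrypt=yes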
@@ -1065,7 +1065,7 @@
"externalUrl": null,
"name": "Items",
"qualifiedName": null,
"description": null,
"description": "Description for table Items of schema Foo.",
"uri": null,
"tags": []
}
@@ -1253,7 +1253,7 @@
"fieldPath": "LastName",
"jsonPath": null,
"nullable": false,
"description": null,
"description": "Description for column LastName of table Persons of schema Foo.",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
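
The golden-file changes above show where the captured text ends up: the table description lands on the DatasetProperties aspect and the column description on the matching SchemaField. A rough way to pull those values back out of an emitted MCE dump is sketched below; the file name is hypothetical and the aspect keys assume the standard pegasus2avro layout used by these golden files.

# Scan a dumped MCE file (name is hypothetical) for captured descriptions.
import json

with open("mssql_mces.json") as f:
    events = json.load(f)

for event in events:
    snapshot = event.get("proposedSnapshot", {}).get(
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {}
    )
    for aspect in snapshot.get("aspects", []):
        props = aspect.get("com.linkedin.pegasus2avro.dataset.DatasetProperties")
        if props and props.get("description"):
            print(snapshot["urn"], "->", props["description"])
        schema = aspect.get("com.linkedin.pegasus2avro.schema.SchemaMetadata")
        if schema:
            for field in schema.get("fields", []):
                if field.get("description"):
                    print(field["fieldPath"], "->", field["description"])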
17 changes: 17 additions & 0 deletions metadata-ingestion/tests/integration/sql_server/setup/setup.sql
@@ -27,3 +27,20 @@ CREATE TABLE Foo.SalesReason
)
;
GO

GO
EXEC sys.sp_addextendedproperty
@name = N'MS_Description',
@value = N'Description for table Items of schema Foo.',
@level0type = N'SCHEMA', @level0name = 'Foo',
@level1type = N'TABLE', @level1name = 'Items';
GO

GO
EXEC sys.sp_addextendedproperty
@name = N'MS_Description',
@value = N'Description for column LastName of table Persons of schema Foo.',
@level0type = N'SCHEMA', @level0name = 'Foo',
@level1type = N'TABLE', @level1name = 'Persons',
@level2type = N'COLUMN',@level2name = 'LastName';
GO
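
These statements give the integration test a table-level and a column-level MS_Description to ingest. A quick way to confirm they are visible (connection details are placeholders) is to run the same sys.extended_properties join the source uses:

# Verify the table-level extended property added above; credentials are hypothetical.
import sqlalchemy

engine = sqlalchemy.create_engine("mssql+pytds://sa:password@localhost:1433/DemoData")
query = sqlalchemy.text(
    """
    SELECT SCHEMA_NAME(T.SCHEMA_ID) AS schema_name,
           T.NAME AS table_name,
           EP.VALUE AS table_description
    FROM SYS.TABLES AS T
    INNER JOIN SYS.EXTENDED_PROPERTIES AS EP
        ON EP.MAJOR_ID = T.[OBJECT_ID]
        AND EP.MINOR_ID = 0
        AND EP.NAME = 'MS_Description'
        AND EP.CLASS = 1
    """
)
with engine.connect() as conn:
    for row in conn.execute(query):
        print(f"{row.schema_name}.{row.table_name}: {row.table_description}")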
