fix(ingest): resolve missing numeric types for profiling (#11991)
mayurinehate authored Dec 2, 2024
1 parent ce6474d commit 59181e4
Showing 2 changed files with 37 additions and 3 deletions.
@@ -57,7 +57,11 @@
     convert_to_cardinality,
 )
 from datahub.ingestion.source.sql.sql_report import SQLSourceReport
-from datahub.metadata.com.linkedin.pegasus2avro.schema import EditableSchemaMetadata
+from datahub.ingestion.source.sql.sql_types import resolve_sql_type
+from datahub.metadata.com.linkedin.pegasus2avro.schema import (
+    EditableSchemaMetadata,
+    NumberType,
+)
 from datahub.metadata.schema_classes import (
     DatasetFieldProfileClass,
     DatasetProfileClass,
@@ -361,6 +365,8 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
     platform: str
     env: str
 
+    column_types: Dict[str, str] = dataclasses.field(default_factory=dict)
+
     def _get_columns_to_profile(self) -> List[str]:
         if not self.config.any_field_level_metrics_enabled():
             return []
@@ -374,6 +380,7 @@ def _get_columns_to_profile(self) -> List[str]:
 
         for col_dict in self.dataset.columns:
             col = col_dict["name"]
+            self.column_types[col] = str(col_dict["type"])
             # We expect the allow/deny patterns to specify '<table_pattern>.<column_pattern>'
             if not self.config._allow_deny_patterns.allowed(
                 f"{self.dataset_name}.{col}"
@@ -430,6 +437,21 @@ def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None:
             self.dataset, column
         )
 
+        if column_spec.type_ == ProfilerDataType.UNKNOWN:
+            try:
+                datahub_field_type = resolve_sql_type(
+                    self.column_types[column], self.dataset.engine.dialect.name.lower()
+                )
+            except Exception as e:
+                logger.debug(
+                    f"Error resolving sql type {self.column_types[column]}: {e}"
+                )
+                datahub_field_type = None
+            if datahub_field_type is None:
+                return
+            if isinstance(datahub_field_type, NumberType):
+                column_spec.type_ = ProfilerDataType.NUMERIC
+
     @_run_with_query_combiner
     def _get_column_cardinality(
         self, column_spec: _SingleColumnSpec, column: str
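For context, a minimal sketch of the fallback this hunk introduces: the raw type string captured in self.column_types is passed to resolve_sql_type together with the lower-cased SQLAlchemy dialect name, and a column whose profiler type is UNKNOWN is promoted to NUMERIC only when the resolved class is a NumberType. This is illustrative rather than the committed code; the column names, raw type strings, and the hard-coded "snowflake" dialect are assumptions for the example.

    # Illustrative sketch only -- assumes the datahub package from this repo is importable.
    from datahub.ingestion.source.sql.sql_types import resolve_sql_type
    from datahub.metadata.com.linkedin.pegasus2avro.schema import NumberType

    # Hypothetical raw types, as str(col_dict["type"]) might record them.
    example_column_types = {
        "ORDER_TOTAL": "DECIMAL(38, 0)",  # numeric type with precision/scale
        "LOCATION": "GEOGRAPHY",          # maps to None in SNOWFLAKE_TYPES_MAP
    }

    for column, raw_type in example_column_types.items():
        try:
            resolved = resolve_sql_type(raw_type, "snowflake")
        except Exception:
            resolved = None
        if resolved is not None and isinstance(resolved, NumberType):
            print(f"{column}: profile as NUMERIC")
        else:
            print(f"{column}: leave as UNKNOWN")
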
16 changes: 14 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
@@ -276,7 +276,6 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
     return VERTICA_SQL_TYPES_MAP[type_string]
 
 
-# see https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
 SNOWFLAKE_TYPES_MAP: Dict[str, Any] = {
     "NUMBER": NumberType,
     "DECIMAL": NumberType,
@@ -312,6 +311,18 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
     "GEOGRAPHY": None,
 }
 
+
+def resolve_snowflake_modified_type(type_string: str) -> Any:
+    # Match types with precision and scale, e.g., 'DECIMAL(38,0)'
+    match = re.match(r"([a-zA-Z_]+)\(\d+,\s\d+\)", type_string)
+    if match:
+        modified_type_base = match.group(1)  # Extract the base type
+        return SNOWFLAKE_TYPES_MAP.get(modified_type_base, None)
+
+    # Fallback for types without precision/scale
+    return SNOWFLAKE_TYPES_MAP.get(type_string, None)
+
+
 # see https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/_types.py#L32
 BIGQUERY_TYPES_MAP: Dict[str, Any] = {
     "STRING": StringType,
@@ -380,6 +391,7 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
     "row": RecordType,
     "map": MapType,
     "array": ArrayType,
+    "json": RecordType,
 }
 
 # https://docs.aws.amazon.com/athena/latest/ug/data-types.html
@@ -490,7 +502,7 @@ def resolve_sql_type(
         TypeClass = resolve_vertica_modified_type(column_type)
     elif platform == "snowflake":
         # Snowflake types are uppercase, so we check that.
-        TypeClass = _merged_mapping.get(column_type.upper())
+        TypeClass = resolve_snowflake_modified_type(column_type.upper())
 
     if TypeClass:
         return TypeClass()
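A short, hypothetical check of the new Snowflake branch in resolve_sql_type: type strings that carry a precision/scale suffix previously missed the plain dictionary lookup, whereas resolve_snowflake_modified_type now strips the suffix via the regex before consulting SNOWFLAKE_TYPES_MAP. The input strings below are example values, and the assumption that resolve_sql_type returns None for unmapped types follows from the `if TypeClass:` guard shown above.

    # Illustrative only: exercises the Snowflake path changed in this commit.
    from datahub.ingestion.source.sql.sql_types import resolve_sql_type

    for raw in ("NUMBER(38, 0)", "DECIMAL", "GEOGRAPHY"):
        resolved = resolve_sql_type(raw, "snowflake")
        # "NUMBER(38, 0)" formerly fell through the plain dict lookup; it now
        # resolves to NumberType() once the precision/scale suffix is stripped.
        print(raw, "->", type(resolved).__name__ if resolved is not None else None)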
