diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
index 1cde2c3dcc599..f1a44989d1c5b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import re
 from typing import Any, Dict, List, Optional
 
@@ -33,10 +34,61 @@
 from datahub.utilities import config_clean
 from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
 
+logger = logging.getLogger(__name__)
+
 register_custom_type(HiveDate, DateTypeClass)
 register_custom_type(HiveTimestamp, TimeTypeClass)
 register_custom_type(HiveDecimal, NumberTypeClass)
 
+try:
+
+    from databricks_dbapi.sqlalchemy_dialects.hive import DatabricksPyhiveDialect
+    from pyhive.sqlalchemy_hive import _type_map
+    from sqlalchemy import types, util
+    from sqlalchemy.engine import reflection
+
+    @reflection.cache  # type: ignore
+    def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
+        """Patches the get_columns method from dbapi (databricks_dbapi.sqlalchemy_dialects.base) to pass the native type through"""
+        rows = self._get_table_columns(connection, table_name, schema)
+        # Strip whitespace
+        rows = [[col.strip() if col else None for col in row] for row in rows]
+        # Filter out empty rows and comment
+        rows = [row for row in rows if row[0] and row[0] != "# col_name"]
+        result = []
+        for (col_name, col_type, _comment) in rows:
+            # Handle both oss hive and Databricks' hive partition header, respectively
+            if col_name in ("# Partition Information", "# Partitioning"):
+                break
+            # Take out the more detailed type information
+            # e.g. 'map<int,int>' -> 'map'
+            #      'decimal(10,1)' -> decimal
+            orig_col_type = col_type  # keep a copy
+            col_type = re.search(r"^\w+", col_type).group(0)  # type: ignore
+            try:
+                coltype = _type_map[col_type]
+            except KeyError:
+                util.warn(
+                    "Did not recognize type '%s' of column '%s'" % (col_type, col_name)
+                )
+                coltype = types.NullType  # type: ignore
+            result.append(
+                {
+                    "name": col_name,
+                    "type": coltype,
+                    "nullable": True,
+                    "default": None,
+                    "full_type": orig_col_type,  # pass it through
+                }
+            )
+        return result
+
+    DatabricksPyhiveDialect.get_columns = dbapi_get_columns_patched
+except ModuleNotFoundError:
+    pass
+except Exception as e:
+    logger.warning(f"Failed to patch method due to {e}")
+
 
 class HiveConfig(BasicSQLAlchemyConfig):
     # defaults
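For reviewers, here is a minimal sketch (not part of the patch) of how the extra `full_type` key surfaces through SQLAlchemy reflection once `DatabricksPyhiveDialect.get_columns` has been monkey-patched as above. The host, token, cluster, and table names are hypothetical placeholders; the `databricks+pyhive` URI scheme and the `cluster` connect arg follow databricks-dbapi's documented usage, so treat those details as assumptions.

```python
# Sketch only: demonstrates the effect of the monkey-patch in this diff.
# All connection details below are hypothetical placeholders.
from sqlalchemy import create_engine, inspect

# Importing the module applies the patch (the try/except block runs at import time).
import datahub.ingestion.source.sql.hive  # noqa: F401

# Assumption: databricks-dbapi, installed with its SQLAlchemy extra, registers
# the "databricks+pyhive" dialect and accepts a "cluster" connect arg.
engine = create_engine(
    "databricks+pyhive://token:<databricks-token>@<workspace-host>:443/default",
    connect_args={"cluster": "<cluster-name>"},
)

for col in inspect(engine).get_columns("<table>", schema="default"):
    # Unpatched, a column typed map<int,string> reflects only as the coarse
    # SQLAlchemy type (NullType here, since "map" is not in _type_map);
    # patched, col["full_type"] preserves the native "map<int,string>" string,
    # which DataHub can then feed to get_avro_schema_for_hive_column.
    print(col["name"], col["type"], col.get("full_type"))
```

This is also why the patch keeps `orig_col_type` before truncating the type name: the coarse type drives SQLAlchemy's behavior, while the untouched native string is what the Hive source needs to reconstruct nested schemas.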