Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): databricks - ingest structs correctly through hive #5223

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import re
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -33,10 +34,62 @@
from datahub.utilities import config_clean
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

# Module-level logger for this ingestion source.
logger = logging.getLogger(__name__)

# Map Hive-specific SQLAlchemy column types (which DataHub does not know
# natively) onto DataHub's generic schema type classes.
register_custom_type(HiveDate, DateTypeClass)
register_custom_type(HiveTimestamp, TimeTypeClass)
register_custom_type(HiveDecimal, NumberTypeClass)

try:

    from databricks_dbapi.sqlalchemy_dialects.hive import DatabricksPyhiveDialect
    from pyhive.sqlalchemy_hive import _type_map
    from sqlalchemy import types, util
    from sqlalchemy.engine import reflection

    @reflection.cache  # type: ignore
    def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
        """Patches the get_columns method from dbapi (databricks_dbapi.sqlalchemy_dialects.base) to pass the native type through

        In addition to the keys the upstream implementation returns for each
        column ("name", "type", "nullable", "default"), every dict also carries
        "full_type": the raw Hive type string (e.g. ``struct<a:int,b:string>``)
        before it is truncated to its leading word for the ``_type_map`` lookup.
        """
        rows = self._get_table_columns(connection, table_name, schema)
        # Strip whitespace
        rows = [[col.strip() if col else None for col in row] for row in rows]
        # Filter out empty rows and comment
        rows = [row for row in rows if row[0] and row[0] != "# col_name"]
        result = []
        for (col_name, col_type, _comment) in rows:
            # Handle both oss hive and Databricks' hive partition header, respectively
            if col_name in ("# Partition Information", "# Partitioning"):
                break
            # Take out the more detailed type information
            # e.g. 'map<int,int>' -> 'map'
            # 'decimal(10,1)' -> decimal
            orig_col_type = col_type  # keep a copy of the full native type string
            col_type = re.search(r"^\w+", col_type).group(0)  # type: ignore
            try:
                coltype = _type_map[col_type]
            except KeyError:
                util.warn(
                    "Did not recognize type '%s' of column '%s'" % (col_type, col_name)
                )
                coltype = types.NullType  # type: ignore
            result.append(
                {
                    "name": col_name,
                    "type": coltype,
                    "nullable": True,
                    "default": None,
                    "full_type": orig_col_type,  # pass it through
                }
            )
        return result

    DatabricksPyhiveDialect.get_columns = dbapi_get_columns_patched
except ModuleNotFoundError:
    # Optional Databricks support is not installed; nothing to patch.
    pass
except Exception as e:
    # Best-effort monkeypatch: log and continue without Databricks support.
    # Lazy %-style args avoid formatting the message when WARNING is disabled.
    logger.warning("Failed to patch method due to %s", e)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need of pass here. Everything else looks good to me.



class HiveConfig(BasicSQLAlchemyConfig):
# defaults
Expand Down