Skip to content

Commit

Permalink
fix(tableau): fix for incorrect schema returned by tableau api for sn…
Browse files Browse the repository at this point in the history
…owflake connectionType (datahub-project#4577)
  • Loading branch information
mayurinehate authored and aditya-radhakrishnan committed Apr 5, 2022
1 parent 983eb9d commit add16d5
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 8 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/source_docs/tableau.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ Lineage is emitted as received from Tableau's metadata API for


#### Caveats
- Tableau metadata API might return incorrect schema name for tables for some databases, leading to incorrect metadata in DataHub. Read [Using the databaseTable object in query](https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_model.html#schema_attribute) for more details.
- Tableau metadata API might return incorrect schema name for tables for some databases, leading to incorrect metadata in DataHub. This source attempts to extract correct schema from databaseTable's fully qualified name, wherever possible. Read [Using the databaseTable object in query](https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_model.html#schema_attribute) for caveats in using schema attribute.


### Supported Capabilities
Expand Down Expand Up @@ -484,7 +484,7 @@ sink:
| `password` | | | Tableau password, must be set if authenticating using username/password. |
| `token_name` | | | Tableau token name, must be set if authenticating using a personal access token. |
| `token_value` | | | Tableau token value, must be set if authenticating using a personal access token. |
| `projects` | | `default` | List of projects |
| `projects` | | `["default"]` | List of projects |
| `workbooks_page_size` | | 10 | Number of workbooks to query at a time using Tableau api. |
| `default_schema_map`* | | | Default schema to use when schema is not found. |
| `ingest_tags` | | `False` | Ingest Tags from source. This will override Tags entered from UI |
Expand Down
41 changes: 36 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ def _track_custom_sql_ids(self, field: dict) -> None:
# Tableau shows custom sql datasource as a table in ColumnField.
if field.get("__typename", "") == "ColumnField":
for column in field.get("columns", []):
table_id = column.get("table", {}).get("id")
table_id = (
column.get("table", {}).get("id") if column.get("table") else None
)

if (
table_id is not None
Expand All @@ -262,7 +264,14 @@ def _create_upstream_table_lineage(
continue

upstream_db = table.get("database", {}).get("name", "")
schema = self._get_schema(table.get("schema", ""), upstream_db)
logger.debug(
"Processing Table with Connection Type: {0} and id {1}".format(
table.get("connectionType", ""), table.get("id", "")
)
)
schema = self._get_schema(
table.get("schema", ""), upstream_db, table.get("fullName", "")
)
table_urn = make_table_urn(
self.config.env,
upstream_db,
Expand Down Expand Up @@ -904,13 +913,35 @@ def emit_embedded_datasource(self, workbook: Dict) -> Iterable[MetadataWorkUnit]
yield from self.emit_datasource(datasource, workbook)

@lru_cache(maxsize=None)
def _get_schema(self, schema_provided: str, database: str) -> str:
schema = schema_provided
if not schema_provided and database in self.config.default_schema_map:
def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str:

# For some databases, the schema attribute in tableau api does not return
# correct schema name for the table. For more information, see
# https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_model.html#schema_attribute.
# Hence we extract schema from fullName whenever fullName is available
schema = self._extract_schema_from_fullName(fullName) if fullName else ""
if not schema:
schema = schema_provided
elif schema != schema_provided:
logger.debug(
"Correcting schema, provided {0}, corrected {1}".format(
schema_provided, schema
)
)

if not schema and database in self.config.default_schema_map:
schema = self.config.default_schema_map[database]

return schema

@lru_cache(maxsize=None)
def _extract_schema_from_fullName(self, fullName: str) -> str:
# fullName is observed to be in format [schemaName].[tableName]
# OR simply tableName OR [tableName]
if fullName.startswith("[") and fullName.find("].[") >= 0:
return fullName[1 : fullName.index("]")]
return ""

@lru_cache(maxsize=None)
def get_last_modified(
self, creator: str, created_at: bytes, updated_at: bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class MetadataQueryException(Exception):
name
}
schema
fullName
connectionType
}
... on PublishedDatasource {
Expand All @@ -242,6 +243,10 @@ class MetadataQueryException(Exception):
tables {
name
schema
fullName
database {
name
}
connectionType
}
}
Expand Down
Loading

0 comments on commit add16d5

Please sign in to comment.