
fix(ingestion/lookml): liquid template resolution and view-to-view cll #10542

Merged: 65 commits, Jul 8, 2024
Commits
32a6ab0
wip
sid-acryl May 17, 2024
b308f91
introduce datahub sqlparser for sql parsing
sid-acryl May 20, 2024
d09b296
remove import pdb
sid-acryl May 20, 2024
3010df0
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 24, 2024
4dedd87
syntax1 and syntax2 support of lookml sql view
sid-acryl May 24, 2024
7922cc7
update message
sid-acryl May 24, 2024
86026c6
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 24, 2024
9f9d510
add if else for syntax
sid-acryl May 27, 2024
93add15
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl May 27, 2024
9dbd32b
liquid variable resolution
sid-acryl May 28, 2024
4f44fd9
liquid variable in config
sid-acryl May 29, 2024
fe96655
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 29, 2024
522d4a5
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 30, 2024
fcd9957
sqlglot_lib
sid-acryl May 30, 2024
808ded1
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl May 30, 2024
542de95
test fixes
sid-acryl Jun 1, 2024
ab61277
view to view lineage
sid-acryl Jun 5, 2024
734fbbe
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 5, 2024
7e75198
drop hive. from urn id
sid-acryl Jun 6, 2024
3d2b13b
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 6, 2024
2e6bffd
name.SQL_TABLE_NAME handling
sid-acryl Jun 7, 2024
5f9a582
added debug log
sid-acryl Jun 7, 2024
dc231a6
add stack trace
sid-acryl Jun 7, 2024
4cc0048
fix for select * from something.SQL_TABLE_NAME
sid-acryl Jun 11, 2024
54aff14
fix test cases
sid-acryl Jun 11, 2024
6c79b37
test case
sid-acryl Jun 12, 2024
b77eb3f
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 12, 2024
d58600f
rephrased description
sid-acryl Jun 12, 2024
1415b10
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 12, 2024
853ca3d
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 14, 2024
3cc0487
code refactoring to resolve circular dependency
sid-acryl Jun 14, 2024
27c3861
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 14, 2024
85c324b
Added case base logic
sid-acryl Jun 14, 2024
16fc4c9
refactored code
sid-acryl Jun 18, 2024
6cc7c60
refactored code
sid-acryl Jun 18, 2024
2b297c5
WIP
sid-acryl Jun 19, 2024
fd943a4
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 19, 2024
93a8413
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 19, 2024
2f61c16
WIP
sid-acryl Jun 19, 2024
a368a4b
refactor code
sid-acryl Jun 20, 2024
cfbba07
remove unused py file
sid-acryl Jun 20, 2024
045f769
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 21, 2024
84fa6c9
updated test case for custom tag
sid-acryl Jun 21, 2024
31e784b
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 21, 2024
7094405
update test case
sid-acryl Jun 21, 2024
ae02aae
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 24, 2024
b693251
handle special liquid variable
sid-acryl Jun 24, 2024
853fc25
Add double quotes to cover one customer use case
sid-acryl Jun 24, 2024
17f6204
add lru cache
sid-acryl Jun 24, 2024
6ae99c4
test case for sql_table_name having - in name
sid-acryl Jun 25, 2024
d72f475
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 25, 2024
0854446
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 26, 2024
12859b0
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 27, 2024
659dd3a
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 1, 2024
5ad8200
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 1, 2024
2e8c14f
refactor code
sid-acryl Jul 1, 2024
b9f8b08
fix existing golden files
sid-acryl Jul 2, 2024
cf0f45b
1. Resolve merge conflict
sid-acryl Jul 2, 2024
33985b5
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 3, 2024
519c173
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 3, 2024
0b926d0
resolve merge conflict
sid-acryl Jul 8, 2024
d5025d4
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jul 8, 2024
c2d2f6b
address review comments
sid-acryl Jul 8, 2024
e7008d2
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 8, 2024
8629f42
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 8, 2024
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -162,6 +162,7 @@
"sql-metadata==2.2.2",
*sqllineage_lib,
"GitPython>2",
"python-liquid",
Collaborator review comment:
  1. Can we remove the sql-metadata and sqllineage_lib dependencies now that the sqlglot parser is used?
  2. Does sqlglot_lib need to be added to the looker_common libraries instead of lookml only, unless the sqlglot imports are strictly separated into lookml-only files?

}

bigquery_common = {
@@ -9,6 +9,7 @@
from functools import lru_cache
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Iterator,
@@ -23,14 +24,18 @@

from looker_sdk.error import SDKError
from looker_sdk.sdk.api40.models import (
DBConnection,
LookmlModelExplore,
LookmlModelExploreField,
User,
WriteQuery,
)
from pydantic import Field
from pydantic.class_validators import validator

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.source_common import EnvConfigMixin
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ContainerKey, create_embed_mcp
from datahub.ingestion.api.report import Report
@@ -93,6 +98,7 @@
TagSnapshotClass,
)
from datahub.metadata.urns import TagUrn
from datahub.sql_parsing.sqlglot_lineage import ColumnRef
from datahub.utilities.lossy_collections import LossyList, LossySet
from datahub.utilities.url_util import remove_port_from_url

@@ -243,7 +249,10 @@ class ViewField:
project_name: Optional[str] = None
view_name: Optional[str] = None
is_primary_key: bool = False
upstream_fields: List[str] = dataclasses_field(default_factory=list)
# For a derived view defined using SQL this is a list of ColumnRef; otherwise it is a list of plain column names.
upstream_fields: Union[List[str], List[ColumnRef]] = cast(
List[str], dataclasses_field(default_factory=list)
)


@dataclass
@@ -1054,7 +1063,8 @@ def _to_metadata_events( # noqa: C901
observed_lineage_ts = datetime.datetime.now(tz=datetime.timezone.utc)
for view_ref in sorted(self.upstream_views):
# set file_path to ViewFieldType.UNKNOWN if file_path is not available to keep backward compatibility
# if we raise error on file_path equal to None then existing test-cases will fail as mock data doesn't have required attributes.
# if we raise error on file_path equal to None then existing test-cases will fail as mock data
# doesn't have required attributes.
file_path: str = (
cast(str, self.upstream_views_file_path[view_ref.include])
if self.upstream_views_file_path[view_ref.include] is not None
@@ -1086,7 +1096,7 @@ def _to_metadata_events( # noqa: C901
fine_grained_lineages = []
if config.extract_column_level_lineage:
for field in self.fields or []:
for upstream_field in field.upstream_fields:
for upstream_field in cast(List[str], field.upstream_fields):
if len(upstream_field.split(".")) >= 2:
(view_name, field_path) = upstream_field.split(".")[
0
@@ -1473,3 +1483,79 @@ def get_by_id(self, id_: str) -> Optional[LookerUser]:

looker_user = LookerUser.create_looker_user(raw_user)
return looker_user


def _get_bigquery_definition(
looker_connection: DBConnection,
) -> Tuple[str, Optional[str], Optional[str]]:
platform = "bigquery"
# bigquery project ids are returned in the host field
db = looker_connection.host
schema = looker_connection.database
return (platform, db, schema)


def _get_generic_definition(
looker_connection: DBConnection, platform: Optional[str] = None
) -> Tuple[str, Optional[str], Optional[str]]:
if platform is None:
# We extract the platform from the dialect name
dialect_name = looker_connection.dialect_name
assert dialect_name is not None
# generally the first part of the dialect name before _ is the name of the platform
# versions are encoded as numbers and can be removed
# e.g. spark1 or hive2 or druid_18
platform = re.sub(r"[0-9]+", "", dialect_name.split("_")[0])

assert (
platform is not None
), f"Failed to extract a valid platform from connection {looker_connection}"
db = looker_connection.database
schema = looker_connection.schema # ok for this to be None
return (platform, db, schema)
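The dialect-to-platform extraction above can be exercised in isolation. A minimal sketch — the helper name is hypothetical, but the regex and split are the ones from the diff:

```python
import re

def platform_from_dialect(dialect_name: str) -> str:
    # The first token before "_" names the platform; trailing version
    # digits (spark1, hive2, druid_18) are stripped.
    return re.sub(r"[0-9]+", "", dialect_name.split("_")[0])

print(platform_from_dialect("spark1"))    # -> spark
print(platform_from_dialect("druid_18"))  # -> druid
```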


class LookerConnectionDefinition(ConfigModel):
platform: str
default_db: str
default_schema: Optional[str] # Optional since some sources are two-level only
platform_instance: Optional[str] = None
platform_env: Optional[str] = Field(
default=None,
description="The environment that the platform is located in. Leaving this empty will inherit defaults from "
"the top level Looker configuration",
)

@validator("platform_env")
def platform_env_must_be_one_of(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
return EnvConfigMixin.env_must_be_one_of(v)
return v

@validator("platform", "default_db", "default_schema")
def lower_everything(cls, v):
"""We lower case all strings passed in to avoid casing issues later"""
if v is not None:
return v.lower()

@classmethod
def from_looker_connection(
cls, looker_connection: DBConnection
) -> "LookerConnectionDefinition":
"""Dialect definitions are here: https://docs.looker.com/setup-and-management/database-config"""
extractors: Dict[str, Any] = {
"^bigquery": _get_bigquery_definition,
".*": _get_generic_definition,
}

if looker_connection.dialect_name is None:
raise ConfigurationError(
f"Unable to fetch a fully filled out connection for {looker_connection.name}. Please check your API permissions."
)
for extractor_pattern, extracting_function in extractors.items():
if re.match(extractor_pattern, looker_connection.dialect_name):
(platform, db, schema) = extracting_function(looker_connection)
return cls(platform=platform, default_db=db, default_schema=schema)
raise ConfigurationError(
f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}"
)
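The extractor dispatch in `from_looker_connection` depends on dict insertion order: `^bigquery` must be tried before the catch-all `.*`. A standalone sketch of the same pattern, with stub extractors and hypothetical connection dicts standing in for `DBConnection`:

```python
import re

def bigquery_extractor(conn):
    # BigQuery project ids are returned in the host field.
    return ("bigquery", conn["host"], conn["database"])

def generic_extractor(conn):
    platform = re.sub(r"[0-9]+", "", conn["dialect_name"].split("_")[0])
    return (platform, conn["database"], conn.get("schema"))

# Insertion order matters: the specific pattern precedes the catch-all.
extractors = {
    "^bigquery": bigquery_extractor,
    ".*": generic_extractor,
}

def resolve(conn):
    for pattern, fn in extractors.items():
        if re.match(pattern, conn["dialect_name"]):
            return fn(conn)

print(resolve({"dialect_name": "bigquery_standard_sql",
               "host": "my-project", "database": "my_dataset"}))
# -> ('bigquery', 'my-project', 'my_dataset')
```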
@@ -18,6 +18,7 @@
Tuple,
Type,
Union,
cast,
)

import lkml
@@ -29,7 +30,6 @@
from pydantic.fields import Field

import datahub.emitter.mce_builder as builder
from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.git import GitInfo
from datahub.configuration.source_common import EnvConfigMixin
@@ -57,6 +57,7 @@
from datahub.ingestion.source.looker.looker_common import (
CORPUSER_DATAHUB,
LookerCommonConfig,
LookerConnectionDefinition,
LookerExplore,
LookerUtil,
LookerViewId,
@@ -101,6 +102,7 @@
FineGrainedLineageUpstreamTypeClass,
SubTypesClass,
)
from datahub.sql_parsing.sqlglot_lineage import ColumnRef
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.sql_parser import SQLParser

@@ -141,82 +143,6 @@ def deduplicate_fields(fields: List[ViewField]) -> List[ViewField]:
return new_fields


def _get_bigquery_definition(
looker_connection: DBConnection,
) -> Tuple[str, Optional[str], Optional[str]]:
platform = "bigquery"
# bigquery project ids are returned in the host field
db = looker_connection.host
schema = looker_connection.database
return (platform, db, schema)


def _get_generic_definition(
looker_connection: DBConnection, platform: Optional[str] = None
) -> Tuple[str, Optional[str], Optional[str]]:
if platform is None:
# We extract the platform from the dialect name
dialect_name = looker_connection.dialect_name
assert dialect_name is not None
# generally the first part of the dialect name before _ is the name of the platform
# versions are encoded as numbers and can be removed
# e.g. spark1 or hive2 or druid_18
platform = re.sub(r"[0-9]+", "", dialect_name.split("_")[0])

assert (
platform is not None
), f"Failed to extract a valid platform from connection {looker_connection}"
db = looker_connection.database
schema = looker_connection.schema # ok for this to be None
return (platform, db, schema)


class LookerConnectionDefinition(ConfigModel):
platform: str
default_db: str
default_schema: Optional[str] # Optional since some sources are two-level only
platform_instance: Optional[str] = None
platform_env: Optional[str] = Field(
default=None,
description="The environment that the platform is located in. Leaving this empty will inherit defaults from "
"the top level Looker configuration",
)

@validator("platform_env")
def platform_env_must_be_one_of(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
return EnvConfigMixin.env_must_be_one_of(v)
return v

@validator("platform", "default_db", "default_schema")
def lower_everything(cls, v):
"""We lower case all strings passed in to avoid casing issues later"""
if v is not None:
return v.lower()

@classmethod
def from_looker_connection(
cls, looker_connection: DBConnection
) -> "LookerConnectionDefinition":
"""Dialect definitions are here: https://docs.looker.com/setup-and-management/database-config"""
extractors: Dict[str, Any] = {
"^bigquery": _get_bigquery_definition,
".*": _get_generic_definition,
}

if looker_connection.dialect_name is None:
raise ConfigurationError(
f"Unable to fetch a fully filled out connection for {looker_connection.name}. Please check your API permissions."
)
for extractor_pattern, extracting_function in extractors.items():
if re.match(extractor_pattern, looker_connection.dialect_name):
(platform, db, schema) = extracting_function(looker_connection)
return cls(platform=platform, default_db=db, default_schema=schema)
raise ConfigurationError(
f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}"
)


class LookMLSourceConfig(
LookerCommonConfig, StatefulIngestionConfigBase, EnvConfigMixin
):
@@ -1201,10 +1127,7 @@ def from_looker_dict(

# Parse SQL to extract dependencies.
if parse_table_names_from_sql:
(
fields,
sql_table_names,
) = cls._extract_metadata_from_derived_table_sql(
fields = cls._extract_metadata_from_derived_table_sql(
reporter,
connection,
config.env,
@@ -1289,22 +1212,21 @@ def _extract_metadata_from_derived_table_sql(
sql_table_name: Optional[str],
sql_query: str,
fields: List[ViewField],
) -> Tuple[List[ViewField], List[str]]:

sql_table_names: List[str] = []
) -> List[ViewField]:

logger.debug(f"Parsing sql from derived table section of view: {view_name}")
reporter.query_parse_attempts += 1

# Skip queries that contain liquid variables. We currently don't parse them correctly.
# Docs: https://cloud.google.com/looker/docs/liquid-variable-reference.
# TODO: also support ${EXTENDS} and ${TABLE}
try:
view_field_builder: ViewFieldBuilder = ViewFieldBuilder(fields)

fields, sql_table_names = view_field_builder.create_or_update_fields(
fields = view_field_builder.create_or_update_fields(
sql_query=SqlQuery(
lookml_sql_query=sql_query,
liquid_context={},  # liquid_context maps liquid variables to their values. We don't need to
# resolve variables to real values here; we only need valid SQL to generate CLL, and
# SqlQuery defaults unresolved variables to NULL to keep the query parseable.
view_name=sql_table_name
if sql_table_name is not None
else view_name,
Expand All @@ -1321,7 +1243,7 @@ def _extract_metadata_from_derived_table_sql(
f"Failed to parse sql query, lineage will not be accurate. Exception: {e}",
)

return fields, sql_table_names
return fields
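The empty `liquid_context` above captures the key idea: liquid variables don't need their real values for lineage extraction, they only need to be replaced with something that keeps the SQL parseable. A minimal sketch of that substitution — the helper name is hypothetical; the actual logic lives inside `SqlQuery`:

```python
import re

def null_out_liquid(sql: str) -> str:
    # Replace {{ ... }} liquid expressions with NULL and drop
    # {% ... %} tags so a SQL parser can still read the query.
    sql = re.sub(r"\{\{.*?\}\}", "NULL", sql)
    return re.sub(r"\{%.*?%\}", "", sql)

print(null_out_liquid("SELECT id FROM orders WHERE ds >= {{ start_date }}"))
# -> SELECT id FROM orders WHERE ds >= NULL
```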

@classmethod
def _extract_metadata_from_derived_table_explore(
@@ -1673,7 +1595,30 @@ def _get_upstream_lineage(
make_schema_field_urn(
upstream_dataset_urn, upstream_field
)
for upstream_field in field.upstream_fields
for upstream_field in cast(
List[str], field.upstream_fields
)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(
looker_view.id.get_urn(self.source_config),
field.name,
)
],
)
fine_grained_lineages.append(fine_grained_lineage)
else:
# View is defined as SQL
for field in looker_view.fields:
if field.upstream_fields:
fine_grained_lineage = FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=[
make_schema_field_urn(cll_ref.table, cll_ref.column)
for cll_ref in cast(
List[ColumnRef], field.upstream_fields
)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
@@ -1685,7 +1630,7 @@
)
fine_grained_lineages.append(fine_grained_lineage)

if upstreams != []:
if upstreams:
return UpstreamLineage(
upstreams=upstreams, fineGrainedLineages=fine_grained_lineages or None
)
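Both branches above emit `urn:li:schemaField` URNs for the upstream and downstream columns; only the source of the column name differs (plain `view.field` strings vs `ColumnRef.table`/`ColumnRef.column`). The URN shape itself is simple — a hedged sketch; the real helper is `make_schema_field_urn` from `datahub.emitter.mce_builder`:

```python
def make_schema_field_urn(dataset_urn: str, field_path: str) -> str:
    # A schemaField URN wraps the parent dataset URN plus the field path.
    return f"urn:li:schemaField:({dataset_urn},{field_path})"

dataset = "urn:li:dataset:(urn:li:dataPlatform:looker,my_view,PROD)"
print(make_schema_field_urn(dataset, "order_id"))
# -> urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,my_view,PROD),order_id)
```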