fix(ingestion/lookml): liquid template resolution and view-to-view cll #10542

Merged
merged 65 commits into from
Jul 8, 2024
32a6ab0
wip
sid-acryl May 17, 2024
b308f91
introduce datahub sqlparser for sql parsing
sid-acryl May 20, 2024
d09b296
remove import pdb
sid-acryl May 20, 2024
3010df0
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 24, 2024
4dedd87
syntax1 and syntax2 support of lookml sql view
sid-acryl May 24, 2024
7922cc7
update message
sid-acryl May 24, 2024
86026c6
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 24, 2024
9f9d510
add if else for syntax
sid-acryl May 27, 2024
93add15
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl May 27, 2024
9dbd32b
liquid variable resolution
sid-acryl May 28, 2024
4f44fd9
liquid variable in config
sid-acryl May 29, 2024
fe96655
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 29, 2024
522d4a5
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl May 30, 2024
fcd9957
sqlglot_lib
sid-acryl May 30, 2024
808ded1
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl May 30, 2024
542de95
test fixes
sid-acryl Jun 1, 2024
ab61277
view to view lineage
sid-acryl Jun 5, 2024
734fbbe
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 5, 2024
7e75198
drop hive. from urn id
sid-acryl Jun 6, 2024
3d2b13b
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 6, 2024
2e6bffd
name.SQL_TABLE_NAME handling
sid-acryl Jun 7, 2024
5f9a582
added debug log
sid-acryl Jun 7, 2024
dc231a6
add stack trace
sid-acryl Jun 7, 2024
4cc0048
fix for select * from something.SQL_TABLE_NAME
sid-acryl Jun 11, 2024
54aff14
fix test cases
sid-acryl Jun 11, 2024
6c79b37
test case
sid-acryl Jun 12, 2024
b77eb3f
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 12, 2024
d58600f
rephrased description
sid-acryl Jun 12, 2024
1415b10
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 12, 2024
853ca3d
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 14, 2024
3cc0487
code refactoring to resolve circular dependency
sid-acryl Jun 14, 2024
27c3861
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 14, 2024
85c324b
Added case base logic
sid-acryl Jun 14, 2024
16fc4c9
refactored code
sid-acryl Jun 18, 2024
6cc7c60
refactored code
sid-acryl Jun 18, 2024
2b297c5
WIP
sid-acryl Jun 19, 2024
fd943a4
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 19, 2024
93a8413
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 19, 2024
2f61c16
WIP
sid-acryl Jun 19, 2024
a368a4b
refactor code
sid-acryl Jun 20, 2024
cfbba07
remove unused py file
sid-acryl Jun 20, 2024
045f769
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 21, 2024
84fa6c9
updated test case for custom tag
sid-acryl Jun 21, 2024
31e784b
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jun 21, 2024
7094405
update test case
sid-acryl Jun 21, 2024
ae02aae
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 24, 2024
b693251
handle special liquid variable
sid-acryl Jun 24, 2024
853fc25
Add double quotes to cover one customer use case
sid-acryl Jun 24, 2024
17f6204
add lru cache
sid-acryl Jun 24, 2024
6ae99c4
test case for sql_table_name having - in name
sid-acryl Jun 25, 2024
d72f475
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 25, 2024
0854446
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 26, 2024
12859b0
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jun 27, 2024
659dd3a
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 1, 2024
5ad8200
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 1, 2024
2e8c14f
refactor code
sid-acryl Jul 1, 2024
b9f8b08
fix existing golden files
sid-acryl Jul 2, 2024
cf0f45b
1. Resolve merge conflict
sid-acryl Jul 2, 2024
33985b5
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 3, 2024
519c173
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 3, 2024
0b926d0
resolve merge conflict
sid-acryl Jul 8, 2024
d5025d4
Merge branch 'master+ing-510-lookml-cll' of github.com:sid-acryl/data…
sid-acryl Jul 8, 2024
c2d2f6b
address review comments
sid-acryl Jul 8, 2024
e7008d2
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 8, 2024
8629f42
Merge branch 'master' into master+ing-510-lookml-cll
sid-acryl Jul 8, 2024
307 changes: 170 additions & 137 deletions metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py

Large diffs are not rendered by default.

@@ -1,13 +1,14 @@
import dataclasses
import os
import re
from typing import Any, ClassVar, Dict, List, Optional, Union, cast
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union, cast

import pydantic
from looker_sdk.sdk.api40.models import DBConnection
from pydantic import Field, validator

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
@@ -143,6 +144,82 @@ class LookerCommonConfig(EnvConfigMixin, PlatformInstanceConfigMixin):
)


def _get_bigquery_definition(
looker_connection: DBConnection,
) -> Tuple[str, Optional[str], Optional[str]]:
platform = "bigquery"
# bigquery project ids are returned in the host field
db = looker_connection.host
schema = looker_connection.database
return platform, db, schema


def _get_generic_definition(
looker_connection: DBConnection, platform: Optional[str] = None
) -> Tuple[str, Optional[str], Optional[str]]:
if platform is None:
# We extract the platform from the dialect name
dialect_name = looker_connection.dialect_name
assert dialect_name is not None
# generally the first part of the dialect name before _ is the name of the platform
# versions are encoded as numbers and can be removed
# e.g. spark1 or hive2 or druid_18
platform = re.sub(r"[0-9]+", "", dialect_name.split("_")[0])

assert (
platform is not None
), f"Failed to extract a valid platform from connection {looker_connection}"
db = looker_connection.database
schema = looker_connection.schema # ok for this to be None
return platform, db, schema
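The dialect-to-platform extraction above can be exercised with a standalone sketch (this mirrors only the regex logic from `_get_generic_definition`, not the full DataHub code path):

```python
import re

def platform_from_dialect(dialect_name: str) -> str:
    # The first "_"-separated token names the platform; trailing version
    # digits are stripped, e.g. "spark1" -> "spark", "druid_18" -> "druid".
    return re.sub(r"[0-9]+", "", dialect_name.split("_")[0])

print(platform_from_dialect("druid_18"))  # druid
print(platform_from_dialect("bigquery_standard_sql"))  # bigquery
```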


class LookerConnectionDefinition(ConfigModel):
platform: str
default_db: str
default_schema: Optional[str] # Optional since some sources are two-level only
platform_instance: Optional[str] = None
platform_env: Optional[str] = Field(
default=None,
description="The environment that the platform is located in. Leaving this empty will inherit defaults from "
"the top level Looker configuration",
)

@validator("platform_env")
def platform_env_must_be_one_of(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
return EnvConfigMixin.env_must_be_one_of(v)
return v

@validator("platform", "default_db", "default_schema")
def lower_everything(cls, v):
"""We lower case all strings passed in to avoid casing issues later"""
if v is not None:
return v.lower()

@classmethod
def from_looker_connection(
cls, looker_connection: DBConnection
) -> "LookerConnectionDefinition":
"""Dialect definitions are here: https://docs.looker.com/setup-and-management/database-config"""
extractors: Dict[str, Any] = {
"^bigquery": _get_bigquery_definition,
".*": _get_generic_definition,
}

if looker_connection.dialect_name is None:
raise ConfigurationError(
f"Unable to fetch a fully filled out connection for {looker_connection.name}. Please check your API permissions."
)
for extractor_pattern, extracting_function in extractors.items():
if re.match(extractor_pattern, looker_connection.dialect_name):
(platform, db, schema) = extracting_function(looker_connection)
return cls(platform=platform, default_db=db, default_schema=schema)
raise ConfigurationError(
f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}"
)


class LookerDashboardSourceConfig(
LookerAPIConfig,
LookerCommonConfig,
@@ -1,85 +1,55 @@
import re
from typing import Any, Dict, Optional, Tuple
import logging
from typing import Optional

from looker_sdk.error import SDKError
from looker_sdk.sdk.api40.models import DBConnection
from pydantic import Field, validator

from datahub.configuration import ConfigModel
from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import EnvConfigMixin


def _get_bigquery_definition(
looker_connection: DBConnection,
) -> Tuple[str, Optional[str], Optional[str]]:
platform = "bigquery"
# bigquery project ids are returned in the host field
db = looker_connection.host
schema = looker_connection.database
return (platform, db, schema)


def _get_generic_definition(
looker_connection: DBConnection, platform: Optional[str] = None
) -> Tuple[str, Optional[str], Optional[str]]:
if platform is None:
# We extract the platform from the dialect name
dialect_name = looker_connection.dialect_name
assert dialect_name is not None
# generally the first part of the dialect name before _ is the name of the platform
# versions are encoded as numbers and can be removed
# e.g. spark1 or hive2 or druid_18
platform = re.sub(r"[0-9]+", "", dialect_name.split("_")[0])

assert (
platform is not None
), f"Failed to extract a valid platform from connection {looker_connection}"
db = looker_connection.database
schema = looker_connection.schema # ok for this to be None
return (platform, db, schema)


class LookerConnectionDefinition(ConfigModel):
platform: str
default_db: str
default_schema: Optional[str] # Optional since some sources are two-level only
platform_instance: Optional[str] = None
platform_env: Optional[str] = Field(
default=None,
description="The environment that the platform is located in. Leaving this empty will inherit defaults from "
"the top level Looker configuration",
)

@validator("platform_env")
def platform_env_must_be_one_of(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
return EnvConfigMixin.env_must_be_one_of(v)
return v

@validator("platform", "default_db", "default_schema")
def lower_everything(cls, v):
"""We lower case all strings passed in to avoid casing issues later"""
if v is not None:
return v.lower()

@classmethod
def from_looker_connection(
cls, looker_connection: DBConnection
) -> "LookerConnectionDefinition":
"""Dialect definitions are here: https://docs.looker.com/setup-and-management/database-config"""
extractors: Dict[str, Any] = {
"^bigquery": _get_bigquery_definition,
".*": _get_generic_definition,
}

if looker_connection.dialect_name is None:
raise ConfigurationError(
f"Unable to fetch a fully filled out connection for {looker_connection.name}. Please check your API permissions."
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
from datahub.ingestion.source.looker.lookml_config import (
LookMLSourceConfig,
LookMLSourceReport,
)

logger = logging.getLogger(__name__)


def get_connection_def_based_on_connection_string(
connection: str,
source_config: LookMLSourceConfig,
looker_client: Optional[LookerAPI],
reporter: LookMLSourceReport,
) -> Optional[LookerConnectionDefinition]:
if source_config.connection_to_platform_map is None:
source_config.connection_to_platform_map = {}

assert source_config.connection_to_platform_map is not None

connection_def: Optional[LookerConnectionDefinition] = None

if connection in source_config.connection_to_platform_map:
connection_def = source_config.connection_to_platform_map[connection]
elif looker_client:
try:
looker_connection: DBConnection = looker_client.connection(connection)
except SDKError:
logger.error(
f"Failed to retrieve connection {connection} from Looker. This usually happens when the "
f"credentials provided are not admin credentials."
)
for extractor_pattern, extracting_function in extractors.items():
if re.match(extractor_pattern, looker_connection.dialect_name):
(platform, db, schema) = extracting_function(looker_connection)
return cls(platform=platform, default_db=db, default_schema=schema)
raise ConfigurationError(
f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}"
)
else:
try:
connection_def = LookerConnectionDefinition.from_looker_connection(
looker_connection
)

# Populate the cache (using the config map) to avoid calling looker again for this connection
source_config.connection_to_platform_map[connection] = connection_def
except ConfigurationError:
reporter.report_warning(
f"connection-{connection}",
"Failed to load connection from Looker",
)

return connection_def
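The lookup-then-cache flow in `get_connection_def_based_on_connection_string` can be sketched independently of the Looker SDK (all names here are illustrative stand-ins, not the DataHub API):

```python
from typing import Callable, Dict, Optional

def get_or_fetch_connection(
    connection: str,
    cache: Dict[str, str],
    fetch: Callable[[str], Optional[str]],
) -> Optional[str]:
    """Return the cached definition if present; otherwise fetch once and cache it."""
    if connection in cache:
        return cache[connection]
    definition = fetch(connection)
    if definition is not None:
        # Populate the cache so the remote client is not called again
        cache[connection] = definition
    return definition

calls = []
def fake_fetch(name: str) -> str:
    calls.append(name)
    return f"platform-for-{name}"

cache: Dict[str, str] = {}
get_or_fetch_connection("warehouse", cache, fake_fetch)
get_or_fetch_connection("warehouse", cache, fake_fetch)
print(len(calls))  # the remote fetch ran only once
```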
@@ -4,16 +4,16 @@
from typing import Any, Dict, Optional

from datahub.ingestion.source.looker.lkml_patched import load_lkml
from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile
from datahub.ingestion.source.looker.looker_template_language import (
resolve_liquid_variable_in_view_dict,
)
from datahub.ingestion.source.looker.lookml_config import (
_EXPLORE_FILE_EXTENSION,
_VIEW_FILE_EXTENSION,
LookMLSourceReport,
)
from datahub.ingestion.source.looker.template_lang_resolver import (
resolve_liquid_variable_in_view_dict,
)

logger = logging.getLogger(__name__)

@@ -0,0 +1,104 @@
from functools import lru_cache
from typing import ClassVar, Optional, TextIO, cast

from liquid import Environment
from liquid.ast import Node
from liquid.context import Context
from liquid.parse import expect, get_parser
from liquid.stream import TokenStream
from liquid.tag import Tag
from liquid.template import BoundTemplate
from liquid.token import TOKEN_EXPRESSION, TOKEN_LITERAL, TOKEN_TAG, Token


class CustomTagException(Exception):
def __init__(self, message):
super().__init__(message)


class ConditionNode(Node):
def __init__(self, tok: Token, sql_or_lookml_reference: str, filter_name: str):
self.tok = tok

self.sql_or_lookml_reference = sql_or_lookml_reference

self.filter_name = filter_name

def render_to_output(self, context: Context, buffer: TextIO) -> Optional[bool]:
filter_value: Optional[str] = cast(
str, context.globals.get(self.filter_name)
)  # to silence lint

if filter_value is None:
raise CustomTagException(
f'filter {self.filter_name} value is not provided for "condition" tag'
)

filter_value = filter_value.strip()

buffer.write(f"{self.sql_or_lookml_reference}='{filter_value}'")

return True
Comment on lines +27 to +41
Handle potential edge cases in the render_to_output method.

Ensure that the filter_value is properly escaped to prevent SQL injection.

filter_value = filter_value.strip()
+        # Escape single quotes to prevent SQL injection
+        filter_value = filter_value.replace("'", "''")



# Define the custom tag
class ConditionTag(Tag):
"""
ConditionTag is the equivalent implementation of Looker's custom liquid tag "condition".
Refer to the doc: https://cloud.google.com/looker/docs/templated-filters#basic_usage

Refer to this doc to see how to write a liquid custom tag: https://jg-rp.github.io/liquid/guides/custom-tags

This class renders the tag below as order.region='ap-south-1' when order_region is provided in
config.liquid_variables as order_region: 'ap-south-1':
{% condition order_region %} order.region {% endcondition %}

"""

TAG_START: ClassVar[str] = "condition"
TAG_END: ClassVar[str] = "endcondition"
name: str = "condition"

def __init__(self, env: Environment):
super().__init__(env)
self.parser = get_parser(self.env)

def parse(self, stream: TokenStream) -> Node:
expect(stream, TOKEN_TAG, value=ConditionTag.TAG_START)

start_token = stream.current

stream.next_token()
expect(stream, TOKEN_EXPRESSION)
filter_name: str = stream.current.value.strip()

stream.next_token()
expect(stream, TOKEN_LITERAL)

sql_or_lookml_reference: str = stream.current.value.strip()

stream.next_token()
expect(stream, TOKEN_TAG, value=ConditionTag.TAG_END)

return ConditionNode(
tok=start_token,
sql_or_lookml_reference=sql_or_lookml_reference,
filter_name=filter_name,
)
Comment on lines +66 to +87

Ensure proper error handling in the parse method.

Add error handling for unexpected tokens to improve robustness.

def parse(self, stream: TokenStream) -> Node:
    try:
        expect(stream, TOKEN_TAG, value=ConditionTag.TAG_START)
        start_token = stream.current

        stream.next_token()
        expect(stream, TOKEN_EXPRESSION)
        filter_name: str = stream.current.value.strip()

        stream.next_token()
        expect(stream, TOKEN_LITERAL)
        sql_or_lookml_reference: str = stream.current.value.strip()

        stream.next_token()
        expect(stream, TOKEN_TAG, value=ConditionTag.TAG_END)

        return ConditionNode(
            tok=start_token,
            sql_or_lookml_reference=sql_or_lookml_reference,
            filter_name=filter_name,
        )
+    except Exception as e:
+        raise CustomTagException(f"Error parsing condition tag: {e}")



custom_tags = [ConditionTag]


@lru_cache(maxsize=1)
def _create_env() -> Environment:
env: Environment = Environment()
# register tag. One time activity
for custom_tag in custom_tags:
env.add_tag(custom_tag)
return env


def create_template(text: str) -> BoundTemplate:
env: Environment = _create_env()
return env.from_string(text)
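As a rough illustration of what the condition-tag resolution achieves, the sketch below mimics the rendering described in the ConditionTag docstring with a plain regex. It is a stand-in for demonstration only, not the `liquid`-library-backed implementation in this diff:

```python
import re
from typing import Dict

def resolve_condition_tags(sql: str, liquid_variables: Dict[str, str]) -> str:
    """Replace {% condition <name> %} <ref> {% endcondition %} with <ref>='<value>',
    taking <value> from the supplied liquid variables."""
    pattern = re.compile(
        r"\{%\s*condition\s+(?P<name>\w+)\s*%\}\s*(?P<ref>.*?)\s*\{%\s*endcondition\s*%\}"
    )

    def _render(match: re.Match) -> str:
        value = liquid_variables[match.group("name")].strip()
        return f"{match.group('ref')}='{value}'"

    return pattern.sub(_render, sql)

sql = "SELECT * FROM orders WHERE {% condition order_region %} order.region {% endcondition %}"
print(resolve_condition_tags(sql, {"order_region": "ap-south-1"}))
# SELECT * FROM orders WHERE order.region='ap-south-1'
```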
@@ -370,7 +370,8 @@ def _get_input_fields_from_query(
if field is None:
continue

# we haven't loaded in metadata about the explore yet, so we need to wait until explores are populated later to fetch this
# we haven't loaded in metadata about the explore yet, so we need to wait until explores are populated

Optimize dictionary key check.

Use key in dict instead of key in dict.keys() for better performance.

- for field in filters.keys():
+ for field in filters:

# later to fetch this
result.append(
InputFieldElement(
name=field, view_field=None, model=query.model, explore=query.view