Skip to content

Commit

Permalink
fix(ingest): bigquery-usage - fix dataset name for sharded table (dat…
Browse files Browse the repository at this point in the history
  • Loading branch information
MugdhaHardikar-GSLab authored and PiotrSierkin-Ki committed Jul 26, 2022
1 parent ed33534 commit c936323
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@
""".strip(),
}


OPERATION_STATEMENT_TYPES = {
"INSERT": OperationTypeClass.INSERT,
"UPDATE": OperationTypeClass.UPDATE,
Expand Down Expand Up @@ -269,23 +268,25 @@ def is_temporary_table(self, prefix: str) -> bool:
# Temporary tables will have a dataset that begins with an underscore.
return self.dataset.startswith(prefix)

@staticmethod
def remove_suffix(input_string, suffix):
    """Return ``input_string`` with one trailing ``suffix`` removed.

    The input is returned unchanged when ``suffix`` is empty or when
    ``input_string`` does not end with it. Only a single occurrence
    at the very end is stripped.
    """
    # Guard first: an empty suffix would otherwise slice with ``[:-0]``,
    # which yields an empty string rather than the original value.
    if not suffix or not input_string.endswith(suffix):
        return input_string
    keep = len(input_string) - len(suffix)
    return input_string[:keep]

def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":
# Handle partitioned and sharded tables.
table_name: Optional[str] = None
shortened_table_name = self.table
# if table name ends in _* or * then we strip it as that represents a query on a sharded table
shortened_table_name = self.remove_suffix(shortened_table_name, "_*")
shortened_table_name = self.remove_suffix(shortened_table_name, "*")

# if table name ends in _* then we strip it as that represents a query on a sharded table
if self.table.endswith("_*"):
table_name = self.table[:-2]
logger.debug(
f"Found query on sharded table {self.table}. Using {table_name} as the table name."
)
return BigQueryTableRef(self.project, self.dataset, table_name)

matches = re.match(sharded_table_regex, self.table)
matches = re.match(sharded_table_regex, shortened_table_name)
if matches:
table_name = matches.group(2)
else:
matches = PARTITION_SUMMARY_REGEXP.match(self.table)
matches = PARTITION_SUMMARY_REGEXP.match(shortened_table_name)
if matches:
table_name = matches.group(1)
if matches:
Expand All @@ -302,7 +303,7 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":
return BigQueryTableRef(self.project, self.dataset, table_name)

# Handle table snapshots.
matches = SNAPSHOT_TABLE_REGEX.match(self.table)
matches = SNAPSHOT_TABLE_REGEX.match(shortened_table_name)
if matches:
table_name = matches.group(1)
logger.debug(
Expand All @@ -312,14 +313,14 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":

# Handle exceptions
invalid_chars_in_table_name: List[str] = [
c for c in {"$", "@"} if c in self.table
c for c in {"$", "@"} if c in shortened_table_name
]
if invalid_chars_in_table_name:
raise ValueError(
f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}"
)

return self
return BigQueryTableRef(self.project, self.dataset, shortened_table_name)

def __str__(self) -> str:
    """Format this reference as a slash-separated path string.

    The result has the shape
    ``projects/<project>/datasets/<dataset>/tables/<table>``, built from
    the instance's ``project``, ``dataset`` and ``table`` attributes.
    """
    return f"projects/{self.project}/datasets/{self.dataset}/tables/{self.table}"
Expand Down Expand Up @@ -1146,7 +1147,7 @@ def _create_operation_aspect_work_unit(
aspect=operation_aspect,
)
return MetadataWorkUnit(
id=f"{datetime.fromtimestamp(last_updated_timestamp/1000).isoformat()}-operation-aspect-{destination_table}",
id=f"{datetime.fromtimestamp(last_updated_timestamp / 1000).isoformat()}-operation-aspect-{destination_table}",
mcp=mcp,
)

Expand Down
6 changes: 6 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_usage_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,9 @@ def test_bigquery_ref_extra_removal():
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset

table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_2016*")
new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX)
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset

0 comments on commit c936323

Please sign in to comment.