Skip to content

Commit

Permalink
Feat!: iceberg version/timestamp snapshots, bigquery, refactor tsql c…
Browse files Browse the repository at this point in the history
…loses #2128
  • Loading branch information
tobymao committed Aug 31, 2023
1 parent e507018 commit f3fee3a
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 66 deletions.
7 changes: 7 additions & 0 deletions sqlglot/dialects/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ class Tokenizer(tokens.Tokenizer):
"TIMESTAMP": TokenType.TIMESTAMPTZ,
"NOT DETERMINISTIC": TokenType.VOLATILE,
"UNKNOWN": TokenType.NULL,
"FOR SYSTEM_TIME": TokenType.TIMESTAMP_SNAPSHOT,
}
KEYWORDS.pop("DIV")

Expand Down Expand Up @@ -644,3 +645,9 @@ def intersect_op(self, expression: exp.Intersect) -> str:

def with_properties(self, properties: exp.Properties) -> str:
return self.properties(properties, prefix=self.seg("OPTIONS"))

def version_sql(self, expression: exp.Version) -> str:
if expression.name == "TIMESTAMP":
expression = expression.copy()
expression.set("this", "SYSTEM_TIME")
return super().version_sql(expression)
6 changes: 6 additions & 0 deletions sqlglot/dialects/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class Tokenizer(tokens.Tokenizer):
"MSCK REPAIR": TokenType.COMMAND,
"REFRESH": TokenType.COMMAND,
"WITH SERDEPROPERTIES": TokenType.SERDE_PROPERTIES,
"TIMESTAMP AS OF": TokenType.TIMESTAMP_SNAPSHOT,
"VERSION AS OF": TokenType.VERSION_SNAPSHOT,
}

NUMERIC_LITERALS = {
Expand Down Expand Up @@ -503,3 +505,7 @@ def datatype_sql(self, expression: exp.DataType) -> str:
)

return super().datatype_sql(expression)

def version_sql(self, expression: exp.Version) -> str:
sql = super().version_sql(expression)
return sql.replace("FOR ", "", 1)
66 changes: 14 additions & 52 deletions sqlglot/dialects/tsql.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ class Tokenizer(tokens.Tokenizer):
"XML": TokenType.XML,
"OUTPUT": TokenType.RETURNING,
"SYSTEM_USER": TokenType.CURRENT_USER,
"FOR SYSTEM_TIME": TokenType.TIMESTAMP_SNAPSHOT,
}

class Parser(parser.Parser):
Expand Down Expand Up @@ -461,43 +462,6 @@ def _parse_transaction(self) -> exp.Transaction | exp.Command:

return self._parse_as_command(self._prev)

def _parse_system_time(self) -> t.Optional[exp.Expression]:
if not self._match_text_seq("FOR", "SYSTEM_TIME"):
return None

if self._match_text_seq("AS", "OF"):
system_time = self.expression(
exp.SystemTime, this=self._parse_bitwise(), kind="AS OF"
)
elif self._match_set((TokenType.FROM, TokenType.BETWEEN)):
kind = self._prev.text
this = self._parse_bitwise()
self._match_texts(("TO", "AND"))
expression = self._parse_bitwise()
system_time = self.expression(
exp.SystemTime, this=this, expression=expression, kind=kind
)
elif self._match_text_seq("CONTAINED", "IN"):
args = self._parse_wrapped_csv(self._parse_bitwise)
system_time = self.expression(
exp.SystemTime,
this=seq_get(args, 0),
expression=seq_get(args, 1),
kind="CONTAINED IN",
)
elif self._match(TokenType.ALL):
system_time = self.expression(exp.SystemTime, kind="ALL")
else:
system_time = None
self.raise_error("Unable to parse FOR SYSTEM_TIME clause")

return system_time

def _parse_table_parts(self, schema: bool = False) -> exp.Table:
table = super()._parse_table_parts(schema=schema)
table.set("system_time", self._parse_system_time())
return table

def _parse_returns(self) -> exp.ReturnsProperty:
table = self._parse_id_var(any_token=False, tokens=self.RETURNS_TABLE_TOKENS)
returns = super()._parse_returns()
Expand Down Expand Up @@ -700,22 +664,20 @@ def create_sql(self, expression: exp.Create) -> str:
def offset_sql(self, expression: exp.Offset) -> str:
return f"{super().offset_sql(expression)} ROWS"

def systemtime_sql(self, expression: exp.SystemTime) -> str:
kind = expression.args["kind"]
if kind == "ALL":
return "FOR SYSTEM_TIME ALL"

start = self.sql(expression, "this")
if kind == "AS OF":
return f"FOR SYSTEM_TIME AS OF {start}"

end = self.sql(expression, "expression")
if kind == "FROM":
return f"FOR SYSTEM_TIME FROM {start} TO {end}"
if kind == "BETWEEN":
return f"FOR SYSTEM_TIME BETWEEN {start} AND {end}"
def version_sql(self, expression: exp.Version) -> str:
name = "SYSTEM_TIME" if expression.name == "TIMESTAMP" else expression.name
this = f"FOR {name}"
expr = expression.expression
kind = expression.text("kind")
if kind in ("FROM", "BETWEEN"):
args = expr.expressions
sep = "TO" if kind == "FROM" else "AND"
expr_sql = f"{self.sql(seq_get(args, 0))} {sep} {self.sql(seq_get(args, 1))}"
else:
expr_sql = self.sql(expr)

return f"FOR SYSTEM_TIME CONTAINED IN ({start}, {end})"
expr_sql = f" {expr_sql}" if expr_sql else ""
return f"{this} {kind}{expr_sql}"

def returnsproperty_sql(self, expression: exp.ReturnsProperty) -> str:
table = expression.args.get("table")
Expand Down
24 changes: 15 additions & 9 deletions sqlglot/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2411,6 +2411,7 @@ class Table(Expression):
"pivots": False,
"hints": False,
"system_time": False,
"version": False,
}

@property
Expand Down Expand Up @@ -2451,15 +2452,6 @@ def parts(self) -> t.List[Identifier]:
return parts


# See the TSQL "Querying data in a system-versioned temporal table" page
class SystemTime(Expression):
arg_types = {
"this": False,
"expression": False,
"kind": True,
}


class Union(Subqueryable):
arg_types = {
"with": False,
Expand Down Expand Up @@ -2594,6 +2586,20 @@ class Var(Expression):
pass


class Version(Expression):
"""
Time travel, iceberg, bigquery etc
https://trino.io/docs/current/connector/iceberg.html?highlight=snapshot#using-snapshots
https://www.databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#for_system_time_as_of
https://learn.microsoft.com/en-us/sql/relational-databases/tables/querying-data-in-a-system-versioned-temporal-table?view=sql-server-ver16
this is either TIMESTAMP or VERSION
kind is ("AS OF", "BETWEEN")
"""

arg_types = {"this": True, "kind": True, "expression": False}


class Schema(Expression):
arg_types = {"this": False, "expressions": False}

Expand Down
12 changes: 9 additions & 3 deletions sqlglot/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,8 @@ def table_sql(self, expression: exp.Table, sep: str = " AS ") -> str:
if part
)

version = self.sql(expression, "version")
version = f" {version}" if version else ""
alias = self.sql(expression, "alias")
alias = f"{sep}{alias}" if alias else ""
hints = self.expressions(expression, key="hints", sep=" ")
Expand All @@ -1281,10 +1283,8 @@ def table_sql(self, expression: exp.Table, sep: str = " AS ") -> str:
pivots = f" {pivots}" if pivots else ""
joins = self.expressions(expression, key="joins", sep="", skip_first=True)
laterals = self.expressions(expression, key="laterals", sep="")
system_time = expression.args.get("system_time")
system_time = f" {self.sql(expression, 'system_time')}" if system_time else ""

return f"{table}{system_time}{alias}{hints}{pivots}{joins}{laterals}"
return f"{table}{version}{alias}{hints}{pivots}{joins}{laterals}"

def tablesample_sql(
self, expression: exp.TableSample, seed_prefix: str = "SEED", sep=" AS "
Expand Down Expand Up @@ -1339,6 +1339,12 @@ def pivot_sql(self, expression: exp.Pivot) -> str:
nulls = ""
return f"{direction}{nulls}({expressions} FOR {field}){alias}"

def version_sql(self, expression: exp.Version) -> str:
this = f"FOR {expression.name}"
kind = expression.text("kind")
expr = self.sql(expression, "expression")
return f"{this} {kind} {expr}"

def tuple_sql(self, expression: exp.Tuple) -> str:
return f"({self.expressions(expression, flat=True)})"

Expand Down
40 changes: 38 additions & 2 deletions sqlglot/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2543,18 +2543,23 @@ def _parse_table(
if schema:
return self._parse_schema(this=this)

version = self._parse_version()

if version:
this.set("version", version)

if self.ALIAS_POST_TABLESAMPLE:
table_sample = self._parse_table_sample()

alias = self._parse_table_alias(alias_tokens=alias_tokens or self.TABLE_ALIAS_TOKENS)
if alias:
this.set("alias", alias)

this.set("hints", self._parse_table_hints())

if not this.args.get("pivots"):
this.set("pivots", self._parse_pivots())

this.set("hints", self._parse_table_hints())

if not self.ALIAS_POST_TABLESAMPLE:
table_sample = self._parse_table_sample()

Expand All @@ -2568,6 +2573,37 @@ def _parse_table(

return this

def _parse_version(self) -> t.Optional[exp.Version]:
if self._match(TokenType.TIMESTAMP_SNAPSHOT):
this = "TIMESTAMP"
elif self._match(TokenType.VERSION_SNAPSHOT):
this = "VERSION"
else:
return None

if self._match_set((TokenType.FROM, TokenType.BETWEEN)):
kind = self._prev.text.upper()
start = self._parse_bitwise()
self._match_texts(("TO", "AND"))
end = self._parse_bitwise()
expression: t.Optional[exp.Expression] = self.expression(
exp.Tuple, expressions=[start, end]
)
elif self._match_text_seq("CONTAINED", "IN"):
kind = "CONTAINED IN"
expression = self.expression(
exp.Tuple, expressions=self._parse_wrapped_csv(self._parse_bitwise)
)
elif self._match(TokenType.ALL):
kind = "ALL"
expression = None
else:
self._match_text_seq("AS", "OF")
kind = "AS OF"
expression = self._parse_type()

return self.expression(exp.Version, this=this, expression=expression, kind=kind)

def _parse_unnest(self, with_alias: bool = True) -> t.Optional[exp.Unnest]:
if not self._match(TokenType.UNNEST):
return None
Expand Down
4 changes: 4 additions & 0 deletions sqlglot/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ class TokenType(AutoName):
WINDOW = auto()
WITH = auto()
UNIQUE = auto()
VERSION_SNAPSHOT = auto()
TIMESTAMP_SNAPSHOT = auto()


class Token:
Expand Down Expand Up @@ -748,6 +750,8 @@ class Tokenizer(metaclass=_Tokenizer):
"TRUNCATE": TokenType.COMMAND,
"VACUUM": TokenType.COMMAND,
"USER-DEFINED": TokenType.USERDEFINED,
"FOR VERSION": TokenType.VERSION_SNAPSHOT,
"FOR TIMESTAMP": TokenType.TIMESTAMP_SNAPSHOT,
}

WHITE_SPACE: t.Dict[t.Optional[str], TokenType] = {
Expand Down
2 changes: 2 additions & 0 deletions tests/dialects/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class TestBigQuery(Validator):
maxDiff = None

def test_bigquery(self):
self.validate_identity("SELECT * FROM tbl FOR SYSTEM_TIME AS OF z")

self.validate_all(
"""SELECT
`u`.`harness_user_email` AS `harness_user_email`,
Expand Down
3 changes: 3 additions & 0 deletions tests/dialects/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ def test_order_by(self):
)

def test_hive(self):
self.validate_identity("SELECT * FROM my_table TIMESTAMP AS OF DATE_ADD(CURRENT_DATE, -1)")
self.validate_identity("SELECT * FROM my_table VERSION AS OF DATE_ADD(CURRENT_DATE, -1)")

self.validate_identity(
"SELECT ROW() OVER (DISTRIBUTE BY x SORT BY y)",
"SELECT ROW() OVER (PARTITION BY x ORDER BY y)",
Expand Down
7 changes: 7 additions & 0 deletions tests/dialects/test_presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,13 @@ def test_unnest(self):

@mock.patch("sqlglot.helper.logger")
def test_presto(self, logger):
self.validate_identity(
"SELECT * FROM example.testdb.customer_orders FOR VERSION AS OF 8954597067493422955"
)
self.validate_identity(
"SELECT * FROM example.testdb.customer_orders FOR TIMESTAMP AS OF CAST('2022-03-23 09:59:29.803 Europe/Vienna' AS TIMESTAMP)"
)

self.validate_identity("SELECT * FROM x OFFSET 1 LIMIT 1")
self.validate_identity("SELECT * FROM x OFFSET 1 FETCH FIRST 1 ROWS ONLY")
self.validate_identity("SELECT BOOL_OR(a > 10) FROM asd AS T(a)")
Expand Down

0 comments on commit f3fee3a

Please sign in to comment.