From 142dbea8dcb3e270d3706b6b7a1cdb5869355d83 Mon Sep 17 00:00:00 2001 From: zhyass Date: Fri, 29 Mar 2024 12:20:07 +0800 Subject: [PATCH] chore: seq as place holder when navigate --- src/query/ee/src/stream/handler.rs | 35 ++- .../storages/fuse/src/operations/navigate.rs | 25 +- .../5_ee/02_data_mask/02_0000_data_mask.py | 214 +++++++++--------- tests/udf/udf_server.py | 5 +- 4 files changed, 144 insertions(+), 135 deletions(-) diff --git a/src/query/ee/src/stream/handler.rs b/src/query/ee/src/stream/handler.rs index f542ad1476142..b19b77d1952a1 100644 --- a/src/query/ee/src/stream/handler.rs +++ b/src/query/ee/src/stream/handler.rs @@ -31,11 +31,13 @@ use databend_common_meta_app::schema::UpsertTableOptionReq; use databend_common_meta_types::MatchSeq; use databend_common_sql::plans::CreateStreamPlan; use databend_common_sql::plans::DropStreamPlan; +use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; use databend_common_storages_stream::stream_table::STREAM_ENGINE; use databend_enterprise_stream_handler::StreamHandler; use databend_enterprise_stream_handler::StreamHandlerWrapper; +use databend_query::storages::NavigationPoint; use databend_storages_common_table_meta::table::MODE_APPEND_ONLY; use databend_storages_common_table_meta::table::MODE_STANDARD; use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING; @@ -100,13 +102,34 @@ impl StreamHandler for RealStreamHandler { table = table.navigate_since_to(&None, &plan.navigation).await?; let source = FuseTable::try_from_table(table.as_ref())?; - let version = source.get_table_info().ident.seq; - if version == 0 { - return Err(ErrorCode::IllegalStream( - "The stream navigation at point has not table version".to_string(), - )); - } let snapshot_location = source.snapshot_loc().await?; + let version = match &plan.navigation { + Some(NavigationPoint::StreamInfo(info)) => info + .options() + .get(OPT_KEY_TABLE_VER) + .ok_or_else(|| ErrorCode::Internal("table version must be set"))? + .parse::()?, + Some(_) => { + if let Some(snapshot_loc) = &snapshot_location { + let (snapshot, _) = + SnapshotsIO::read_snapshot(snapshot_loc.clone(), source.get_operator()) + .await?; + let Some(prev_table_seq) = snapshot.prev_table_seq else { + return Err(ErrorCode::IllegalStream( + "The stream navigation at point has not table version".to_string(), + )); + }; + + // The table version is the version of the table when the snapshot was created. + // We need make sure the version greater than the table version, + // and less equal than the table version after the snapshot commit. + prev_table_seq + 1 + } else { + unreachable!() + } + } + None => table.get_table_info().ident.seq, + }; if let Some(value) = source .get_table_info() diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index c641c94bc4991..f7be8ce92a193 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -27,7 +27,6 @@ use databend_storages_common_cache::LoadParams; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ID; -use databend_storages_common_table_meta::table::OPT_KEY_TABLE_VER; use futures::TryStreamExt; use log::warn; use opendal::EntryMode; @@ -74,14 +73,8 @@ impl FuseTable { ))); } - let version = options - .get(OPT_KEY_TABLE_VER) - .ok_or_else(|| ErrorCode::Internal("table version must be set"))? - .parse::()?; - let Some(snapshot_loc) = options.get(OPT_KEY_SNAPSHOT_LOCATION) else { let mut table_info = self.table_info.clone(); - table_info.ident.seq = version; table_info.meta.options.remove(OPT_KEY_SNAPSHOT_LOCATION); table_info.meta.statistics = TableStatistics::default(); let table = FuseTable::do_create(table_info)?; @@ -89,7 +82,7 @@ impl FuseTable { }; let (snapshot, format_version) = SnapshotsIO::read_snapshot(snapshot_loc.clone(), self.get_operator()).await?; - self.load_table_by_snapshot(snapshot.as_ref(), format_version, Some(version)) + self.load_table_by_snapshot(snapshot.as_ref(), format_version) } #[async_backtrace::framed] @@ -150,7 +143,7 @@ impl FuseTable { } if let Some((snapshot, format_version)) = instant { - self.load_table_by_snapshot(snapshot.as_ref(), format_version, None) + self.load_table_by_snapshot(snapshot.as_ref(), format_version) } else { Err(ErrorCode::TableHistoricalDataNotFound( "No historical data found at given point", @@ -163,21 +156,11 @@ impl FuseTable { &self, snapshot: &TableSnapshot, format_version: u64, - table_seq: Option, ) -> Result> { + // The `seq` of ident that we cloned here is JUST a place holder + // we should NOT use it other than a pure place holder. let mut table_info = self.table_info.clone(); - // The `seq` of ident here is primarily JUST a place holder - // we should NOT use it outside of create stream. - table_info.ident.seq = if let Some(seq) = table_seq { - seq - } else { - // The table version is the version of the table when the snapshot was created. - // We need make sure the version greater than the table version, - // and less equal than the table version after the snapshot commit. - snapshot.prev_table_seq.map_or(0, |v| v + 1) - }; - // There are more to be kept in snapshot, like engine_options, ordering keys... // or we could just keep a clone of TableMeta in the snapshot. // diff --git a/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py b/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py index 142b374285a1d..edcca20998f0e 100755 --- a/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py +++ b/tests/suites/5_ee/02_data_mask/02_0000_data_mask.py @@ -26,110 +26,110 @@ def get_license(): # TODO: https://github.com/datafuselabs/databend/pull/15088 # if __name__ == "__main__": - # with NativeClient(name="client1>") as client1: - # client1.expect(prompt) - - # client1.send("set global enterprise_license='{}';".format(get_license())) - # client1.expect(prompt) - - # client1.send("drop MASKING POLICY if exists mask;") - - # mycursor = mydb.cursor() - # mycursor.execute( - # "CREATE MASKING POLICY mask AS (val STRING,num int) RETURNS STRING -> CASE WHEN " - # "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';" - # ) - - # mycursor = mydb.cursor() - # mycursor.execute("desc masking policy mask;") - # mask = mycursor.fetchall()[0] - # print(mask[0], mask[2], mask[3], mask[4], mask[5]) - - # mycursor.execute("drop MASKING POLICY if exists mask;") - # mycursor.execute("desc masking policy mask;") - # mask = mycursor.fetchall() - # print(mask) - - # client1.send("drop table if exists data_mask_test;") - # client1.expect(prompt) - - # client1.send("create table data_mask_test(a int not null, b string not null);") - # client1.expect(prompt) - - # sql = "insert into table data_mask_test(a,b) values(1, 'abc')" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # mycursor = mydb.cursor() - # mycursor.execute( - # "CREATE MASKING POLICY maska AS (val int) RETURNS int -> CASE WHEN " - # "current_role() IN ('ANALYST') THEN VAL ELSE 200 END comment = 'this is a masking policy';" - # ) - # mycursor = mydb.cursor() - # mycursor.execute( - # "CREATE MASKING POLICY maskb AS (val STRING) RETURNS STRING -> CASE WHEN " - # "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';" - # ) - # mycursor = mydb.cursor() - # mycursor.execute( - # "CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN " - # "current_role() IN ('ANALYST') THEN VAL ELSE 111 END comment = 'this is a masking policy';" - # ) - - # # set column a masking policy - # sql = " alter table data_mask_test modify column b set masking policy maskb" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # set column b masking policy - # sql = " alter table data_mask_test modify column a set masking policy maska" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # unset column a masking policy - # sql = " alter table data_mask_test modify column a unset masking policy" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # set column a masking policy to maska - # sql = " alter table data_mask_test modify column a set masking policy maska" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # set column a masking policy from maska to maskc - # sql = " alter table data_mask_test modify column a set masking policy maskc" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # drop masking policy maska - # sql = " drop MASKING POLICY if exists maska" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # drop masking policy maskb - # sql = " drop MASKING POLICY if exists maskb" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) - - # # drop masking policy maskc - # sql = " drop MASKING POLICY if exists maskc" - # mycursor.execute(sql) - # mycursor.execute("select * from data_mask_test") - # data = mycursor.fetchall() - # print(data) +# with NativeClient(name="client1>") as client1: +# client1.expect(prompt) + +# client1.send("set global enterprise_license='{}';".format(get_license())) +# client1.expect(prompt) + +# client1.send("drop MASKING POLICY if exists mask;") + +# mycursor = mydb.cursor() +# mycursor.execute( +# "CREATE MASKING POLICY mask AS (val STRING,num int) RETURNS STRING -> CASE WHEN " +# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';" +# ) + +# mycursor = mydb.cursor() +# mycursor.execute("desc masking policy mask;") +# mask = mycursor.fetchall()[0] +# print(mask[0], mask[2], mask[3], mask[4], mask[5]) + +# mycursor.execute("drop MASKING POLICY if exists mask;") +# mycursor.execute("desc masking policy mask;") +# mask = mycursor.fetchall() +# print(mask) + +# client1.send("drop table if exists data_mask_test;") +# client1.expect(prompt) + +# client1.send("create table data_mask_test(a int not null, b string not null);") +# client1.expect(prompt) + +# sql = "insert into table data_mask_test(a,b) values(1, 'abc')" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# mycursor = mydb.cursor() +# mycursor.execute( +# "CREATE MASKING POLICY maska AS (val int) RETURNS int -> CASE WHEN " +# "current_role() IN ('ANALYST') THEN VAL ELSE 200 END comment = 'this is a masking policy';" +# ) +# mycursor = mydb.cursor() +# mycursor.execute( +# "CREATE MASKING POLICY maskb AS (val STRING) RETURNS STRING -> CASE WHEN " +# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';" +# ) +# mycursor = mydb.cursor() +# mycursor.execute( +# "CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN " +# "current_role() IN ('ANALYST') THEN VAL ELSE 111 END comment = 'this is a masking policy';" +# ) + +# # set column a masking policy +# sql = " alter table data_mask_test modify column b set masking policy maskb" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # set column b masking policy +# sql = " alter table data_mask_test modify column a set masking policy maska" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # unset column a masking policy +# sql = " alter table data_mask_test modify column a unset masking policy" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # set column a masking policy to maska +# sql = " alter table data_mask_test modify column a set masking policy maska" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # set column a masking policy from maska to maskc +# sql = " alter table data_mask_test modify column a set masking policy maskc" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # drop masking policy maska +# sql = " drop MASKING POLICY if exists maska" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # drop masking policy maskb +# sql = " drop MASKING POLICY if exists maskb" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) + +# # drop masking policy maskc +# sql = " drop MASKING POLICY if exists maskc" +# mycursor.execute(sql) +# mycursor.execute("select * from data_mask_test") +# data = mycursor.fetchall() +# print(data) diff --git a/tests/udf/udf_server.py b/tests/udf/udf_server.py index 12c570eab4416..b0106d7bf777c 100644 --- a/tests/udf/udf_server.py +++ b/tests/udf/udf_server.py @@ -123,14 +123,17 @@ def array_index_of(array: List[int], item: int): def map_access(map: Dict[str, str], key: str) -> str: return map[key] if key in map else None + @udf(input_types=["VARIANT", "VARCHAR"], result_type="VARIANT") def json_access(data: Any, key: str) -> Any: return data[key] + @udf(input_types=["VARCHAR"], result_type="BIGINT") def url_len(key: str) -> int: return len(key) + @udf(input_types=["ARRAY(VARIANT)"], result_type="VARIANT") def json_concat(list: List[Any]) -> Any: return list @@ -324,4 +327,4 @@ def wait_concurrent(x): udf_server.add_function(wait) udf_server.add_function(wait_concurrent) udf_server.add_function(url_len) - udf_server.serve() \ No newline at end of file + udf_server.serve()