Skip to content

Commit

Permalink
chore: seq as place holder when navigate
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Mar 29, 2024
1 parent b65a10d commit 142dbea
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 135 deletions.
35 changes: 29 additions & 6 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_types::MatchSeq;
use databend_common_sql::plans::CreateStreamPlan;
use databend_common_sql::plans::DropStreamPlan;
use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_enterprise_stream_handler::StreamHandler;
use databend_enterprise_stream_handler::StreamHandlerWrapper;
use databend_query::storages::NavigationPoint;
use databend_storages_common_table_meta::table::MODE_APPEND_ONLY;
use databend_storages_common_table_meta::table::MODE_STANDARD;
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
Expand Down Expand Up @@ -100,13 +102,34 @@ impl StreamHandler for RealStreamHandler {

table = table.navigate_since_to(&None, &plan.navigation).await?;
let source = FuseTable::try_from_table(table.as_ref())?;
let version = source.get_table_info().ident.seq;
if version == 0 {
return Err(ErrorCode::IllegalStream(
"The stream navigation at point has not table version".to_string(),
));
}
let snapshot_location = source.snapshot_loc().await?;
let version = match &plan.navigation {
Some(NavigationPoint::StreamInfo(info)) => info
.options()
.get(OPT_KEY_TABLE_VER)
.ok_or_else(|| ErrorCode::Internal("table version must be set"))?
.parse::<u64>()?,
Some(_) => {
if let Some(snapshot_loc) = &snapshot_location {
let (snapshot, _) =
SnapshotsIO::read_snapshot(snapshot_loc.clone(), source.get_operator())
.await?;
let Some(prev_table_seq) = snapshot.prev_table_seq else {
return Err(ErrorCode::IllegalStream(
"The stream navigation at point has not table version".to_string(),
));
};

// The table version is the version of the table when the snapshot was created.
// We need to make sure the version is greater than that table version,
// and less than or equal to the table version after the snapshot commit.
prev_table_seq + 1
} else {
unreachable!()
}
}
None => table.get_table_info().ident.seq,
};

if let Some(value) = source
.get_table_info()
Expand Down
25 changes: 4 additions & 21 deletions src/query/storages/fuse/src/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_VER;
use futures::TryStreamExt;
use log::warn;
use opendal::EntryMode;
Expand Down Expand Up @@ -74,22 +73,16 @@ impl FuseTable {
)));
}

let version = options
.get(OPT_KEY_TABLE_VER)
.ok_or_else(|| ErrorCode::Internal("table version must be set"))?
.parse::<u64>()?;

let Some(snapshot_loc) = options.get(OPT_KEY_SNAPSHOT_LOCATION) else {
let mut table_info = self.table_info.clone();
table_info.ident.seq = version;
table_info.meta.options.remove(OPT_KEY_SNAPSHOT_LOCATION);
table_info.meta.statistics = TableStatistics::default();
let table = FuseTable::do_create(table_info)?;
return Ok(table.into());
};
let (snapshot, format_version) =
SnapshotsIO::read_snapshot(snapshot_loc.clone(), self.get_operator()).await?;
self.load_table_by_snapshot(snapshot.as_ref(), format_version, Some(version))
self.load_table_by_snapshot(snapshot.as_ref(), format_version)
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -150,7 +143,7 @@ impl FuseTable {
}

if let Some((snapshot, format_version)) = instant {
self.load_table_by_snapshot(snapshot.as_ref(), format_version, None)
self.load_table_by_snapshot(snapshot.as_ref(), format_version)
} else {
Err(ErrorCode::TableHistoricalDataNotFound(
"No historical data found at given point",
Expand All @@ -163,21 +156,11 @@ impl FuseTable {
&self,
snapshot: &TableSnapshot,
format_version: u64,
table_seq: Option<u64>,
) -> Result<Arc<FuseTable>> {
// The `seq` of the ident that we cloned here is JUST a placeholder;
// we should NOT use it as anything other than a pure placeholder.
let mut table_info = self.table_info.clone();

// The `seq` of the ident here is primarily JUST a placeholder;
// we should NOT use it outside of create stream.
table_info.ident.seq = if let Some(seq) = table_seq {
seq
} else {
// The table version is the version of the table when the snapshot was created.
// We need to make sure the version is greater than that table version,
// and less than or equal to the table version after the snapshot commit.
snapshot.prev_table_seq.map_or(0, |v| v + 1)
};

// There are more to be kept in snapshot, like engine_options, ordering keys...
// or we could just keep a clone of TableMeta in the snapshot.
//
Expand Down
214 changes: 107 additions & 107 deletions tests/suites/5_ee/02_data_mask/02_0000_data_mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,110 +26,110 @@ def get_license():

# TODO: https://github.com/datafuselabs/databend/pull/15088
# if __name__ == "__main__":
# with NativeClient(name="client1>") as client1:
# client1.expect(prompt)

# client1.send("set global enterprise_license='{}';".format(get_license()))
# client1.expect(prompt)

# client1.send("drop MASKING POLICY if exists mask;")

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY mask AS (val STRING,num int) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )

# mycursor = mydb.cursor()
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()[0]
# print(mask[0], mask[2], mask[3], mask[4], mask[5])

# mycursor.execute("drop MASKING POLICY if exists mask;")
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()
# print(mask)

# client1.send("drop table if exists data_mask_test;")
# client1.expect(prompt)

# client1.send("create table data_mask_test(a int not null, b string not null);")
# client1.expect(prompt)

# sql = "insert into table data_mask_test(a,b) values(1, 'abc')"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maska AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 200 END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskb AS (val STRING) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 111 END comment = 'this is a masking policy';"
# )

# # set column b masking policy
# sql = " alter table data_mask_test modify column b set masking policy maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # unset column a masking policy
# sql = " alter table data_mask_test modify column a unset masking policy"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy to maska
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy from maska to maskc
# sql = " alter table data_mask_test modify column a set masking policy maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maska
# sql = " drop MASKING POLICY if exists maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskb
# sql = " drop MASKING POLICY if exists maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskc
# sql = " drop MASKING POLICY if exists maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)
# with NativeClient(name="client1>") as client1:
# client1.expect(prompt)

# client1.send("set global enterprise_license='{}';".format(get_license()))
# client1.expect(prompt)

# client1.send("drop MASKING POLICY if exists mask;")

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY mask AS (val STRING,num int) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )

# mycursor = mydb.cursor()
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()[0]
# print(mask[0], mask[2], mask[3], mask[4], mask[5])

# mycursor.execute("drop MASKING POLICY if exists mask;")
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()
# print(mask)

# client1.send("drop table if exists data_mask_test;")
# client1.expect(prompt)

# client1.send("create table data_mask_test(a int not null, b string not null);")
# client1.expect(prompt)

# sql = "insert into table data_mask_test(a,b) values(1, 'abc')"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maska AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 200 END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskb AS (val STRING) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 111 END comment = 'this is a masking policy';"
# )

# # set column b masking policy
# sql = " alter table data_mask_test modify column b set masking policy maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # unset column a masking policy
# sql = " alter table data_mask_test modify column a unset masking policy"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy to maska
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy from maska to maskc
# sql = " alter table data_mask_test modify column a set masking policy maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maska
# sql = " drop MASKING POLICY if exists maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskb
# sql = " drop MASKING POLICY if exists maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskc
# sql = " drop MASKING POLICY if exists maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)
5 changes: 4 additions & 1 deletion tests/udf/udf_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,17 @@ def array_index_of(array: List[int], item: int):
def map_access(map: Dict[str, str], key: str) -> str:
return map[key] if key in map else None


# UDF registered with input types (VARIANT, VARCHAR) and result type VARIANT:
# returns data[key], i.e. the element of `data` selected by `key`.
# No missing-key handling is done here; the subscript raises if `key` is absent.
@udf(input_types=["VARIANT", "VARCHAR"], result_type="VARIANT")
def json_access(data: Any, key: str) -> Any:
return data[key]


# UDF registered with input type VARCHAR and result type BIGINT:
# returns len(key), the length of the string argument.
@udf(input_types=["VARCHAR"], result_type="BIGINT")
def url_len(key: str) -> int:
return len(key)


@udf(input_types=["ARRAY(VARIANT)"], result_type="VARIANT")
def json_concat(list: List[Any]) -> Any:
return list
Expand Down Expand Up @@ -324,4 +327,4 @@ def wait_concurrent(x):
udf_server.add_function(wait)
udf_server.add_function(wait_concurrent)
udf_server.add_function(url_len)
udf_server.serve()
udf_server.serve()

0 comments on commit 142dbea

Please sign in to comment.