Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: treat seq as place holder when navigate #15124

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_types::MatchSeq;
use databend_common_sql::plans::CreateStreamPlan;
use databend_common_sql::plans::DropStreamPlan;
use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_enterprise_stream_handler::StreamHandler;
use databend_enterprise_stream_handler::StreamHandlerWrapper;
use databend_query::storages::NavigationPoint;
use databend_storages_common_table_meta::table::MODE_APPEND_ONLY;
use databend_storages_common_table_meta::table::MODE_STANDARD;
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
Expand Down Expand Up @@ -100,13 +102,34 @@ impl StreamHandler for RealStreamHandler {

table = table.navigate_since_to(&None, &plan.navigation).await?;
let source = FuseTable::try_from_table(table.as_ref())?;
let version = source.get_table_info().ident.seq;
if version == 0 {
return Err(ErrorCode::IllegalStream(
"The stream navigation at point has not table version".to_string(),
));
}
let snapshot_location = source.snapshot_loc().await?;
let version = match &plan.navigation {
Some(NavigationPoint::StreamInfo(info)) => info
.options()
.get(OPT_KEY_TABLE_VER)
.ok_or_else(|| ErrorCode::Internal("table version must be set"))?
.parse::<u64>()?,
Some(_) => {
if let Some(snapshot_loc) = &snapshot_location {
let (snapshot, _) =
SnapshotsIO::read_snapshot(snapshot_loc.clone(), source.get_operator())
.await?;
let Some(prev_table_seq) = snapshot.prev_table_seq else {
return Err(ErrorCode::IllegalStream(
"The stream navigation at point has not table version".to_string(),
));
};

// The table version is the version of the table when the snapshot was created.
// We need make sure the version greater than the table version,
// and less equal than the table version after the snapshot commit.
prev_table_seq + 1
} else {
unreachable!()
}
}
None => table.get_table_info().ident.seq,
};

if let Some(value) = source
.get_table_info()
Expand Down
25 changes: 4 additions & 21 deletions src/query/storages/fuse/src/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_VER;
use futures::TryStreamExt;
use log::warn;
use opendal::EntryMode;
Expand Down Expand Up @@ -74,22 +73,16 @@ impl FuseTable {
)));
}

let version = options
.get(OPT_KEY_TABLE_VER)
.ok_or_else(|| ErrorCode::Internal("table version must be set"))?
.parse::<u64>()?;

let Some(snapshot_loc) = options.get(OPT_KEY_SNAPSHOT_LOCATION) else {
let mut table_info = self.table_info.clone();
table_info.ident.seq = version;
table_info.meta.options.remove(OPT_KEY_SNAPSHOT_LOCATION);
table_info.meta.statistics = TableStatistics::default();
let table = FuseTable::do_create(table_info)?;
return Ok(table.into());
};
let (snapshot, format_version) =
SnapshotsIO::read_snapshot(snapshot_loc.clone(), self.get_operator()).await?;
self.load_table_by_snapshot(snapshot.as_ref(), format_version, Some(version))
self.load_table_by_snapshot(snapshot.as_ref(), format_version)
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -150,7 +143,7 @@ impl FuseTable {
}

if let Some((snapshot, format_version)) = instant {
self.load_table_by_snapshot(snapshot.as_ref(), format_version, None)
self.load_table_by_snapshot(snapshot.as_ref(), format_version)
} else {
Err(ErrorCode::TableHistoricalDataNotFound(
"No historical data found at given point",
Expand All @@ -163,21 +156,11 @@ impl FuseTable {
&self,
snapshot: &TableSnapshot,
format_version: u64,
table_seq: Option<u64>,
) -> Result<Arc<FuseTable>> {
// The `seq` of ident that we cloned here is JUST a place holder
// we should NOT use it other than a pure place holder.
let mut table_info = self.table_info.clone();

// The `seq` of ident here is primarily JUST a place holder
// we should NOT use it outside of create stream.
table_info.ident.seq = if let Some(seq) = table_seq {
seq
} else {
// The table version is the version of the table when the snapshot was created.
// We need make sure the version greater than the table version,
// and less equal than the table version after the snapshot commit.
snapshot.prev_table_seq.map_or(0, |v| v + 1)
};

// There are more to be kept in snapshot, like engine_options, ordering keys...
// or we could just keep a clone of TableMeta in the snapshot.
//
Expand Down
214 changes: 107 additions & 107 deletions tests/suites/5_ee/02_data_mask/02_0000_data_mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,110 +26,110 @@ def get_license():

# TODO: https://github.com/datafuselabs/databend/pull/15088
# if __name__ == "__main__":
# with NativeClient(name="client1>") as client1:
# client1.expect(prompt)

# client1.send("set global enterprise_license='{}';".format(get_license()))
# client1.expect(prompt)

# client1.send("drop MASKING POLICY if exists mask;")

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY mask AS (val STRING,num int) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )

# mycursor = mydb.cursor()
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()[0]
# print(mask[0], mask[2], mask[3], mask[4], mask[5])

# mycursor.execute("drop MASKING POLICY if exists mask;")
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()
# print(mask)

# client1.send("drop table if exists data_mask_test;")
# client1.expect(prompt)

# client1.send("create table data_mask_test(a int not null, b string not null);")
# client1.expect(prompt)

# sql = "insert into table data_mask_test(a,b) values(1, 'abc')"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maska AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 200 END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskb AS (val STRING) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 111 END comment = 'this is a masking policy';"
# )

# # set column a masking policy
# sql = " alter table data_mask_test modify column b set masking policy maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column b masking policy
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # unset column a masking policy
# sql = " alter table data_mask_test modify column a unset masking policy"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy to maska
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy from maska to maskc
# sql = " alter table data_mask_test modify column a set masking policy maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maska
# sql = " drop MASKING POLICY if exists maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskb
# sql = " drop MASKING POLICY if exists maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskc
# sql = " drop MASKING POLICY if exists maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)
# with NativeClient(name="client1>") as client1:
# client1.expect(prompt)

# client1.send("set global enterprise_license='{}';".format(get_license()))
# client1.expect(prompt)

# client1.send("drop MASKING POLICY if exists mask;")

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY mask AS (val STRING,num int) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )

# mycursor = mydb.cursor()
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()[0]
# print(mask[0], mask[2], mask[3], mask[4], mask[5])

# mycursor.execute("drop MASKING POLICY if exists mask;")
# mycursor.execute("desc masking policy mask;")
# mask = mycursor.fetchall()
# print(mask)

# client1.send("drop table if exists data_mask_test;")
# client1.expect(prompt)

# client1.send("create table data_mask_test(a int not null, b string not null);")
# client1.expect(prompt)

# sql = "insert into table data_mask_test(a,b) values(1, 'abc')"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maska AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 200 END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskb AS (val STRING) RETURNS STRING -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy';"
# )
# mycursor = mydb.cursor()
# mycursor.execute(
# "CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN "
# "current_role() IN ('ANALYST') THEN VAL ELSE 111 END comment = 'this is a masking policy';"
# )

# # set column a masking policy
# sql = " alter table data_mask_test modify column b set masking policy maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column b masking policy
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # unset column a masking policy
# sql = " alter table data_mask_test modify column a unset masking policy"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy to maska
# sql = " alter table data_mask_test modify column a set masking policy maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # set column a masking policy from maska to maskc
# sql = " alter table data_mask_test modify column a set masking policy maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maska
# sql = " drop MASKING POLICY if exists maska"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskb
# sql = " drop MASKING POLICY if exists maskb"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)

# # drop masking policy maskc
# sql = " drop MASKING POLICY if exists maskc"
# mycursor.execute(sql)
# mycursor.execute("select * from data_mask_test")
# data = mycursor.fetchall()
# print(data)
5 changes: 4 additions & 1 deletion tests/udf/udf_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,17 @@ def array_index_of(array: List[int], item: int):
def map_access(map: Dict[str, str], key: str) -> str:
return map[key] if key in map else None


@udf(input_types=["VARIANT", "VARCHAR"], result_type="VARIANT")
def json_access(data: Any, key: str) -> Any:
return data[key]


@udf(input_types=["VARCHAR"], result_type="BIGINT")
def url_len(key: str) -> int:
return len(key)


@udf(input_types=["ARRAY(VARIANT)"], result_type="VARIANT")
def json_concat(list: List[Any]) -> Any:
return list
Expand Down Expand Up @@ -324,4 +327,4 @@ def wait_concurrent(x):
udf_server.add_function(wait)
udf_server.add_function(wait_concurrent)
udf_server.add_function(url_len)
udf_server.serve()
udf_server.serve()
Loading