From 1da2b8470574a4fae68aa628820ad44c289ad6bf Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Wed, 4 Dec 2024 14:59:48 +0800 Subject: [PATCH] chore: adjust fuse_time_travel_size() --- .../table_functions/fuse_time_travel_size.rs | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs b/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs index 63db705c821a7..f4e4d360ec80b 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs @@ -88,8 +88,9 @@ impl SimpleArgFunc for FuseTimeTravelSize { ), TableField::new( "latest_snapshot_size", - TableDataType::Number(NumberDataType::UInt64), + TableDataType::Number(NumberDataType::UInt64).wrap_nullable(), ), + TableField::new("error", TableDataType::String.wrap_nullable()), ]) } @@ -103,6 +104,7 @@ impl SimpleArgFunc for FuseTimeTravelSize { let mut table_names = Vec::new(); let mut sizes = Vec::new(); let mut latest_snapshot_sizes = Vec::new(); + let mut errors = Vec::new(); let catalog = ctx.get_default_catalog()?; let dbs = match &args.database_name { Some(db_name) => { @@ -148,14 +150,24 @@ impl SimpleArgFunc for FuseTimeTravelSize { database_names.push(db.name().to_string()); table_names.push(tbl.name().to_string()); sizes.push(time_travel_size); - latest_snapshot_sizes.push(latest_snapshot_size); + match latest_snapshot_size { + Ok(size) => { + latest_snapshot_sizes.push(Some(size)); + errors.push(None); + } + Err(e) => { + latest_snapshot_sizes.push(None); + errors.push(Some(e.to_string())); + } + } } } Ok(DataBlock::new_from_columns(vec![ StringType::from_data(database_names), StringType::from_data(table_names), UInt64Type::from_data(sizes), - UInt64Type::from_data(latest_snapshot_sizes), + UInt64Type::from_opt_data(latest_snapshot_sizes), + StringType::from_opt_data(errors), ])) } } @@ -173,7 +185,7 @@ async fn get_time_travel_size(storage_prefix: &str, op: &Operator) -> Result Result<(u64, u64)> { +async fn calc_tbl_size(tbl: &FuseTable) -> Result<(u64, Result)> { info!( "fuse_time_travel_size start calc_tbl_size:{}", tbl.get_table_info().desc @@ -188,11 +200,13 @@ async fn calc_tbl_size(tbl: &FuseTable) -> Result<(u64, u64)> { Some(snapshot_location) => { let start = std::time::Instant::now(); info!("fuse_time_travel_size will read: {}", snapshot_location); - let (snapshot, _) = SnapshotsIO::read_snapshot(snapshot_location, operator).await?; + let snapshot = SnapshotsIO::read_snapshot(snapshot_location, operator).await; info!("read_snapshot cost: {:?}", start.elapsed()); - snapshot.summary.compressed_byte_size + snapshot.summary.index_size + snapshot.map(|(snapshot, _)| { + snapshot.summary.compressed_byte_size + snapshot.summary.index_size + }) } - None => 0, + None => Ok(0), }; Ok((time_travel_size, latest_snapshot_size)) }