From 8498c2a9dbead5d62c9adbbdcc0180b69b6ea016 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 7 Nov 2022 19:50:47 +0800 Subject: [PATCH 1/2] fix(copy): fix wrong assert. --- .../processors/sources/input_formats/input_pipeline.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs index 8e591fb04907e..55e4c0393a44c 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -21,6 +21,7 @@ use common_base::base::tokio::sync::mpsc::Sender; use common_base::base::GlobalIORuntime; use common_base::base::TrySpawn; use common_datablocks::DataBlock; +use common_exception::ErrorCode; use common_exception::Result; use common_pipeline_core::Pipeline; use futures::AsyncRead; @@ -288,6 +289,12 @@ pub trait InputFormatPipe: Sized + Send + 'static { let mut batch = vec![0u8; batch_size]; let n = read_full(&mut reader, &mut batch[0..]).await?; if n == 0 { + if total_read != size { + return Err(ErrorCode::BadBytes(format!( + "split {} expect {} bytes, read only {} bytes", + split_info, size, total_read + ))); + } break; } else { total_read += n; @@ -299,7 +306,6 @@ pub trait InputFormatPipe: Sized + Send + 'static { } } } - assert_eq!(total_read, size); tracing::debug!("finished"); Ok(()) } From cbc23b4312363f2d2c43536858f5c3593752c5b3 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 7 Nov 2022 19:59:53 +0800 Subject: [PATCH 2/2] fix(copy): fix assert about decompress. --- .../processors/sources/input_formats/input_format_text.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs index 2acfc2f51b096..d3aa7490a3e23 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -352,7 +352,10 @@ impl AligningStateTrait for AligningState { T::align(self, &buf)? } else { if let Some(decoder) = &self.decoder { - assert_eq!(decoder.state(), DecompressState::Done) + let state = decoder.state(); + if !matches!(state, DecompressState::Done | DecompressState::Reading) { + tracing::warn!("decompressor end with state {:?}", state) + } } self.flush() };