Skip to content

Commit

Permalink
Fix bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Nov 28, 2023
1 parent cfebe94 commit 66601f8
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ impl<T: BlockingTransform + 'static> Processor for BlockingTransformer<T> {
return Ok(Event::NeedConsume);
}

if let Some(output) = self.output_data.take() {
self.output.push_data(Ok(output));
return Ok(Event::NeedConsume);
}

if !self.need_data {
// There is data needed to be transformed.
return Ok(Event::Sync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ pub struct TransformSRF {
func_ctx: FunctionContext,
projections: ColumnSet,
srf_exprs: Vec<Expr>,
/// The output number of rows for each input row.
num_rows: VecDeque<usize>,
/// The output of each set-returning function for each input row.
srf_results: Vec<VecDeque<(Value<AnyType>, usize)>>,
input: Option<DataBlock>,
max_block_size: usize,
Expand All @@ -57,12 +59,13 @@ impl TransformSRF {
srf_exprs: Vec<Expr>,
max_block_size: usize,
) -> Box<dyn Processor> {
let srf_results = vec![VecDeque::new(); srf_exprs.len()];
BlockingTransformer::create(input, output, TransformSRF {
func_ctx,
projections,
srf_exprs,
num_rows: VecDeque::new(),
srf_results: Vec::new(),
srf_results,
input: None,
max_block_size,
})
Expand All @@ -86,23 +89,18 @@ impl BlockingTransform for TransformSRF {
// ]
let input_num_rows = input.num_rows();
let mut max_nums_per_row = vec![0; input_num_rows];
let srf_results = self
.srf_exprs
.iter()
.map(|srf_expr| {
let res = eval.run_srf(srf_expr, &mut max_nums_per_row)?;
debug_assert_eq!(res.len(), input_num_rows);
Ok(VecDeque::from(res))
})
.collect::<Result<Vec<_>>>()?;
for (i, expr) in self.srf_exprs.iter().enumerate() {
let res = eval.run_srf(expr, &mut max_nums_per_row)?;
debug_assert_eq!(res.len(), input_num_rows);
debug_assert!(self.srf_results[i].is_empty());
self.srf_results[i] = VecDeque::from(res);
}

debug_assert_eq!(max_nums_per_row.len(), input_num_rows);
debug_assert!(self.num_rows.is_empty());
debug_assert!(self.srf_results.is_empty());
debug_assert!(self.input.is_none());

self.num_rows = VecDeque::from(max_nums_per_row);
self.srf_results = srf_results;
self.input = Some(input.project(&self.projections));

Ok(())
Expand All @@ -119,11 +117,11 @@ impl BlockingTransform for TransformSRF {

for num_rows in self.num_rows.iter() {
result_size += num_rows;
used += 1;
// TBD: if we need to limit `result_size` under `max_block_size`.
if result_size >= self.max_block_size {
break;
}
used += 1;
}

// TODO: if there is only one row can be used, we can use `Value::Scalar` directly.
Expand Down Expand Up @@ -272,7 +270,7 @@ impl BlockingTransform for TransformSRF {
// Release consumed rows.
self.num_rows.drain(0..used);
// `self.srf_results` is already drained.
let input = input.slice(0..used);
let input = input.slice(used..input.num_rows());
if input.num_rows() == 0 {
debug_assert!(self.num_rows.is_empty());
debug_assert!(self.srf_results.iter().all(|res| res.is_empty()));
Expand Down

0 comments on commit 66601f8

Please sign in to comment.