Skip to content

Commit

Permalink
feat(streaming): migrate TopNExecutor to ExecutorV2
Browse files Browse the repository at this point in the history
  • Loading branch information
a9QrX3Lu committed Apr 10, 2022
1 parent 3f5f703 commit ed71798
Show file tree
Hide file tree
Showing 7 changed files with 1,075 additions and 846 deletions.
860 changes: 16 additions & 844 deletions src/stream/src/executor/top_n.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl<S: StateStore> StatefulExecutor for AppendOnlyTopNExecutor<S> {
}
}

pub(super) fn generate_output(
pub fn generate_output(
new_rows: Vec<Row>,
new_ops: Vec<Op>,
schema: &Schema,
Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/executor_v2/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub enum StreamExecutorError {
#[error("aggregate state error {0}")]
AggStateError(RwError),

#[error("top n state error {0}")]
TopNStateError(RwError),

#[error("channel `{0}` closed")]
ChannelClosed(String),
}
Expand All @@ -57,6 +60,10 @@ impl StreamExecutorError {
Self::AggStateError(error.into()).into()
}

pub fn top_n_state_error(error: impl Into<RwError>) -> TracedStreamExecutorError {
Self::TopNStateError(error.into()).into()
}

pub fn channel_closed(name: impl Into<String>) -> TracedStreamExecutorError {
Self::ChannelClosed(name.into()).into()
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub mod receiver;
mod simple;
#[cfg(test)]
mod test_utils;
mod top_n;
mod top_n_executor;
mod v1_compat;

pub use batch_query::BatchQueryExecutor;
Expand All @@ -50,6 +52,7 @@ pub use local_simple_agg::LocalSimpleAggExecutor;
pub use merge::MergeExecutor;
pub use mview::*;
pub(crate) use simple::{SimpleExecutor, SimpleExecutorWrapper};
pub use top_n::TopNExecutor;
pub use v1_compat::StreamExecutorV1;

pub type BoxedExecutor = Box<dyn Executor>;
Expand Down
Loading

0 comments on commit ed71798

Please sign in to comment.