Skip to content

Commit

Permalink
feat(streaming): migrate TopNExecutor to ExecutorV2 (#1737)
Browse files Browse the repository at this point in the history
  • Loading branch information
a9QrX3Lu authored Apr 11, 2022
1 parent 01947e4 commit fc93222
Show file tree
Hide file tree
Showing 7 changed files with 1,075 additions and 846 deletions.
860 changes: 16 additions & 844 deletions src/stream/src/executor/top_n.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl<S: StateStore> StatefulExecutor for AppendOnlyTopNExecutor<S> {
}
}

pub(super) fn generate_output(
pub fn generate_output(
new_rows: Vec<Row>,
new_ops: Vec<Op>,
schema: &Schema,
Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/executor_v2/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub enum StreamExecutorError {
#[error("input error")]
InputError(RwError),

#[error("top n state error {0}")]
TopNStateError(RwError),

#[error("channel `{0}` closed")]
ChannelClosed(String),
}
Expand All @@ -64,6 +67,10 @@ impl StreamExecutorError {
Self::InputError(error.into()).into()
}

pub fn top_n_state_error(error: impl Into<RwError>) -> TracedStreamExecutorError {
Self::TopNStateError(error.into()).into()
}

pub fn channel_closed(name: impl Into<String>) -> TracedStreamExecutorError {
Self::ChannelClosed(name.into()).into()
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub mod receiver;
mod simple;
#[cfg(test)]
mod test_utils;
mod top_n;
mod top_n_executor;
mod v1_compat;

pub use batch_query::BatchQueryExecutor;
Expand All @@ -53,6 +55,7 @@ pub use lookup::*;
pub use merge::MergeExecutor;
pub use mview::*;
pub(crate) use simple::{SimpleExecutor, SimpleExecutorWrapper};
pub use top_n::TopNExecutor;
pub use v1_compat::StreamExecutorV1;

pub type BoxedExecutor = Box<dyn Executor>;
Expand Down
Loading

0 comments on commit fc93222

Please sign in to comment.