Skip to content

Commit

Permalink
fix clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
WindowsXp-Beta committed Apr 13, 2022
1 parent 581d009 commit 1efd827
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 30 deletions.
18 changes: 0 additions & 18 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,6 @@ pub fn create_executor(
Ok(real_executor)
}

/// `SimpleExecutor` accepts a single chunk as input.
pub trait SimpleExecutor: Executor {
fn consume_chunk(&mut self, chunk: StreamChunk) -> Result<Message>;
fn input(&mut self) -> &mut dyn Executor;
}

/// Most executors don't care about the control messages, and therefore
/// this method provides a default implementation helper for them.
async fn simple_executor_next<E: SimpleExecutor>(executor: &mut E) -> Result<Message> {
match executor.input().next().await {
Ok(message) => match message {
Message::Chunk(chunk) => executor.consume_chunk(chunk),
Message::Barrier(_) => Ok(message),
},
Err(e) => Err(e),
}
}

/// `StreamConsumer` is the last step in an actor
#[async_trait]
pub trait StreamConsumer: Send + Debug + 'static {
Expand Down
9 changes: 0 additions & 9 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ impl MockSource {
}
}

pub fn with_chunks(schema: Schema, pk_indices: PkIndices, chunks: Vec<StreamChunk>) -> Self {
Self {
schema,
pk_indices,
epoch: 0,
msgs: chunks.into_iter().map(Message::Chunk).collect(),
}
}

pub fn push_chunks(&mut self, chunks: impl Iterator<Item = StreamChunk>) {
self.msgs.extend(chunks.map(Message::Chunk));
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor_v2/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl SimpleProjectExecutor {
};
Self {
info: ExecutorInfo {
schema: schema,
schema,
pk_indices: input_info.pk_indices,
identity: format!("ProjectExecutor {:X}", executor_id),
},
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor_v2/v1_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use super::filter::SimpleFilterExecutor;
use super::project::SimpleProjectExecutor;
use super::{
BatchQueryExecutor, BoxedExecutor, ChainExecutor, Executor, ExecutorInfo, FilterExecutor,
HashAggExecutor, LocalSimpleAggExecutor, MaterializeExecutor, ProjectExecutor
HashAggExecutor, LocalSimpleAggExecutor, MaterializeExecutor, ProjectExecutor,
};
pub use super::{BoxedMessageStream, ExecutorV1, Message, PkIndices, PkIndicesRef};
use crate::executor::AggCall;
Expand Down Expand Up @@ -165,7 +165,7 @@ impl ProjectExecutor {
) -> Self {
let info = ExecutorInfo {
schema: input.schema().to_owned(),
pk_indices: pk_indices,
pk_indices,
identity: "Project".to_owned(),
};
let input = Box::new(ExecutorV1AsV2(input));
Expand Down

0 comments on commit 1efd827

Please sign in to comment.