Skip to content

Commit

Permalink
refactor(stream): remove Executor::clear_cache (#1820)
Browse files Browse the repository at this point in the history
Signed-off-by: TennyZhuang <[email protected]>
  • Loading branch information
TennyZhuang authored Apr 13, 2022
1 parent 1deb559 commit 747d2d1
Show file tree
Hide file tree
Showing 8 changed files with 0 additions and 66 deletions.
8 changes: 0 additions & 8 deletions src/stream/src/executor/managed_state/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,6 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
self.retain_top_n();
Ok(())
}

pub fn clear_cache(&mut self) {
assert!(
!self.is_dirty(),
"cannot clear cache while top n state is dirty"
);
self.top_n.clear();
}
}

/// Test-related methods
Expand Down
9 changes: 0 additions & 9 deletions src/stream/src/executor_v2/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ pub trait AggExecutor: Send + 'static {

/// See [`Executor::identity`].
fn identity(&self) -> &str;

/// See [`Executor::clear_cache`].
fn clear_cache(&mut self) -> super::Result<()> {
Ok(())
}
}

/// The struct wraps a [`AggExecutor`]
Expand Down Expand Up @@ -78,10 +73,6 @@ where
fn identity(&self) -> &str {
self.inner.identity()
}

fn clear_cache(&mut self) -> super::Result<()> {
self.inner.clear_cache()
}
}

impl<E> AggExecutorWrapper<E>
Expand Down
13 changes: 0 additions & 13 deletions src/stream/src/executor_v2/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ impl<S: StateStore> AggSimpleAggExecutor<S> {
key_indices,
})
}

fn is_dirty(&self) -> bool {
self.states.as_ref().map(|s| s.is_dirty()).unwrap_or(false)
}
}

#[async_trait]
Expand Down Expand Up @@ -241,15 +237,6 @@ impl<S: StateStore> AggExecutor for AggSimpleAggExecutor<S> {
fn identity(&self) -> &str {
self.info.identity.as_str()
}

fn clear_cache(&mut self) -> Result<()> {
assert!(
!self.is_dirty(),
"cannot clear cache while states of simple agg are dirty"
);
self.states.take();
Ok(())
}
}

#[cfg(test)]
Expand Down
9 changes: 0 additions & 9 deletions src/stream/src/executor_v2/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,6 @@ impl<K: HashKey, S: StateStore> AggExecutor for AggHashAggExecutor<K, S> {
fn identity(&self) -> &str {
self.info.identity.as_str()
}

fn clear_cache(&mut self) -> Result<()> {
assert!(
!self.is_dirty(),
"cannot clear cache while states of hash agg are dirty"
);
self.state_map.clear();
Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 0 additions & 4 deletions src/stream/src/executor_v2/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,4 @@ impl<S: StateStore> Executor for LookupExecutor<S> {
fn identity(&self) -> &str {
"<unknown>"
}

fn clear_cache(&mut self) -> Result<()> {
Ok(())
}
}
6 changes: 0 additions & 6 deletions src/stream/src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use error::StreamExecutorResult;
use futures::stream::BoxStream;
pub use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;

pub use super::executor::{
Barrier, Executor as ExecutorV1, Message, Mutation, PkIndices, PkIndicesRef,
Expand Down Expand Up @@ -148,9 +147,4 @@ pub trait Executor: Send + 'static {
{
Box::new(self)
}

/// Clears the in-memory cache of the executor. It's no-op by default.
fn clear_cache(&mut self) -> Result<()> {
Ok(())
}
}
9 changes: 0 additions & 9 deletions src/stream/src/executor_v2/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,6 @@ impl<S: StateStore> Executor for InnerTopNExecutor<S> {
fn identity(&self) -> &str {
&self.info.identity
}

fn clear_cache(&mut self) -> Result<()> {
self.managed_lowest_state.clear_cache();
self.managed_middle_state.clear_cache();
self.managed_highest_state.clear_cache();
self.first_execution = true;

Ok(())
}
}

#[async_trait]
Expand Down
8 changes: 0 additions & 8 deletions src/stream/src/executor_v2/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,6 @@ impl<S: StateStore> Executor for InnerAppendOnlyTopNExecutor<S> {
fn identity(&self) -> &str {
&self.info.identity
}

fn clear_cache(&mut self) -> Result<()> {
self.managed_lower_state.clear_cache();
self.managed_higher_state.clear_cache();
self.first_execution = true;

Ok(())
}
}

#[async_trait]
Expand Down

0 comments on commit 747d2d1

Please sign in to comment.