Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): remove Executor::clear_cache #1820

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions src/stream/src/executor/managed_state/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,6 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
self.retain_top_n();
Ok(())
}

pub fn clear_cache(&mut self) {
assert!(
!self.is_dirty(),
"cannot clear cache while top n state is dirty"
);
self.top_n.clear();
}
}

/// Test-related methods
Expand Down
9 changes: 0 additions & 9 deletions src/stream/src/executor_v2/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ pub trait AggExecutor: Send + 'static {

/// See [`Executor::identity`].
fn identity(&self) -> &str;

/// See [`Executor::clear_cache`].
fn clear_cache(&mut self) -> super::Result<()> {
Ok(())
}
}

/// The struct wraps a [`AggExecutor`]
Expand Down Expand Up @@ -78,10 +73,6 @@ where
fn identity(&self) -> &str {
self.inner.identity()
}

fn clear_cache(&mut self) -> super::Result<()> {
self.inner.clear_cache()
}
}

impl<E> AggExecutorWrapper<E>
Expand Down
13 changes: 0 additions & 13 deletions src/stream/src/executor_v2/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ impl<S: StateStore> AggSimpleAggExecutor<S> {
key_indices,
})
}

fn is_dirty(&self) -> bool {
self.states.as_ref().map(|s| s.is_dirty()).unwrap_or(false)
}
}

#[async_trait]
Expand Down Expand Up @@ -241,15 +237,6 @@ impl<S: StateStore> AggExecutor for AggSimpleAggExecutor<S> {
fn identity(&self) -> &str {
self.info.identity.as_str()
}

fn clear_cache(&mut self) -> Result<()> {
assert!(
!self.is_dirty(),
"cannot clear cache while states of simple agg are dirty"
);
self.states.take();
Ok(())
}
}

#[cfg(test)]
Expand Down
9 changes: 0 additions & 9 deletions src/stream/src/executor_v2/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,6 @@ impl<K: HashKey, S: StateStore> AggExecutor for AggHashAggExecutor<K, S> {
fn identity(&self) -> &str {
self.info.identity.as_str()
}

fn clear_cache(&mut self) -> Result<()> {
assert!(
!self.is_dirty(),
"cannot clear cache while states of hash agg are dirty"
);
self.state_map.clear();
Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 0 additions & 4 deletions src/stream/src/executor_v2/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,4 @@ impl<S: StateStore> Executor for LookupExecutor<S> {
fn identity(&self) -> &str {
"<unknown>"
}

fn clear_cache(&mut self) -> Result<()> {
Ok(())
}
}
6 changes: 0 additions & 6 deletions src/stream/src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use error::StreamExecutorResult;
use futures::stream::BoxStream;
pub use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;

pub use super::executor::{
Barrier, Executor as ExecutorV1, Message, Mutation, PkIndices, PkIndicesRef,
Expand Down Expand Up @@ -148,9 +147,4 @@ pub trait Executor: Send + 'static {
{
Box::new(self)
}

/// Clears the in-memory cache of the executor. It's no-op by default.
fn clear_cache(&mut self) -> Result<()> {
Ok(())
}
}
9 changes: 0 additions & 9 deletions src/stream/src/executor_v2/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,6 @@ impl<S: StateStore> Executor for InnerTopNExecutor<S> {
fn identity(&self) -> &str {
&self.info.identity
}

fn clear_cache(&mut self) -> Result<()> {
self.managed_lowest_state.clear_cache();
self.managed_middle_state.clear_cache();
self.managed_highest_state.clear_cache();
self.first_execution = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite suspicious. Why setting first_execution here before? Could that be related to @BugenZhao 's recently discovered bug?

By the way, we can remove no cache e2e test in CI.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HashJoin has not been migrated to executor v2 and its clear_cache still makes sense. Let's keep it for a while.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, the first_execution indicates whether we need to fetch the state from storage and was necessary when writing this. 🤣 I'll investigate into TopN in next days. cc @lmatz


Ok(())
}
}

#[async_trait]
Expand Down
8 changes: 0 additions & 8 deletions src/stream/src/executor_v2/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,6 @@ impl<S: StateStore> Executor for InnerAppendOnlyTopNExecutor<S> {
fn identity(&self) -> &str {
&self.info.identity
}

fn clear_cache(&mut self) -> Result<()> {
self.managed_lower_state.clear_cache();
self.managed_higher_state.clear_cache();
self.first_execution = true;

Ok(())
}
}

#[async_trait]
Expand Down