
Commit

fix(metrics): use current actor id for backpressure (risingwavelabs#3721)

* fix(metrics): use current actor id for backpressure

Signed-off-by: Alex Chi <[email protected]>

* fix

Signed-off-by: Alex Chi <[email protected]>
skyzh authored Jul 7, 2022
1 parent 477a4be commit ca0f431
Showing 8 changed files with 286 additions and 325 deletions.
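
For context (not part of the commit): before this change, each LocalOutput/RemoteOutput timed a send only when the channel looked full and labeled the blocking counter with the downstream actor id; after it, the dispatching actor times its whole dispatch and labels the counter with its own actor id. A minimal, self-contained sketch of the new pattern using the prometheus and minstant crates directly rather than RisingWave's StreamingMetrics; the name dispatch_timed and the 5 ms sleep are made up for illustration:

use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};

// Hypothetical stand-in for DispatchExecutorInner::dispatch: time the whole
// dispatch and attribute any blocking to the *current* (dispatching) actor id.
async fn dispatch_timed<F: std::future::Future<Output = ()>>(
    actor_id_str: &str,
    blocking_ns: &IntCounterVec,
    dispatch: F,
) {
    let start = minstant::Instant::now();
    dispatch.await; // blocks here while downstream output buffers are full
    blocking_ns
        .with_label_values(&[actor_id_str])
        .inc_by(start.elapsed().as_nanos() as u64);
}

#[tokio::main]
async fn main() {
    let registry = Registry::new();
    let blocking_ns = register_int_counter_vec_with_registry!(
        "stream_actor_output_buffer_blocking_duration_ns",
        "Total blocking duration (ns) of output buffer",
        &["actor_id"],
        registry
    )
    .unwrap();

    // Pretend actor 42 spends ~5 ms pushing into a full output buffer.
    dispatch_timed("42", &blocking_ns, async {
        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    })
    .await;

    println!(
        "blocked ns: {}",
        blocking_ns.with_label_values(&["42"]).get()
    );
}
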
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

494 changes: 250 additions & 244 deletions grafana/risingwave-dashboard.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/stream/Cargo.toml
@@ -26,6 +26,7 @@ log = "0.4"
 madsim = "=0.2.0-alpha.3"
 maplit = "1.0.2"
 memcomparable = { path = "../utils/memcomparable" }
+minstant = "0.1"
 num-traits = "0.2"
 parking_lot = "0.12"
 paste = "1"
91 changes: 25 additions & 66 deletions src/stream/src/executor/dispatch.rs
@@ -22,7 +22,6 @@ use futures::Stream;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use madsim::collections::{HashMap, HashSet};
-use madsim::time::Instant;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::BitmapBuilder;
 use risingwave_common::error::{internal_error, Result};
@@ -50,11 +49,7 @@ type BoxedOutput = Box<dyn Output>;
 pub struct LocalOutput {
     actor_id: ActorId,

-    actor_id_str: String,
-
     ch: Sender<Message>,
-
-    metrics: Arc<StreamingMetrics>,
 }

 impl Debug for LocalOutput {
@@ -66,38 +61,18 @@ impl Debug for LocalOutput {
 }

 impl LocalOutput {
-    pub fn new(actor_id: ActorId, ch: Sender<Message>, metrics: Arc<StreamingMetrics>) -> Self {
-        Self {
-            actor_id,
-            actor_id_str: actor_id.to_string(),
-            ch,
-            metrics,
-        }
+    pub fn new(actor_id: ActorId, ch: Sender<Message>) -> Self {
+        Self { actor_id, ch }
     }
 }

 #[async_trait]
 impl Output for LocalOutput {
     async fn send(&mut self, message: Message) -> Result<()> {
-        // if the buffer is full when sending, the sender is backpressured
-        if self.ch.capacity() == 0 {
-            let start_time = Instant::now();
-            // local channel should never fail
-            self.ch
-                .send(message)
-                .await
-                .map_err(|_| internal_error("failed to send"))?;
-            self.metrics
-                .actor_output_buffer_blocking_duration
-                .with_label_values(&[&self.actor_id_str])
-                .inc_by(start_time.elapsed().as_nanos() as u64);
-        } else {
-            self.ch
-                .send(message)
-                .await
-                .map_err(|_| internal_error("failed to send"))?;
-        };
-
+        self.ch
+            .send(message)
+            .await
+            .map_err(|_| internal_error("failed to send"))?;
         Ok(())
     }

@@ -110,11 +85,7 @@ impl Output for LocalOutput {
 pub struct RemoteOutput {
     actor_id: ActorId,

-    actor_id_str: String,
-
     ch: Sender<Message>,
-
-    metrics: Arc<StreamingMetrics>,
 }

 impl Debug for RemoteOutput {
@@ -126,13 +97,8 @@ impl Debug for RemoteOutput {
 }

 impl RemoteOutput {
-    pub fn new(actor_id: ActorId, ch: Sender<Message>, metrics: Arc<StreamingMetrics>) -> Self {
-        Self {
-            actor_id,
-            actor_id_str: actor_id.to_string(),
-            ch,
-            metrics,
-        }
+    pub fn new(actor_id: ActorId, ch: Sender<Message>) -> Self {
+        Self { actor_id, ch }
     }
 }

@@ -143,24 +109,11 @@ impl Output for RemoteOutput {
             Message::Chunk(chk) => Message::Chunk(chk.compact()?),
             _ => message,
         };
-        // if the buffer is full when sending, the sender is backpressured
-        if self.ch.capacity() == 0 {
-            let start_time = Instant::now();
-            // local channel should never fail
-            self.ch
-                .send(message)
-                .await
-                .map_err(|_| internal_error("failed to send"))?;
-            self.metrics
-                .actor_output_buffer_blocking_duration
-                .with_label_values(&[&self.actor_id_str])
-                .inc_by(start_time.elapsed().as_nanos() as u64);
-        } else {
-            self.ch
-                .send(message)
-                .await
-                .map_err(|_| internal_error("failed to send"))?;
-        };
+
+        self.ch
+            .send(message)
+            .await
+            .map_err(|_| internal_error("failed to send"))?;

         Ok(())
     }
@@ -175,14 +128,13 @@ pub fn new_output(
     addr: HostAddr,
     actor_id: ActorId,
     down_id: ActorId,
-    metrics: Arc<StreamingMetrics>,
 ) -> Result<Box<dyn Output>> {
     let tx = context.take_sender(&(actor_id, down_id))?;
     if is_local_address(&addr, &context.addr) {
         // if this is a local downstream actor
-        Ok(Box::new(LocalOutput::new(down_id, tx, metrics)) as Box<dyn Output>)
+        Ok(Box::new(LocalOutput::new(down_id, tx)) as Box<dyn Output>)
     } else {
-        Ok(Box::new(RemoteOutput::new(down_id, tx, metrics)) as Box<dyn Output>)
+        Ok(Box::new(RemoteOutput::new(down_id, tx)) as Box<dyn Output>)
     }
 }

@@ -219,7 +171,7 @@ impl DispatchExecutorInner {
                     .actor_out_record_cnt
                     .with_label_values(&[&self.actor_id_str])
                     .inc_by(chunk.cardinality() as _);
-
+                let start_time = minstant::Instant::now();
                 if self.dispatchers.len() == 1 {
                     // special clone optimization when there is only one downstream dispatcher
                     self.single_inner_mut().dispatch_data(chunk).await?;
@@ -228,14 +180,23 @@ impl DispatchExecutorInner {
                         dispatcher.dispatch_data(chunk.clone()).await?;
                     }
                 }
+                self.metrics
+                    .actor_output_buffer_blocking_duration_ns
+                    .with_label_values(&[&self.actor_id_str])
+                    .inc_by(start_time.elapsed().as_nanos() as u64);
             }
             Message::Barrier(barrier) => {
+                let start_time = minstant::Instant::now();
                 let mutation = barrier.mutation.clone();
                 self.pre_mutate_outputs(&mutation).await?;
                 for dispatcher in &mut self.dispatchers {
                     dispatcher.dispatch_barrier(barrier.clone()).await?;
                 }
                 self.post_mutate_outputs(&mutation).await?;
+                self.metrics
+                    .actor_output_buffer_blocking_duration_ns
+                    .with_label_values(&[&self.actor_id_str])
+                    .inc_by(start_time.elapsed().as_nanos() as u64);
             }
         };
         Ok(())
@@ -270,7 +231,6 @@ impl DispatchExecutorInner {
                         downstream_addr,
                         self.actor_id,
                         down_id,
-                        self.metrics.clone(),
                     )?);
                 }
                 dispatcher.set_outputs(new_outputs)
@@ -293,7 +253,6 @@ impl DispatchExecutorInner {
                         downstream_addr,
                         self.actor_id,
                         down_id,
-                        self.metrics.clone(),
                     )?);
                 }
                 dispatcher.add_outputs(outputs_to_add);
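
A design note on the hunks above (my reading, not stated in the commit): the old Output::send only attributed blocking when self.ch.capacity() == 0 at the moment of sending, and it labeled the counter with the downstream actor id; the new code drops that per-output branch and instead times the whole dispatch in the current actor. A toy illustration of the two ideas with a plain tokio mpsc channel rather than RisingWave's channel wrapper or StreamingMetrics:

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(1);
    tx.send(0).await.unwrap(); // the buffer is now full: the next send will block

    // Old-style check: only treat a send as backpressured if the buffer already looks full.
    assert_eq!(tx.capacity(), 0);

    // New-style measurement: always time the send and attribute it to the sender.
    let start = minstant::Instant::now();
    let drain = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(5)).await;
        let _ = rx.recv().await; // free one slot after ~5 ms
    });
    tx.send(1).await.unwrap(); // resumes once the drain task receives
    println!("send blocked for ~{} ns", start.elapsed().as_nanos());
    drain.await.unwrap();
}
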
6 changes: 3 additions & 3 deletions src/stream/src/executor/integration_tests.rs
@@ -52,7 +52,7 @@ async fn test_merger_sum_aggr() {
             ActorContext::create(),
             0,
             0,
-            metrics.clone(),
+            metrics,
         );
         let append_only = false;
         // for the local aggregator, we need two states: row count and sum
@@ -79,7 +79,7 @@ async fn test_merger_sum_aggr() {
         let (tx, rx) = channel(16);
         let consumer = SenderConsumer {
             input: aggregator.boxed(),
-            channel: Box::new(LocalOutput::new(233, tx, metrics)),
+            channel: Box::new(LocalOutput::new(233, tx)),
         };
         let context = SharedContext::for_test().into();
         let actor = Actor::new(
@@ -108,7 +108,7 @@ async fn test_merger_sum_aggr() {
         let (actor, channel) = make_actor(rx);
         outputs.push(channel);
         handles.push(tokio::spawn(actor.run()));
-        inputs.push(Box::new(LocalOutput::new(233, tx, metrics.clone())) as Box<dyn Output>);
+        inputs.push(Box::new(LocalOutput::new(233, tx)) as Box<dyn Output>);
     }

     // create a round robin dispatcher, which dispatches messages to the actors
8 changes: 4 additions & 4 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -25,7 +25,7 @@ pub struct StreamingMetrics {
     pub actor_processing_time: GenericGaugeVec<AtomicF64>,
     pub actor_barrier_time: GenericGaugeVec<AtomicF64>,
     pub actor_execution_time: GenericGaugeVec<AtomicF64>,
-    pub actor_output_buffer_blocking_duration: GenericCounterVec<AtomicU64>,
+    pub actor_output_buffer_blocking_duration_ns: GenericCounterVec<AtomicU64>,
     pub actor_scheduled_duration: GenericGaugeVec<AtomicF64>,
     pub actor_scheduled_cnt: GenericGaugeVec<AtomicI64>,
     pub actor_fast_poll_duration: GenericGaugeVec<AtomicF64>,
@@ -88,8 +88,8 @@ impl StreamingMetrics {
         )
         .unwrap();

-        let actor_output_buffer_blocking_duration = register_int_counter_vec_with_registry!(
-            "stream_actor_output_buffer_blocking_duration",
+        let actor_output_buffer_blocking_duration_ns = register_int_counter_vec_with_registry!(
+            "stream_actor_output_buffer_blocking_duration_ns",
             "Total blocking duration (ns) of output buffer",
             &["actor_id"],
             registry
@@ -239,7 +239,7 @@ impl StreamingMetrics {
             actor_processing_time,
             actor_barrier_time,
             actor_execution_time,
-            actor_output_buffer_blocking_duration,
+            actor_output_buffer_blocking_duration_ns,
             actor_scheduled_duration,
             actor_scheduled_cnt,
             actor_fast_poll_duration,
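
Both the struct field and the registered metric name gain a _ns suffix here, so anything scraping or graphing the old stream_actor_output_buffer_blocking_duration name must follow the rename, which is presumably why the Grafana dashboard files above are regenerated. A small standalone sketch (plain prometheus crate, not RisingWave code) of how the renamed counter appears when the registry is rendered for a /metrics endpoint:

use prometheus::{register_int_counter_vec_with_registry, Encoder, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    // Same name and help text as the commit; the `_ns` suffix documents the unit.
    let blocking_ns = register_int_counter_vec_with_registry!(
        "stream_actor_output_buffer_blocking_duration_ns",
        "Total blocking duration (ns) of output buffer",
        &["actor_id"],
        registry
    )
    .unwrap();
    blocking_ns.with_label_values(&["42"]).inc_by(5_000_000); // 5 ms of blocking

    // Render the registry the way a /metrics endpoint would, to see the new name.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
    // Output includes a line like:
    // stream_actor_output_buffer_blocking_duration_ns{actor_id="42"} 5000000
}
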
8 changes: 1 addition & 7 deletions src/stream/src/task/stream_manager.rs
@@ -417,13 +417,7 @@ impl LocalStreamManagerCore {
             .iter()
             .map(|down_id| {
                 let downstream_addr = self.get_actor_info(down_id)?.get_host()?.into();
-                new_output(
-                    &self.context,
-                    downstream_addr,
-                    actor_id,
-                    *down_id,
-                    self.streaming_metrics.clone(),
-                )
+                new_output(&self.context, downstream_addr, actor_id, *down_id)
             })
             .collect::<Result<Vec<_>>>()?;

