Skip to content

Commit

Permalink
feat(metrics): Add Fragment Level Exchange Metrics (risingwavelabs#3696)
Browse files Browse the repository at this point in the history
* Edit update.sh

* Edit format error

* Add payload to gitignore

* Revert risedev.yml modification

* Fix wrong gitignore

* Fix wrong gitignore

* Done coding for Fragment level exchange

* Done coding for Fragment level exchange

* Edit implementation error

* Edit down fragment error

* Delete println statement and revert configuration settings

* Done coding for Fragment level exchange

* Edit update.sh

* Edit format error

* Add payload to gitignore

* Revert risedev.yml modification

* Fix wrong gitignore

* Done coding for Fragment level exchange

* Edit implementation error

* Edit down fragment error

* Delete println statement and revert configuration settings

* Edit Format and Resolve conflict

* Resolve Test error

* Resolve some error

* generate dashboard

* use rate_interval instead of 15s

Signed-off-by: Alex Chi <[email protected]>

Co-authored-by: Alex Chi <[email protected]>
  • Loading branch information
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent ec22e86 commit 702e759
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ log/
# generated files
risedev-components.user.env
riselab-components.user.env
risedev-compose.yml
risedev-compose.yml
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,16 @@ def section_streaming_exchange(outer_panels):
"rate(stream_exchange_recv_size[$__rate_interval])", "{{up_actor_id}}->{{down_actor_id}}"
),
]),
panels.timeseries_bytes_per_sec("Fragment Exchange Send Throughput",[
panels.target(
"rate(stream_exchange_frag_send_size[$__rate_interval])","{{up_fragment_id}}->{{down_fragment_id}}"
),
]),
panels.timeseries_bytes_per_sec("Fragment Exchange Recv Throughput", [
panels.target(
"rate(stream_exchange_frag_recv_size[$__rate_interval])", "{{up_fragment_id}}->{{down_fragment_id}}"
),
]),
]),
]

Expand Down
2 changes: 2 additions & 0 deletions grafana/update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ set -euo pipefail
echo "$(tput setaf 4)Upload dashboard to localhost:3001$(tput sgr0)"

payload="{\"dashboard\": $(jq . risingwave-dashboard.json), \"overwrite\": true}"

echo "$payload" > payload.txt


curl -X POST \
-H 'Content-Type: application/json' \
-d @payload.txt \
Expand Down
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ message HopWindowNode {

message MergeNode {
repeated uint32 upstream_actor_id = 1;
uint32 upstream_fragment_id = 2;
// The schema of input columns. TODO: remove this field.
repeated plan_common.Field fields = 2;
repeated plan_common.Field fields = 3;
}

// passed from frontend to meta, used by fragmenter to generate `MergeNode`
Expand Down
6 changes: 4 additions & 2 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ message GetDataResponse {
}

message GetStreamRequest {
uint32 up_fragment_id = 1;
uint32 down_fragment_id = 2;
uint32 up_actor_id = 1;
uint32 down_actor_id = 2;
uint32 up_fragment_id = 3;
uint32 down_fragment_id = 4;
}

message ExecuteRequest {
Expand Down
10 changes: 10 additions & 0 deletions src/compute/src/rpc/service/exchange_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use prometheus::{register_int_counter_vec_with_registry, Registry};
pub struct ExchangeServiceMetrics {
/// Prometheus registry that the counters below are registered with.
pub registry: Registry,
/// Counter of bytes sent through stream exchange, incremented per message with
/// the `(up_actor_id, down_actor_id)` label values.
pub stream_exchange_bytes: GenericCounterVec<AtomicU64>,
/// Counter registered as `stream_exchange_frag_send_size`: bytes sent through
/// stream exchange, labelled by `(up_fragment_id, down_fragment_id)`.
pub stream_fragment_exchange_bytes: GenericCounterVec<AtomicU64>,
/// Counter registered as `actor_sampled_serialize_duration_ns`: duration (ns)
/// of sampled chunk serialization.
pub actor_sampled_serialize_duration_ns: GenericCounterVec<AtomicU64>,
}

Expand All @@ -31,6 +32,14 @@ impl ExchangeServiceMetrics {
)
.unwrap();

let stream_fragment_exchange_bytes = register_int_counter_vec_with_registry!(
"stream_exchange_frag_send_size",
"Total size of messages that have been send to downstream Fragment",
&["up_fragment_id", "down_fragment_id"],
registry
)
.unwrap();

let actor_sampled_serialize_duration_ns = register_int_counter_vec_with_registry!(
"actor_sampled_serialize_duration_ns",
"Duration (ns) of sampled chunk serialization",
Expand All @@ -42,6 +51,7 @@ impl ExchangeServiceMetrics {
Self {
registry,
stream_exchange_bytes,
stream_fragment_exchange_bytes,
actor_sampled_serialize_duration_ns,
}
}
Expand Down
24 changes: 18 additions & 6 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ impl ExchangeService for ExchangeServiceImpl {
.remote_addr()
.ok_or_else(|| Status::unavailable("get_stream connection unestablished"))?;
let req = request.into_inner();
let up_down_ids = (req.up_fragment_id, req.down_fragment_id);
let receiver = self.stream_mgr.take_receiver(up_down_ids)?;
match self.get_stream_impl(peer_addr, receiver, up_down_ids).await {
let up_down_actor_ids = (req.up_actor_id, req.down_actor_id);
let up_down_fragment_ids = (req.up_fragment_id, req.down_fragment_id);
let receiver = self.stream_mgr.take_receiver(up_down_actor_ids)?;
match self
.get_stream_impl(peer_addr, receiver, up_down_actor_ids, up_down_fragment_ids)
.await
{
Ok(resp) => Ok(resp),
Err(e) => {
error!(
Expand Down Expand Up @@ -108,14 +112,18 @@ impl ExchangeServiceImpl {
&self,
peer_addr: SocketAddr,
mut receiver: Receiver<Message>,
up_down_ids: (u32, u32),
up_down_actor_ids: (u32, u32),
up_down_fragment_ids: (u32, u32),
) -> Result<Response<<Self as ExchangeService>::GetStreamStream>> {
let (tx, rx) = tokio::sync::mpsc::channel(EXCHANGE_BUFFER_SIZE);
let metrics = self.metrics.clone();
tracing::trace!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC");
tokio::spawn(async move {
let up_actor_id = up_down_ids.0.to_string();
let down_actor_id = up_down_ids.1.to_string();
let up_actor_id = up_down_actor_ids.0.to_string();
let down_actor_id = up_down_actor_ids.1.to_string();
let up_fragment_id = up_down_fragment_ids.0.to_string();
let down_fragment_id = up_down_fragment_ids.1.to_string();

let mut rr = 0;
const SAMPLING_FREQUENCY: u64 = 100;

Expand Down Expand Up @@ -162,6 +170,10 @@ impl ExchangeServiceImpl {
.stream_exchange_bytes
.with_label_values(&[&up_actor_id, &down_actor_id])
.inc_by(bytes as u64);
metrics
.stream_fragment_exchange_bytes
.with_label_values(&[&up_fragment_id, &down_fragment_id])
.inc_by(bytes as u64);
Ok(())
}
Err(e) => tx.send(Err(e.into())).await,
Expand Down
35 changes: 30 additions & 5 deletions src/meta/src/stream/stream_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ struct StreamActorDownstream {
struct StreamActorUpstream {
/// Upstream actors
actors: OrderedActorLink,

/// Id of the fragment that the upstream actors belong to; it is copied into
/// the `upstream_fragment_id` of the generated `MergeNode`.
fragment_id: GlobalFragmentId,
/// Whether to place the upstream actors on the same node
same_worker_node: bool,
}
Expand Down Expand Up @@ -266,13 +267,15 @@ impl StreamActorBuilder {
StreamActorUpstream {
actors,
same_worker_node,
fragment_id,
},
)| {
(
exchange_id,
StreamActorUpstream {
actors: actors.to_global_ids(actor_id_offset, actor_id_len),
same_worker_node,
fragment_id,
},
)
},
Expand Down Expand Up @@ -381,6 +384,7 @@ impl StreamGraphBuilder {
/// Add dependency between two connected node in the graph.
pub fn add_link(
&mut self,
upstream_fragment_id: GlobalFragmentId,
upstream_actor_ids: &[LocalActorId],
downstream_actor_ids: &[LocalActorId],
exchange_operator_id: u64,
Expand Down Expand Up @@ -421,6 +425,7 @@ impl StreamGraphBuilder {
exchange_operator_id,
StreamActorUpstream {
actors: OrderedActorLink(vec![*upstream_id]),
fragment_id: upstream_fragment_id,
same_worker_node,
},
);
Expand Down Expand Up @@ -468,6 +473,7 @@ impl StreamGraphBuilder {
exchange_operator_id,
StreamActorUpstream {
actors: OrderedActorLink(upstream_actor_ids.to_vec()),
fragment_id: upstream_fragment_id,
same_worker_node,
},
);
Expand Down Expand Up @@ -503,9 +509,19 @@ impl StreamGraphBuilder {
.iter()
.map(|(id, StreamActorUpstream { actors, .. })| (*id, actors.clone()))
.collect();
let mut upstream_fragments = builder
.upstreams
.iter()
.map(|(id, StreamActorUpstream { fragment_id, .. })| (*id, *fragment_id))
.collect();
let stream_node = self.build_inner(
ctx,
actor.get_nodes()?,
actor_id,
&mut upstream_actors,
&mut upstream_fragments,
)?;

let stream_node =
self.build_inner(ctx, actor.get_nodes()?, actor_id, &mut upstream_actors)?;
actor.nodes = Some(stream_node);
graph
.entry(builder.get_fragment_id())
Expand All @@ -527,6 +543,7 @@ impl StreamGraphBuilder {
stream_node: &StreamNode,
actor_id: LocalActorId,
upstream_actor_id: &mut HashMap<u64, OrderedActorLink>,
upstream_fragment_id: &mut HashMap<u64, GlobalFragmentId>,
) -> Result<StreamNode> {
let table_id_offset = ctx.table_id_offset;
let mut check_and_fill_internal_table = |table_id: u32, table: Option<Table>| {
Expand Down Expand Up @@ -655,6 +672,7 @@ impl StreamGraphBuilder {
upstream_actor_id: upstream_actor_id
.remove(&input.get_operator_id())
.expect("failed to find upstream actor id for given exchange node").as_global_ids(),
upstream_fragment_id: upstream_fragment_id.get(&input.get_operator_id()).unwrap().as_global_id(),
fields: input.get_fields().clone(),
})),
fields: input.get_fields().clone(),
Expand All @@ -667,8 +685,13 @@ impl StreamGraphBuilder {
new_stream_node.input[idx] = self.resolve_chain_node(input)?;
}
_ => {
new_stream_node.input[idx] =
self.build_inner(ctx, input, actor_id, upstream_actor_id)?;
new_stream_node.input[idx] = self.build_inner(
ctx,
input,
actor_id,
upstream_actor_id,
upstream_fragment_id,
)?;
}
}
}
Expand Down Expand Up @@ -696,6 +719,7 @@ impl StreamGraphBuilder {
pk_indices: stream_node.pk_indices.clone(),
node_body: Some(NodeBody::Merge(MergeNode {
upstream_actor_id: vec![],
upstream_fragment_id: 0,
fields: chain_node.upstream_fields.clone(),
})),
fields: chain_node.upstream_fields.clone(),
Expand Down Expand Up @@ -965,6 +989,7 @@ impl ActorGraphBuilder {
| DispatcherType::Broadcast
| DispatcherType::NoShuffle => {
state.stream_graph_builder.add_link(
fragment_id,
&actor_ids,
downstream_actors,
dispatch_edge.link_id,
Expand Down
8 changes: 6 additions & 2 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,17 @@ impl ComputeClient {

pub async fn get_stream(
&self,
up_actor_id: u32,
down_actor_id: u32,
up_fragment_id: u32,
down_fragment_id: u32,
) -> Result<Streaming<GetStreamResponse>> {
Ok(self
.exchange_client
.to_owned()
.get_stream(GetStreamRequest {
up_actor_id,
down_actor_id,
up_fragment_id,
down_fragment_id,
})
Expand All @@ -78,8 +82,8 @@ impl ComputeClient {
tracing::error!(
"failed to create stream from remote_input {} from fragment {} to fragment {}",
self.addr,
up_fragment_id,
down_fragment_id
up_actor_id,
down_actor_id
)
})?
.into_inner())
Expand Down
18 changes: 16 additions & 2 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ use tonic::Streaming;
use super::error::StreamExecutorError;
use super::*;
use crate::executor::monitor::StreamingMetrics;
use crate::task::UpDownActorIds;
use crate::task::{UpDownActorIds, UpDownFragmentIds};

/// Receives data from `gRPC` and forwards it to `MergerExecutor`/`ReceiverExecutor`.
pub struct RemoteInput {
/// Response stream returned by `ComputeClient::get_stream`.
stream: Streaming<GetStreamResponse>,
/// Channel sender for `Message`s.
sender: Sender<Message>,
/// `(upstream, downstream)` actor ids; used as the request ids and as the
/// `up_actor_id`/`down_actor_id` metric label values.
up_down_ids: UpDownActorIds,
/// `(upstream, downstream)` fragment ids; used as the request ids and as the
/// `up_fragment_id`/`down_fragment_id` metric label values.
up_down_frag: UpDownFragmentIds,
/// Streaming metrics updated while the input runs (e.g. `exchange_frag_recv_size`).
metrics: Arc<StreamingMetrics>,
}

Expand All @@ -46,21 +47,28 @@ impl RemoteInput {
pub async fn create(
client: ComputeClient,
up_down_ids: UpDownActorIds,
up_down_frag: UpDownFragmentIds,
sender: Sender<Message>,
metrics: Arc<StreamingMetrics>,
) -> Result<Self> {
let stream = client.get_stream(up_down_ids.0, up_down_ids.1).await?;
let stream = client
.get_stream(up_down_ids.0, up_down_ids.1, up_down_frag.0, up_down_frag.1)
.await?;
Ok(Self {
stream,
sender,
up_down_ids,
up_down_frag,
metrics,
})
}

pub async fn run(self) {
let up_actor_id = self.up_down_ids.0.to_string();
let down_actor_id = self.up_down_ids.1.to_string();
let up_fragment_id = self.up_down_frag.0.to_string();
let down_fragment_id = self.up_down_frag.1.to_string();

let mut rr = 0;
const SAMPLING_FREQUENCY: u64 = 100;

Expand All @@ -74,6 +82,11 @@ impl RemoteInput {
.with_label_values(&[&up_actor_id, &down_actor_id])
.inc_by(bytes as u64);

self.metrics
.exchange_frag_recv_size
.with_label_values(&[&up_fragment_id, &down_fragment_id])
.inc_by(bytes as u64);

// add deserialization duration metric with given sampling frequency
let msg_res = if rr % SAMPLING_FREQUENCY == 0 {
let start_time = Instant::now();
Expand Down Expand Up @@ -470,6 +483,7 @@ mod tests {
let remote_input = RemoteInput::create(
ComputeClient::new(addr.into()).await.unwrap(),
(0, 0),
(0, 0),
tx,
Arc::new(StreamingMetrics::unused()),
)
Expand Down
10 changes: 10 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct StreamingMetrics {
pub actor_sampled_deserialize_duration_ns: GenericCounterVec<AtomicU64>,
pub source_output_row_count: GenericCounterVec<AtomicU64>,
pub exchange_recv_size: GenericCounterVec<AtomicU64>,
pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -104,6 +105,14 @@ impl StreamingMetrics {
)
.unwrap();

let exchange_frag_recv_size = register_int_counter_vec_with_registry!(
"stream_exchange_frag_recv_size",
"Total size of messages that have been received from upstream Fragment",
&["up_fragment_id", "down_fragment_id"],
registry
)
.unwrap();

let actor_fast_poll_duration = register_gauge_vec_with_registry!(
"stream_actor_fast_poll_duration",
"tokio's metrics",
Expand Down Expand Up @@ -255,6 +264,7 @@ impl StreamingMetrics {
actor_sampled_deserialize_duration_ns,
source_output_row_count,
exchange_recv_size,
exchange_frag_recv_size,
join_lookup_miss_count,
join_total_lookup_count,
join_barrier_align_duration,
Expand Down
Loading

0 comments on commit 702e759

Please sign in to comment.