Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Add Fragment Level Exchange Metrics #3696

Merged
merged 32 commits into from
Jul 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7444c9c
Edit update.sh
marvenlee2486 Jun 22, 2022
8cfe606
Edit format error
marvenlee2486 Jun 22, 2022
8241831
Add payload to gitignore
marvenlee2486 Jun 22, 2022
7683aa1
Revert risedev.yml modification
marvenlee2486 Jun 22, 2022
b74968f
Fix wrong gitignore
marvenlee2486 Jun 22, 2022
2549902
Fix wrong gitignore
marvenlee2486 Jun 22, 2022
f2166b2
Done coding for Fragment level exchange
marvenlee2486 Jul 4, 2022
0630c76
Done coding for Fragment level exchange
marvenlee2486 Jul 4, 2022
8abd4b2
Merge Conflict
marvenlee2486 Jul 4, 2022
ecd4aa1
Edit implementation error
marvenlee2486 Jul 5, 2022
993f047
Edit down fragment error
marvenlee2486 Jul 6, 2022
ea1624d
Delete println statement and revert configuration settings
marvenlee2486 Jul 6, 2022
18cdda8
Done coding for Fragment level exchange
marvenlee2486 Jul 4, 2022
6ed19fc
Edit update.sh
marvenlee2486 Jun 22, 2022
de05386
Edit format error
marvenlee2486 Jun 22, 2022
8b3953e
Add payload to gitignore
marvenlee2486 Jun 22, 2022
582a574
Revert risedev.yml modification
marvenlee2486 Jun 22, 2022
6338a88
Fix wrong gitignore
marvenlee2486 Jun 22, 2022
eaa5653
Done coding for Fragment level exchange
marvenlee2486 Jul 4, 2022
98ade72
Edit implementation error
marvenlee2486 Jul 5, 2022
4e162f6
Edit down fragment error
marvenlee2486 Jul 6, 2022
8af4137
Delete println statement and revert configuration settings
marvenlee2486 Jul 6, 2022
8432355
Edit Format and Resolve conflict
marvenlee2486 Jul 6, 2022
8395848
Edit Format and Resolve conflict
marvenlee2486 Jul 6, 2022
ade896f
Merge branch 'main' into zongyu/fragmentxchange
marvenlee2486 Jul 6, 2022
527c6bd
Resolve Test error
marvenlee2486 Jul 6, 2022
28f54eb
Merge branch 'zongyu/fragmentxchange' of https://github.com/singulari…
marvenlee2486 Jul 6, 2022
1b3feb5
Resolve some error
marvenlee2486 Jul 6, 2022
ace70a0
generate dashboard
marvenlee2486 Jul 6, 2022
0e51b8d
Merge branch 'main' into zongyu/fragmentxchange
marvenlee2486 Jul 6, 2022
3dc3e16
Merge branch 'main' of https://github.com/singularity-data/risingwave…
skyzh Jul 9, 2022
5d82776
use rate_interval instead of 15s
skyzh Jul 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ log/
# generated files
risedev-components.user.env
riselab-components.user.env
risedev-compose.yml
risedev-compose.yml
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,16 @@ def section_streaming_exchange(outer_panels):
"rate(stream_exchange_recv_size[$__rate_interval])", "{{up_actor_id}}->{{down_actor_id}}"
),
]),
panels.timeseries_bytes_per_sec("Fragment Exchange Send Throughput",[
panels.target(
"rate(stream_exchange_frag_send_size[$__rate_interval])","{{up_fragment_id}}->{{down_fragment_id}}"
),
]),
panels.timeseries_bytes_per_sec("Fragment Exchange Recv Throughput", [
panels.target(
"rate(stream_exchange_frag_recv_size[$__rate_interval])", "{{up_fragment_id}}->{{down_fragment_id}}"
),
]),
]),
]

Expand Down
2 changes: 2 additions & 0 deletions grafana/update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ set -euo pipefail
echo "$(tput setaf 4)Upload dashboard to localhost:3001$(tput sgr0)"

payload="{\"dashboard\": $(jq . risingwave-dashboard.json), \"overwrite\": true}"

echo "$payload" > payload.txt


curl -X POST \
-H 'Content-Type: application/json' \
-d @payload.txt \
Expand Down
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ message HopWindowNode {

message MergeNode {
repeated uint32 upstream_actor_id = 1;
uint32 upstream_fragment_id = 2;
// The schema of input columns. TODO: remove this field.
repeated plan_common.Field fields = 2;
repeated plan_common.Field fields = 3;
}

// passed from frontend to meta, used by fragmenter to generate `MergeNode`
Expand Down
6 changes: 4 additions & 2 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ message GetDataResponse {
}

message GetStreamRequest {
uint32 up_fragment_id = 1;
uint32 down_fragment_id = 2;
uint32 up_actor_id = 1;
uint32 down_actor_id = 2;
uint32 up_fragment_id = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So previously we are getting things wrong here? We should use actor_id instead of fragment_id for GetStream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes correct, I did ask Martin before and He say the fragment refer to actor.

uint32 down_fragment_id = 4;
}

message ExecuteRequest {
Expand Down
10 changes: 10 additions & 0 deletions src/compute/src/rpc/service/exchange_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use prometheus::{register_int_counter_vec_with_registry, Registry};
pub struct ExchangeServiceMetrics {
pub registry: Registry,
pub stream_exchange_bytes: GenericCounterVec<AtomicU64>,
pub stream_fragment_exchange_bytes: GenericCounterVec<AtomicU64>,
pub actor_sampled_serialize_duration_ns: GenericCounterVec<AtomicU64>,
}

Expand All @@ -31,6 +32,14 @@ impl ExchangeServiceMetrics {
)
.unwrap();

let stream_fragment_exchange_bytes = register_int_counter_vec_with_registry!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why send and recv metrics are defined in two places? This would make things a little bit confusing...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I follow the PR about the actor exchange, the exchange and receive are defined in two places.

"stream_exchange_frag_send_size",
"Total size of messages that have been send to downstream Fragment",
&["up_fragment_id", "down_fragment_id"],
registry
)
.unwrap();

let actor_sampled_serialize_duration_ns = register_int_counter_vec_with_registry!(
"actor_sampled_serialize_duration_ns",
"Duration (ns) of sampled chunk serialization",
Expand All @@ -42,6 +51,7 @@ impl ExchangeServiceMetrics {
Self {
registry,
stream_exchange_bytes,
stream_fragment_exchange_bytes,
actor_sampled_serialize_duration_ns,
}
}
Expand Down
24 changes: 18 additions & 6 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ impl ExchangeService for ExchangeServiceImpl {
.remote_addr()
.ok_or_else(|| Status::unavailable("get_stream connection unestablished"))?;
let req = request.into_inner();
let up_down_ids = (req.up_fragment_id, req.down_fragment_id);
let receiver = self.stream_mgr.take_receiver(up_down_ids)?;
match self.get_stream_impl(peer_addr, receiver, up_down_ids).await {
let up_down_actor_ids = (req.up_actor_id, req.down_actor_id);
let up_down_fragment_ids = (req.up_fragment_id, req.down_fragment_id);
let receiver = self.stream_mgr.take_receiver(up_down_actor_ids)?;
match self
.get_stream_impl(peer_addr, receiver, up_down_actor_ids, up_down_fragment_ids)
.await
{
Ok(resp) => Ok(resp),
Err(e) => {
error!(
Expand Down Expand Up @@ -108,14 +112,18 @@ impl ExchangeServiceImpl {
&self,
peer_addr: SocketAddr,
mut receiver: Receiver<Message>,
up_down_ids: (u32, u32),
up_down_actor_ids: (u32, u32),
up_down_fragment_ids: (u32, u32),
) -> Result<Response<<Self as ExchangeService>::GetStreamStream>> {
let (tx, rx) = tokio::sync::mpsc::channel(EXCHANGE_BUFFER_SIZE);
let metrics = self.metrics.clone();
tracing::trace!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC");
tokio::spawn(async move {
let up_actor_id = up_down_ids.0.to_string();
let down_actor_id = up_down_ids.1.to_string();
let up_actor_id = up_down_actor_ids.0.to_string();
let down_actor_id = up_down_actor_ids.1.to_string();
let up_fragment_id = up_down_fragment_ids.0.to_string();
let down_fragment_id = up_down_fragment_ids.1.to_string();

let mut rr = 0;
const SAMPLING_FREQUENCY: u64 = 100;

Expand Down Expand Up @@ -162,6 +170,10 @@ impl ExchangeServiceImpl {
.stream_exchange_bytes
.with_label_values(&[&up_actor_id, &down_actor_id])
.inc_by(bytes as u64);
metrics
.stream_fragment_exchange_bytes
.with_label_values(&[&up_fragment_id, &down_fragment_id])
.inc_by(bytes as u64);
Ok(())
}
Err(e) => tx.send(Err(e.into())).await,
Expand Down
35 changes: 30 additions & 5 deletions src/meta/src/stream/stream_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ struct StreamActorDownstream {
struct StreamActorUpstream {
/// Upstream actors
actors: OrderedActorLink,

/// associate fragment id
fragment_id: GlobalFragmentId,
/// Whether to place the upstream actors on the same node
same_worker_node: bool,
}
Expand Down Expand Up @@ -266,13 +267,15 @@ impl StreamActorBuilder {
StreamActorUpstream {
actors,
same_worker_node,
fragment_id,
},
)| {
(
exchange_id,
StreamActorUpstream {
actors: actors.to_global_ids(actor_id_offset, actor_id_len),
same_worker_node,
fragment_id,
},
)
},
Expand Down Expand Up @@ -381,6 +384,7 @@ impl StreamGraphBuilder {
/// Add dependency between two connected node in the graph.
pub fn add_link(
&mut self,
upstream_fragment_id: GlobalFragmentId,
upstream_actor_ids: &[LocalActorId],
downstream_actor_ids: &[LocalActorId],
exchange_operator_id: u64,
Expand Down Expand Up @@ -421,6 +425,7 @@ impl StreamGraphBuilder {
exchange_operator_id,
StreamActorUpstream {
actors: OrderedActorLink(vec![*upstream_id]),
fragment_id: upstream_fragment_id,
same_worker_node,
},
);
Expand Down Expand Up @@ -468,6 +473,7 @@ impl StreamGraphBuilder {
exchange_operator_id,
StreamActorUpstream {
actors: OrderedActorLink(upstream_actor_ids.to_vec()),
fragment_id: upstream_fragment_id,
same_worker_node,
},
);
Expand Down Expand Up @@ -503,9 +509,19 @@ impl StreamGraphBuilder {
.iter()
.map(|(id, StreamActorUpstream { actors, .. })| (*id, actors.clone()))
.collect();
let mut upstream_fragments = builder
.upstreams
.iter()
.map(|(id, StreamActorUpstream { fragment_id, .. })| (*id, *fragment_id))
.collect();
let stream_node = self.build_inner(
ctx,
actor.get_nodes()?,
actor_id,
&mut upstream_actors,
&mut upstream_fragments,
)?;

let stream_node =
self.build_inner(ctx, actor.get_nodes()?, actor_id, &mut upstream_actors)?;
actor.nodes = Some(stream_node);
graph
.entry(builder.get_fragment_id())
Expand All @@ -527,6 +543,7 @@ impl StreamGraphBuilder {
stream_node: &StreamNode,
actor_id: LocalActorId,
upstream_actor_id: &mut HashMap<u64, OrderedActorLink>,
upstream_fragment_id: &mut HashMap<u64, GlobalFragmentId>,
) -> Result<StreamNode> {
let table_id_offset = ctx.table_id_offset;
let mut check_and_fill_internal_table = |table_id: u32, table: Option<Table>| {
Expand Down Expand Up @@ -655,6 +672,7 @@ impl StreamGraphBuilder {
upstream_actor_id: upstream_actor_id
.remove(&input.get_operator_id())
.expect("failed to find upstream actor id for given exchange node").as_global_ids(),
upstream_fragment_id: upstream_fragment_id.get(&input.get_operator_id()).unwrap().as_global_id(),
fields: input.get_fields().clone(),
})),
fields: input.get_fields().clone(),
Expand All @@ -667,8 +685,13 @@ impl StreamGraphBuilder {
new_stream_node.input[idx] = self.resolve_chain_node(input)?;
}
_ => {
new_stream_node.input[idx] =
self.build_inner(ctx, input, actor_id, upstream_actor_id)?;
new_stream_node.input[idx] = self.build_inner(
ctx,
input,
actor_id,
upstream_actor_id,
upstream_fragment_id,
)?;
}
}
}
Expand Down Expand Up @@ -696,6 +719,7 @@ impl StreamGraphBuilder {
pk_indices: stream_node.pk_indices.clone(),
node_body: Some(NodeBody::Merge(MergeNode {
upstream_actor_id: vec![],
upstream_fragment_id: 0,
fields: chain_node.upstream_fields.clone(),
})),
fields: chain_node.upstream_fields.clone(),
Expand Down Expand Up @@ -965,6 +989,7 @@ impl ActorGraphBuilder {
| DispatcherType::Broadcast
| DispatcherType::NoShuffle => {
state.stream_graph_builder.add_link(
fragment_id,
&actor_ids,
downstream_actors,
dispatch_edge.link_id,
Expand Down
8 changes: 6 additions & 2 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,17 @@ impl ComputeClient {

pub async fn get_stream(
&self,
up_actor_id: u32,
down_actor_id: u32,
up_fragment_id: u32,
down_fragment_id: u32,
) -> Result<Streaming<GetStreamResponse>> {
Ok(self
.exchange_client
.to_owned()
.get_stream(GetStreamRequest {
up_actor_id,
down_actor_id,
up_fragment_id,
down_fragment_id,
})
Expand All @@ -78,8 +82,8 @@ impl ComputeClient {
tracing::error!(
"failed to create stream from remote_input {} from fragment {} to fragment {}",
self.addr,
up_fragment_id,
down_fragment_id
up_actor_id,
down_actor_id
)
})?
.into_inner())
Expand Down
18 changes: 16 additions & 2 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ use tonic::Streaming;
use super::error::StreamExecutorError;
use super::*;
use crate::executor::monitor::StreamingMetrics;
use crate::task::UpDownActorIds;
use crate::task::{UpDownActorIds, UpDownFragmentIds};

/// Receive data from `gRPC` and forwards to `MergerExecutor`/`ReceiverExecutor`
pub struct RemoteInput {
stream: Streaming<GetStreamResponse>,
sender: Sender<Message>,
up_down_ids: UpDownActorIds,
up_down_frag: UpDownFragmentIds,
metrics: Arc<StreamingMetrics>,
}

Expand All @@ -46,21 +47,28 @@ impl RemoteInput {
pub async fn create(
client: ComputeClient,
up_down_ids: UpDownActorIds,
up_down_frag: UpDownFragmentIds,
sender: Sender<Message>,
metrics: Arc<StreamingMetrics>,
) -> Result<Self> {
let stream = client.get_stream(up_down_ids.0, up_down_ids.1).await?;
let stream = client
.get_stream(up_down_ids.0, up_down_ids.1, up_down_frag.0, up_down_frag.1)
.await?;
Ok(Self {
stream,
sender,
up_down_ids,
up_down_frag,
metrics,
})
}

pub async fn run(self) {
let up_actor_id = self.up_down_ids.0.to_string();
let down_actor_id = self.up_down_ids.1.to_string();
let up_fragment_id = self.up_down_frag.0.to_string();
let down_fragment_id = self.up_down_frag.1.to_string();

let mut rr = 0;
const SAMPLING_FREQUENCY: u64 = 100;

Expand All @@ -74,6 +82,11 @@ impl RemoteInput {
.with_label_values(&[&up_actor_id, &down_actor_id])
.inc_by(bytes as u64);

self.metrics
.exchange_frag_recv_size
.with_label_values(&[&up_fragment_id, &down_fragment_id])
.inc_by(bytes as u64);

// add deserialization duration metric with given sampling frequency
let msg_res = if rr % SAMPLING_FREQUENCY == 0 {
let start_time = Instant::now();
Expand Down Expand Up @@ -470,6 +483,7 @@ mod tests {
let remote_input = RemoteInput::create(
ComputeClient::new(addr.into()).await.unwrap(),
(0, 0),
(0, 0),
tx,
Arc::new(StreamingMetrics::unused()),
)
Expand Down
10 changes: 10 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct StreamingMetrics {
pub actor_sampled_deserialize_duration_ns: GenericCounterVec<AtomicU64>,
pub source_output_row_count: GenericCounterVec<AtomicU64>,
pub exchange_recv_size: GenericCounterVec<AtomicU64>,
pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -104,6 +105,14 @@ impl StreamingMetrics {
)
.unwrap();

let exchange_frag_recv_size = register_int_counter_vec_with_registry!(
"stream_exchange_frag_recv_size",
"Total size of messages that have been received from upstream Fragment",
&["up_fragment_id", "down_fragment_id"],
registry
)
.unwrap();

let actor_fast_poll_duration = register_gauge_vec_with_registry!(
"stream_actor_fast_poll_duration",
"tokio's metrics",
Expand Down Expand Up @@ -255,6 +264,7 @@ impl StreamingMetrics {
actor_sampled_deserialize_duration_ns,
source_output_row_count,
exchange_recv_size,
exchange_frag_recv_size,
join_lookup_miss_count,
join_total_lookup_count,
join_barrier_align_duration,
Expand Down
Loading