feat(meta): ensure chain index is placed at the same node of upstream (#2270)

Signed-off-by: Alex Chi <[email protected]>
skyzh authored May 5, 2022
1 parent 70626c4 commit bb71a45
Showing 7 changed files with 111 additions and 20 deletions.
30 changes: 30 additions & 0 deletions e2e_test/v2/streaming/index.slt
@@ -54,6 +54,36 @@ select v1, v2, v3, v4 from iii_mv2;
0 0 0 3
1 0 1 2

statement ok
insert into iii_t1 values (2, 0), (3, 0), (0, 0), (1, 0), (5, 5);

statement ok
insert into iii_t2 values (2, 5), (3, 4), (0, 3), (1, 2), (5, 5);

statement ok
flush;

query IIII rowsort
select v1, v2, v3, v4 from iii_mv2;
----
2 0 2 5
2 0 2 5
2 0 2 5
2 0 2 5
3 0 3 4
3 0 3 4
3 0 3 4
3 0 3 4
0 0 0 3
0 0 0 3
0 0 0 3
0 0 0 3
1 0 1 2
1 0 1 2
1 0 1 2
1 0 1 2
5 5 5 5

statement ok
drop materialized view iii_mv2

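A note on the expected output above: assuming iii_mv2 is an equi-join between iii_t1 and iii_t2 (per the setup earlier in this file), each of the keys 0 through 3 now has two rows on both sides, so the join emits 2 × 2 = 4 rows per key, while the freshly inserted key 5 matches exactly once. A minimal sketch of the arithmetic:

// Illustrative only: an equi-join emits one output row per pair of matching
// input rows, so the multiplicity is the product of per-side row counts.
fn join_multiplicity(left_rows: usize, right_rows: usize) -> usize {
    left_rows * right_rows
}

fn main() {
    assert_eq!(join_multiplicity(2, 2), 4); // keys 0..=3, duplicated on both sides
    assert_eq!(join_multiplicity(1, 1), 1); // key 5, new on both sides
}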
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
@@ -154,6 +154,8 @@ message ChainNode {
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
// This option is used to disable barrier rearrangement.
bool disable_rearrange = 4;
// Whether to place this chain on the same worker node as upstream actors.
bool same_worker_node = 5;
}

// BatchPlanNode is used for mv on mv snapshot read.
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_index_scan.rs
@@ -142,6 +142,7 @@ impl StreamIndexScan {
},
],
node_body: Some(ProstStreamNode::Chain(ChainNode {
same_worker_node: true,
disable_rearrange: true,
table_ref_id: Some(TableRefId {
table_id: self.logical.table_desc().table_id.table_id as i32,
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -147,6 +147,7 @@ impl StreamTableScan {
},
],
node_body: Some(ProstStreamNode::Chain(ChainNode {
same_worker_node: false,
disable_rearrange: false,
table_ref_id: Some(TableRefId {
table_id: self.logical.table_desc().table_id.table_id as i32,
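The two scan nodes above differ only in these flags: an index scan pins its chain to the same worker node as the upstream arrangement and disables barrier rearrangement, while an MV-on-MV table scan allows both. A condensed sketch of the rule; the helper function is hypothetical and the prost-generated ChainNode is assumed to implement Default:

use risingwave_pb::stream_plan::ChainNode;

// Hypothetical helper summarizing the two call sites above.
fn chain_node_for(is_index_scan: bool) -> ChainNode {
    ChainNode {
        // Index chains must live with the upstream arrangement; MV-on-MV
        // chains may be rescheduled and have their barriers rearranged.
        same_worker_node: is_index_scan,
        disable_rearrange: is_index_scan,
        ..Default::default()
    }
}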
31 changes: 23 additions & 8 deletions src/meta/src/stream/fragmenter/graph/stream_graph.rs
@@ -89,11 +89,24 @@ struct StreamActorBuilder {
/// upstreams, exchange node operator_id -> upstream actor ids
upstreams: HashMap<u64, StreamActorUpstream>,

/// Whether to place this actor on the same worker node as the chain's upstream MVs.
chain_same_worker_node: bool,

/// whether this actor builder has been sealed
sealed: bool,
}

impl StreamActorBuilder {
fn is_chain_same_worker_node(stream_node: &StreamNode) -> bool {
fn visit(stream_node: &StreamNode) -> bool {
if let Some(NodeBody::Chain(ref chain)) = stream_node.node_body {
return chain.same_worker_node;
}
stream_node.input.iter().any(visit)
}
visit(stream_node)
}

pub fn new(
actor_id: LocalActorId,
fragment_id: LocalFragmentId,
@@ -102,6 +115,7 @@ impl StreamActorBuilder {
Self {
actor_id,
fragment_id,
chain_same_worker_node: Self::is_chain_same_worker_node(&node),
nodes: node,
downstreams: vec![],
upstreams: HashMap::new(),
@@ -238,14 +252,15 @@ impl StreamActorBuilder {
.flat_map(|(_, StreamActorUpstream { actors, .. })| actors.0.iter().copied())
.map(|x| x.as_global_id())
.collect(), // TODO: store each upstream separately
same_worker_node_as_upstream: self.upstreams.iter().any(
|(
_,
StreamActorUpstream {
same_worker_node, ..
},
)| *same_worker_node,
),
same_worker_node_as_upstream: self.chain_same_worker_node
|| self.upstreams.iter().any(
|(
_,
StreamActorUpstream {
same_worker_node, ..
},
)| *same_worker_node,
),
}
}
}
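The new is_chain_same_worker_node visitor propagates the flag from any chain node in the actor's node tree up to the actor builder. A sketch of its behavior as a hypothetical unit test (module paths for the prost types are assumed):

#[cfg(test)]
mod tests {
    use super::*;
    use risingwave_pb::stream_plan::{stream_node::NodeBody, ChainNode, StreamNode};

    fn chain(same_worker_node: bool) -> StreamNode {
        StreamNode {
            node_body: Some(NodeBody::Chain(ChainNode {
                same_worker_node,
                ..Default::default()
            })),
            ..Default::default()
        }
    }

    #[test]
    fn detects_chain_flag_in_subtree() {
        // A chain nested under a non-chain root is still found.
        let root = StreamNode {
            input: vec![chain(true)],
            ..Default::default()
        };
        assert!(StreamActorBuilder::is_chain_same_worker_node(&root));
        // A chain with the flag unset does not force colocation.
        assert!(!StreamActorBuilder::is_chain_same_worker_node(&chain(false)));
    }
}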
8 changes: 6 additions & 2 deletions src/meta/src/stream/fragmenter/rewrite/delta_join.rs
@@ -221,6 +221,7 @@ impl StreamFragmenter {
let i0_length = arrange_0.fields.len();
let i1_length = arrange_1.fields.len();

// lookup left table by right stream
let lookup_0 = self.build_lookup_for_delta_join(
state,
(&exchange_a1l0, &exchange_a0l0),
@@ -243,6 +244,7 @@
},
);

// lookup right table by left stream
let lookup_1 = self.build_lookup_for_delta_join(
state,
(&exchange_a0l1, &exchange_a1l1),
@@ -280,7 +282,8 @@
lookup_1_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: Self::dispatch_no_shuffle(),
same_worker_node: true,
// stream input doesn't need to be on the same worker node as lookup
same_worker_node: false,
link_id: exchange_a0l1.operator_id,
},
);
@@ -290,7 +293,8 @@
lookup_0_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: Self::dispatch_no_shuffle(),
same_worker_node: true,
// stream input doesn't need to be on the same worker node as lookup
same_worker_node: false,
link_id: exchange_a1l0.operator_id,
},
);
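To summarize the topology after this change: each lookup in a delta join keeps its arrangement input colocated (the lookup executor reads the arrangement's local state), while its stream input, coming from the opposite side's arrangement, no longer needs to be. A self-contained sketch of the four lookup edges, with fragment names assumed from the surrounding code:

// Sketch (types simplified): which delta-join edges still require colocation.
struct Edge {
    from: &'static str,
    to: &'static str,
    same_worker_node: bool,
}

fn main() {
    let edges = [
        // Arrangement inputs: assumed to keep the colocation requirement.
        Edge { from: "arrange_0", to: "lookup_0", same_worker_node: true },
        Edge { from: "arrange_1", to: "lookup_1", same_worker_node: true },
        // Stream inputs: relaxed to `false` by this commit.
        Edge { from: "arrange_1", to: "lookup_0", same_worker_node: false },
        Edge { from: "arrange_0", to: "lookup_1", same_worker_node: false },
    ];
    for e in &edges {
        println!("{} -> {}: colocated = {}", e.from, e.to, e.same_worker_node);
    }
}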
58 changes: 48 additions & 10 deletions src/meta/src/stream/stream_manager.rs
@@ -118,8 +118,12 @@ where
// The closure environment. Used to simulate recursive closure.
struct Env<'a> {
hash_mapping: &'a Vec<ParallelUnitId>,

/// Records the corresponding actor of each parallel unit of one table.
upstream_parallel_unit_info: &'a HashMap<TableId, BTreeMap<ParallelUnitId, ActorId>>,
/// Records the actors on each worker node of one table.
tables_node_actors: &'a HashMap<TableId, BTreeMap<WorkerId, Vec<ActorId>>>,
/// Schedule information of all actors.
locations: &'a ScheduledLocations,

dispatches: &'a mut HashMap<(ActorId, DispatcherId), Vec<ActorId>>,
@@ -131,35 +135,59 @@
&mut self,
stream_node: &mut StreamNode,
actor_id: ActorId,
same_worker_node_as_upstream: bool,
) -> Result<()> {
let Some(NodeBody::Chain(ref mut chain)) = stream_node.node_body else {
// If the node is not a chain node, recursively deal with its input nodes
for input in &mut stream_node.input {
self.resolve_chain_node_inner(input, actor_id)?;
self.resolve_chain_node_inner(input, actor_id, same_worker_node_as_upstream)?;
}
return Ok(());
};
// If node is chain node, we insert upstream ids into chain's input(merge)
// If the node is a chain node, insert the upstream ids into the chain's input (merge)

// get upstream table id
let table_id = TableId::from(&chain.table_ref_id);

let (upstream_actor_id, parallel_unit_id) = {
// 1. use table id to get upstream parallel_unit->actor_id mapping
// 1. use table id to get upstream parallel_unit -> actor_id mapping
let upstream_parallel_actor_mapping =
self.upstream_parallel_unit_info.get(&table_id).unwrap();
// 2. use our actor id to get our parallel unit id
// 2. use our actor id to get parallel unit id of the chain actor
let parallel_unit_id =
self.locations.actor_locations.get(&actor_id).unwrap().id;
// 3. and use our parallel unit id to get upstream actor id
// 3. and use chain actor's parallel unit id to get the corresponding upstream
// actor id
(
upstream_parallel_actor_mapping
*upstream_parallel_actor_mapping
.get(&parallel_unit_id)
.unwrap(),
parallel_unit_id,
)
};

// The current implementation already ensures that the chain and its upstream are on
// the same worker node. We still do a sanity check here, in case the logic changes
// later and the `same_worker_node` constraint is silently violated.
if same_worker_node_as_upstream {
// A parallel unit id is globally unique across all worker nodes; it can be seen as
// something like a CPU core id. Therefore, we verify that the chain actor's parallel
// unit id equals its upstream's parallel unit id.

let actor_parallel_unit_id =
self.locations.actor_locations.get(&actor_id).unwrap().id;

assert_eq!(
*self
.upstream_parallel_unit_info
.get(&table_id)
.unwrap()
.get(&actor_parallel_unit_id)
.unwrap(),
upstream_actor_id
);
}

// fill upstream node-actor info for later use
let upstream_table_node_actors = self.tables_node_actors.get(&table_id).unwrap();

@@ -168,7 +196,7 @@
.flat_map(|(node_id, actor_ids)| {
actor_ids.iter().map(|actor_id| (*node_id, *actor_id))
})
.filter(|(_, actor_id)| *upstream_actor_id == *actor_id)
.filter(|(_, actor_id)| upstream_actor_id == *actor_id)
.into_group_map();
for (node_id, actor_ids) in chain_upstream_node_actors {
self.upstream_node_actors
@@ -180,7 +208,7 @@
// deal with merge and batch query node, setting upstream infos.
let merge_stream_node = &mut stream_node.input[0];
if let Some(NodeBody::Merge(ref mut merge)) = merge_stream_node.node_body {
merge.upstream_actor_id.push(*upstream_actor_id);
merge.upstream_actor_id.push(upstream_actor_id);
} else {
unreachable!("chain's input[0] should always be merge");
}
@@ -199,7 +227,7 @@

// finally, we should also build dispatcher infos here.
self.dispatches
.entry((*upstream_actor_id, 0))
.entry((upstream_actor_id, 0))
.or_default()
.push(actor_id);

@@ -229,7 +257,11 @@
for fragment in table_fragments.fragments.values_mut() {
for actor in &mut fragment.actors {
if let Some(ref mut stream_node) = actor.nodes {
env.resolve_chain_node_inner(stream_node, actor.actor_id)?;
env.resolve_chain_node_inner(
stream_node,
actor.actor_id,
actor.same_worker_node_as_upstream,
)?;
}
}
}
@@ -300,6 +332,12 @@
.await?;
}

// Verify that all `same_worker_node_as_upstream` constraints are satisfied.
//
// Currently, the scheduler (when there is no scale-in or scale-out) always schedules the
// chain node on the same worker node as its upstream. However, this constraint is easily
// broken if parallel units are not aligned between upstream worker nodes.

// Fill hash dispatcher's mapping with scheduled locations.
table_fragments
.fragments
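The sanity check added above relies on a single invariant: a parallel unit id names one slot on one worker node, so equal parallel unit ids imply the same worker. A minimal, self-contained sketch of the lookup being verified (type names simplified from the diff):

use std::collections::BTreeMap;

type ParallelUnitId = u32;
type ActorId = u32;

// The chain actor scheduled on `chain_unit` must find its upstream actor under
// the very same unit id in the table's parallel-unit -> actor mapping.
fn upstream_on_same_unit(
    upstream_mapping: &BTreeMap<ParallelUnitId, ActorId>,
    chain_unit: ParallelUnitId,
) -> ActorId {
    *upstream_mapping
        .get(&chain_unit)
        .expect("upstream must cover every parallel unit used by the chain")
}

fn main() {
    let mapping = BTreeMap::from([(0, 100), (1, 101)]);
    assert_eq!(upstream_on_same_unit(&mapping, 1), 101);
}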
