doc: add some comments for keys & rename keys -> key (risingwavelabs#3700)

* doc: add some comments for keys

* keys -> key

* keys -> key

* keys -> key

* fix doc link

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent c5d9623 commit c6d618c
Showing 66 changed files with 575 additions and 548 deletions.
12 changes: 6 additions & 6 deletions proto/batch_plan.proto
@@ -109,19 +109,19 @@ message NestedLoopJoinNode {
}

message HashAggNode {
repeated uint32 group_keys = 1;
repeated uint32 group_key = 1;
repeated expr.AggCall agg_calls = 2;
}

message ExpandNode {
message Subset {
repeated uint32 keys = 1;
repeated uint32 column_indices = 1;
}
repeated Subset column_subsets = 1;
}

message SortAggNode {
repeated expr.ExprNode group_keys = 1;
repeated expr.ExprNode group_key = 1;
repeated expr.AggCall agg_calls = 2;
}

@@ -135,8 +135,8 @@ message HashJoinNode {

message SortMergeJoinNode {
plan_common.JoinType join_type = 1;
repeated int32 left_keys = 2;
repeated int32 right_keys = 3;
repeated int32 left_key = 2;
repeated int32 right_key = 3;
plan_common.OrderType direction = 4;
repeated uint32 output_indices = 5;
}
@@ -242,7 +242,7 @@ message ExchangeInfo {
}
message HashInfo {
uint32 output_count = 1;
repeated uint32 keys = 3;
repeated uint32 key = 3;
}
DistributionMode mode = 1;
oneof distribution {
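For orientation: most of the renamed key fields in this file (`group_key` in `HashAggNode`, `left_key`/`right_key` in `SortMergeJoinNode`, `key` in `HashInfo`) are plain column indices into the operator's input, while `SortAggNode.group_key` holds expressions instead. The executor builders later in this diff turn the index form into `usize` with `.get_group_key().iter().map(|x| *x as usize)`. A minimal, hypothetical sketch of that conversion (the helper name `group_key_indices` is not from the repo):

// Hypothetical helper; mirrors how the builders below consume the renamed field.
fn group_key_indices(group_key: &[u32]) -> Vec<usize> {
    group_key.iter().map(|&i| i as usize).collect()
}

fn main() {
    // e.g. HashAggNode { group_key: vec![0, 1], .. } groups by the first two input columns.
    assert_eq!(group_key_indices(&[0, 1]), vec![0, 1]);
}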
4 changes: 2 additions & 2 deletions proto/catalog.proto
@@ -50,14 +50,14 @@ message Table {
uint32 database_id = 3;
string name = 4;
repeated plan_common.ColumnCatalog columns = 5;
repeated plan_common.ColumnOrder order_keys = 6;
repeated plan_common.ColumnOrder order_key = 6;
repeated uint32 dependent_relations = 8;
oneof optional_associated_source_id {
uint32 associated_source_id = 9;
}
bool is_index = 10;
uint32 index_on_id = 11;
repeated int32 distribution_keys = 12;
repeated int32 distribution_key = 12;
repeated int32 pk = 13;
bool appendonly = 14;
string owner = 15;
26 changes: 13 additions & 13 deletions proto/stream_plan.proto
@@ -44,26 +44,26 @@ message FilterNode {
// In addition, we also specify primary key to MV for efficient point lookup during update and deletion.
//
// The node will be used for both create mv and create index.
// - When creating mv, `pk == distribution_keys == column_orders`.
// - When creating mv, `pk == distribution_key == column_orders`.
// - When creating index, `column_orders` will contain both
// arrange columns and pk columns, while distribution keys will be arrange columns.
// arrange columns and pk columns, while distribution key will be arrange columns.
message MaterializeNode {
plan_common.TableRefId table_ref_id = 1;
plan_common.TableRefId associated_table_ref_id = 2;
// Column indexes and orders of primary key
repeated plan_common.ColumnOrder column_orders = 3;
// Column IDs of input schema
repeated int32 column_ids = 4;
// Hash keys of the materialize node, which is a subset of pk.
repeated uint32 distribution_keys = 5;
// Hash key of the materialize node, which is a subset of pk.
repeated uint32 distribution_key = 5;
}

// Remark by Yanghao: for both local and global we use the same node in the protobuf.
// Local and global aggregator distinguish with each other in PlanNode definition.
message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
// Only used for local simple agg, not used for global simple agg.
repeated uint32 distribution_keys = 2;
repeated uint32 distribution_key = 2;
repeated catalog.Table internal_tables = 3;
map<uint32, int32> column_mapping = 4;
// Whether to optimize for append only stream.
@@ -72,7 +72,7 @@ message SimpleAggNode {
}

message HashAggNode {
repeated uint32 group_keys = 1;
repeated uint32 group_key = 1;
repeated expr.AggCall agg_calls = 2;
repeated catalog.Table internal_tables = 3;
map<uint32, int32> column_mapping = 4;
@@ -86,7 +86,7 @@ message TopNNode {
// 0 means no limit as limit of 0 means this node should be optimized away
uint64 limit = 2;
uint64 offset = 3;
repeated uint32 distribution_keys = 4;
repeated uint32 distribution_key = 4;
// Used for internal table states
uint32 table_id_l = 5;
uint32 table_id_m = 6;
@@ -180,7 +180,7 @@ message BatchPlanNode {
}

message ArrangementInfo {
// Order keys of the arrangement, including order by keys and pks from the materialize
// Order key of the arrangement, including order by columns and pk from the materialize
// executor.
repeated plan_common.ColumnOrder arrange_key_orders = 1;
// Column descs of the arrangement
@@ -194,15 +194,15 @@ message ArrangeNode {
uint32 table_id = 2;
// Info about the arrangement
ArrangementInfo table_info = 3;
// Hash keys of the materialize node, which is a subset of pk.
repeated uint32 distribution_keys = 4;
// Hash key of the materialize node, which is a subset of pk.
repeated uint32 distribution_key = 4;
}

// Special node for shared state. LookupNode will join an arrangement with a stream.
message LookupNode {
// Join keys of the arrangement side
// Join key of the arrangement side
repeated int32 arrange_key = 1;
// Join keys of the stream side
// Join key of the stream side
repeated int32 stream_key = 2;
// Whether to join the current epoch of arrangement
bool use_current_epoch = 3;
@@ -230,7 +230,7 @@ message LookupUnionNode {

message ExpandNode {
message Subset {
repeated uint32 keys = 1;
repeated uint32 column_indices = 1;
}
repeated Subset column_subsets = 1;
}
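The MaterializeNode and ArrangeNode comments above state that the hash key (`distribution_key`) is a subset of the pk columns. A rough sketch of that invariant expressed as a check, assuming plain index lists; this is an illustration, not code from the repo:

// Sketch only: every distribution_key column must also be a pk column.
fn distribution_key_is_subset_of_pk(distribution_key: &[u32], pk: &[u32]) -> bool {
    distribution_key.iter().all(|k| pk.contains(k))
}

fn main() {
    // pk covers columns 1 and 0; distributing by column 1 satisfies the invariant.
    assert!(distribution_key_is_subset_of_pk(&[1], &[1, 0]));
    // Distributing by a column outside the pk would violate it.
    assert!(!distribution_key_is_subset_of_pk(&[2], &[1, 0]));
}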
8 changes: 7 additions & 1 deletion src/batch/src/executor/expand.rs
@@ -92,7 +92,13 @@ impl BoxedExecutorBuilder for ExpandExecutor {
let column_subsets = expand_node
.column_subsets
.iter()
.map(|subset| subset.keys.iter().map(|key| *key as usize).collect_vec())
.map(|subset| {
subset
.column_indices
.iter()
.map(|idx| *idx as usize)
.collect_vec()
})
.collect_vec();

let child = inputs.remove(0);
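The rename in `ExpandNode` makes it explicit that each `Subset` holds input column indices. As a rough illustration of what the expand operator does with them, assuming each subset keeps its listed columns, nulls out the rest, and tags the copy with the subset's position (a simplified sketch, not the executor's actual implementation):

// Hypothetical, scalar sketch of expanding one row over column_subsets.
fn expand_row(
    row: &[Option<i64>],
    column_subsets: &[Vec<usize>],
) -> Vec<(Vec<Option<i64>>, usize)> {
    column_subsets
        .iter()
        .enumerate()
        .map(|(flag, subset)| {
            let expanded = (0..row.len())
                .map(|i| if subset.contains(&i) { row[i] } else { None })
                .collect();
            (expanded, flag)
        })
        .collect()
}

fn main() {
    let out = expand_row(&[Some(1), Some(2)], &[vec![0], vec![1]]);
    assert_eq!(out[0], (vec![Some(1), None], 0));
    assert_eq!(out[1], (vec![None, Some(2)], 1));
}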
8 changes: 4 additions & 4 deletions src/batch/src/executor/hash_agg.rs
@@ -69,7 +69,7 @@ impl HashAggExecutorBuilder {
identity: String,
) -> Result<BoxedExecutor> {
let group_key_columns = hash_agg_node
.get_group_keys()
.get_group_key()
.iter()
.map(|x| *x as usize)
.collect_vec();
@@ -140,7 +140,7 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder {
pub(crate) struct HashAggExecutor<K> {
/// factories to construct aggregator for each groups
agg_factories: Vec<AggStateFactory>,
/// Column indexes of keys that specify a group
/// Column indexes that specify a group
group_key_columns: Vec<usize>,
/// child executor
child: BoxedExecutor,
@@ -313,7 +313,7 @@ mod tests {
};

let agg_prost = HashAggNode {
group_keys: vec![0, 1],
group_key: vec![0, 1],
agg_calls: vec![agg_call],
};

@@ -378,7 +378,7 @@ mod tests {
};

let agg_prost = HashAggNode {
group_keys: vec![],
group_key: vec![],
agg_calls: vec![agg_call],
};

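The `group_key` column indices above are what hash aggregation buckets rows by. A toy, single-threaded illustration of that bucketing (a sketch of the idea only, not the executor's vectorized implementation):

use std::collections::HashMap;

// Illustrative only: count rows per group key, where group_key lists column indices.
fn count_by_group_key(rows: &[Vec<i64>], group_key: &[usize]) -> HashMap<Vec<i64>, usize> {
    let mut groups: HashMap<Vec<i64>, usize> = HashMap::new();
    for row in rows {
        let key: Vec<i64> = group_key.iter().map(|&i| row[i]).collect();
        *groups.entry(key).or_insert(0) += 1;
    }
    groups
}

fn main() {
    let rows = vec![vec![1, 10], vec![1, 20], vec![2, 30]];
    let counts = count_by_group_key(&rows, &[0]);
    assert_eq!(counts[&vec![1]], 2);
    assert_eq!(counts[&vec![2]], 1);
}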
8 changes: 4 additions & 4 deletions src/batch/src/executor/join/hash_join.rs
@@ -42,16 +42,16 @@ use crate::task::{BatchTaskContext, TaskId};
#[derive(Default)]
pub(super) struct EquiJoinParams {
join_type: JoinType,
/// Column indexes of left keys in equi join, e.g., the column indexes of `b1` and `b3` in `b`.
/// Column indexes of left key in equi join, e.g., the column indexes of `b1` and `b3` in `b`.
left_key_columns: Vec<usize>,
/// Data types of left keys in equi join, e.g., the column types of `b1` and `b3` in `b`.
/// Data types of left key in equi join, e.g., the column types of `b1` and `b3` in `b`.
left_key_types: Vec<DataType>,
/// Data types of left columns in equi join, e.g., the column types of `b1` `b2` `b3` in `b`.
left_col_len: usize,
/// Column indexes of right keys in equi join, e.g., the column indexes of `a1` and `a3` in
/// Column indexes of right key in equi join, e.g., the column indexes of `a1` and `a3` in
/// `a`.
right_key_columns: Vec<usize>,
/// Data types of right keys in equi join, e.g., the column types of `a1` and `a3` in `a`.
/// Data types of right key in equi join, e.g., the column types of `a1` and `a3` in `a`.
right_key_types: Vec<DataType>,
/// Data types of right columns in equi join, e.g., the column types of `a1` `a2` `a3` in `a`.
right_col_len: usize,
12 changes: 6 additions & 6 deletions src/batch/src/executor/join/sort_merge_join.rs
@@ -280,16 +280,16 @@ impl BoxedExecutorBuilder for SortMergeJoinExecutor {
.iter()
.map(|&idx| original_schema[idx].clone())
.collect();
let left_keys = sort_merge_join_node.get_left_keys();
let left_key = sort_merge_join_node.get_left_key();
let mut probe_key_idxs = vec![];
for key in left_keys {
probe_key_idxs.push(*key as usize);
for idx in left_key {
probe_key_idxs.push(*idx as usize);
}

let right_keys = sort_merge_join_node.get_right_keys();
let right_key = sort_merge_join_node.get_right_key();
let mut build_key_idxs = vec![];
for key in right_keys {
build_key_idxs.push(*key as usize);
for idx in right_key {
build_key_idxs.push(*idx as usize);
}
match join_type {
JoinType::Inner => {
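The renamed `left_key` / `right_key` fields give the probe- and build-side key column indices for the sort-merge join. As a reminder of the underlying idea, here is a rough sketch that assumes both sides are already sorted on a single key column and that keys are unique per side (the real executor handles duplicates, multiple key columns, and the various join types):

// Simplified sketch: one forward scan over each sorted side finds the matching pairs.
fn merge_matches(left: &[i64], right: &[i64]) -> Vec<(usize, usize)> {
    let (mut l, mut r, mut out) = (0, 0, Vec::new());
    while l < left.len() && r < right.len() {
        if left[l] < right[r] {
            l += 1;
        } else if left[l] > right[r] {
            r += 1;
        } else {
            out.push((l, r));
            l += 1;
            r += 1;
        }
    }
    out
}

fn main() {
    assert_eq!(merge_matches(&[1, 3, 5], &[3, 4, 5]), vec![(1, 0), (2, 2)]);
}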
32 changes: 16 additions & 16 deletions src/batch/src/executor/sort_agg.rs
@@ -41,7 +41,7 @@ use crate::task::BatchTaskContext;
/// automatically because all tuples should be aggregated together.
pub struct SortAggExecutor {
agg_states: Vec<BoxedAggState>,
group_keys: Vec<BoxedExpression>,
group_key: Vec<BoxedExpression>,
sorted_groupers: Vec<BoxedSortedGrouper>,
child: BoxedExecutor,
schema: Schema,
@@ -70,18 +70,18 @@ impl BoxedExecutorBuilder for SortAggExecutor {
.map(|x| AggStateFactory::new(x)?.create_agg_state())
.try_collect()?;

let group_keys: Vec<_> = sort_agg_node
.get_group_keys()
let group_key: Vec<_> = sort_agg_node
.get_group_key()
.iter()
.map(build_from_prost)
.try_collect()?;

let sorted_groupers: Vec<_> = group_keys
let sorted_groupers: Vec<_> = group_key
.iter()
.map(|e| create_sorted_grouper(e.return_type()))
.try_collect()?;

let fields = group_keys
let fields = group_key
.iter()
.map(|e| e.return_type())
.chain(agg_states.iter().map(|e| e.return_type()))
@@ -90,7 +90,7 @@ impl BoxedExecutorBuilder for SortAggExecutor {

Ok(Box::new(Self {
agg_states,
group_keys,
group_key,
sorted_groupers,
child: inputs.remove(0),
schema: Schema { fields },
@@ -119,13 +119,13 @@ impl SortAggExecutor {
async fn do_execute(mut self: Box<Self>) {
let mut left_capacity = self.output_size_limit;
let (mut group_builders, mut agg_builders) =
SortAggExecutor::create_builders(&self.group_keys, &self.agg_states);
SortAggExecutor::create_builders(&self.group_key, &self.agg_states);

#[for_await]
for child_chunk in self.child.execute() {
let child_chunk = child_chunk?.compact()?;
let group_columns: Vec<_> = self
.group_keys
.group_key
.iter_mut()
.map(|expr| expr.eval(&child_chunk))
.try_collect()?;
@@ -178,7 +178,7 @@ impl SortAggExecutor {

// reset builders and capactiy to build next output chunk
(group_builders, agg_builders) =
SortAggExecutor::create_builders(&self.group_keys, &self.agg_states);
SortAggExecutor::create_builders(&self.group_key, &self.agg_states);

left_capacity = self.output_size_limit;
}
@@ -242,10 +242,10 @@ impl SortAggExecutor {
}

fn create_builders(
group_keys: &[BoxedExpression],
group_key: &[BoxedExpression],
agg_states: &[BoxedAggState],
) -> (Vec<ArrayBuilderImpl>, Vec<ArrayBuilderImpl>) {
let group_builders = group_keys
let group_builders = group_key
.iter()
.map(|e| e.return_type().create_array_builder(1))
.collect();
@@ -336,7 +336,7 @@ mod tests {

let executor = Box::new(SortAggExecutor {
agg_states,
group_keys: group_exprs,
group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
@@ -444,7 +444,7 @@ mod tests {

let executor = Box::new(SortAggExecutor {
agg_states,
group_keys: group_exprs,
group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
@@ -552,7 +552,7 @@ mod tests {
.collect::<Vec<Field>>();
let executor = Box::new(SortAggExecutor {
agg_states,
group_keys: vec![],
group_key: vec![],
sorted_groupers: vec![],
child: Box::new(child),
schema: Schema { fields },
@@ -655,7 +655,7 @@ mod tests {
let output_size_limit = 4;
let executor = Box::new(SortAggExecutor {
agg_states,
group_keys: group_exprs,
group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
@@ -776,7 +776,7 @@ mod tests {

let executor = Box::new(SortAggExecutor {
agg_states,
group_keys: group_exprs,
group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
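The `group_key` expressions above drive the sorted groupers: because the input is already sorted by the group key, a new group starts exactly where consecutive key values differ. A minimal single-column sketch of that boundary detection (illustrative only, not `create_sorted_grouper` itself):

// Returns the row offsets at which a new group starts in a sorted key column.
fn group_starts(sorted_keys: &[i64]) -> Vec<usize> {
    let mut starts = Vec::new();
    for i in 0..sorted_keys.len() {
        if i == 0 || sorted_keys[i] != sorted_keys[i - 1] {
            starts.push(i);
        }
    }
    starts
}

fn main() {
    assert_eq!(group_starts(&[1, 1, 2, 2, 2, 3]), vec![0, 2, 5]);
}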
4 changes: 2 additions & 2 deletions src/batch/src/task/hash_shuffle_channel.rs
@@ -46,9 +46,9 @@ fn generate_hash_values(chunk: &DataChunk, hash_info: &HashInfo) -> Result<Vec<u
let hash_values = chunk
.get_hash_values(
&hash_info
.keys
.key
.iter()
.map(|key| *key as usize)
.map(|idx| *idx as usize)
.collect::<Vec<_>>(),
hasher_builder,
)
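The renamed `key` in `HashInfo` lists the columns whose values decide which output a row is shuffled to. A hedged, per-row sketch of that routing, assuming the channel is chosen as the hash modulo `output_count` (the real code hashes whole chunks at once via `get_hash_values` with a configurable hasher builder):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical per-row routing: equal key values always map to the same channel.
fn route_row(key_values: &[i64], output_count: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    key_values.hash(&mut hasher);
    (hasher.finish() % output_count as u64) as u32
}

fn main() {
    // Same key values -> same channel; channel index stays below output_count.
    assert_eq!(route_row(&[42, 7], 4), route_row(&[42, 7], 4));
    assert!(route_row(&[42, 7], 4) < 4);
}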