doc: add some comments for keys & rename keys -> key #3700

Merged · 6 commits · Jul 7, 2022
12 changes: 6 additions & 6 deletions proto/batch_plan.proto
@@ -109,19 +109,19 @@ message NestedLoopJoinNode {
}

message HashAggNode {
- repeated uint32 group_keys = 1;
+ repeated uint32 group_key = 1;
repeated expr.AggCall agg_calls = 2;
}

message ExpandNode {
message Subset {
- repeated uint32 keys = 1;
+ repeated uint32 column_indices = 1;
}
repeated Subset column_subsets = 1;
}

message SortAggNode {
- repeated expr.ExprNode group_keys = 1;
+ repeated expr.ExprNode group_key = 1;
repeated expr.AggCall agg_calls = 2;
}

@@ -135,8 +135,8 @@ message HashJoinNode {

message SortMergeJoinNode {
plan_common.JoinType join_type = 1;
- repeated int32 left_keys = 2;
- repeated int32 right_keys = 3;
+ repeated int32 left_key = 2;
+ repeated int32 right_key = 3;
plan_common.OrderType direction = 4;
repeated uint32 output_indices = 5;
}
@@ -242,7 +242,7 @@ message ExchangeInfo {
}
message HashInfo {
uint32 output_count = 1;
- repeated uint32 keys = 3;
+ repeated uint32 key = 3;
}
DistributionMode mode = 1;
oneof distribution {
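For context on the `Subset` rename above: each `column_indices` list names the input columns preserved in one expanded copy of a row, which is how `ExpandNode` feeds GROUPING SETS-style plans. A minimal sketch of that expansion, independent of the real `ExpandExecutor` (the row and column types here are illustrative assumptions):

```rust
/// One output row per (input row, subset): columns listed in the subset keep
/// their value, every other column becomes NULL (None), and a trailing flag
/// records which subset produced the row.
fn expand(
    rows: &[Vec<Option<i64>>],
    column_subsets: &[Vec<usize>],
) -> Vec<Vec<Option<i64>>> {
    let mut out = Vec::new();
    for row in rows {
        for (flag, subset) in column_subsets.iter().enumerate() {
            let mut expanded = vec![None; row.len()];
            for &idx in subset {
                expanded[idx] = row[idx];
            }
            expanded.push(Some(flag as i64)); // grouping-set id
            out.push(expanded);
        }
    }
    out
}

fn main() {
    let rows = vec![vec![Some(1), Some(10)]];
    // Two subsets: keep column 0 only, then column 1 only.
    for r in expand(&rows, &[vec![0], vec![1]]) {
        println!("{:?}", r);
    }
}
```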
4 changes: 2 additions & 2 deletions proto/catalog.proto
@@ -50,14 +50,14 @@ message Table {
uint32 database_id = 3;
string name = 4;
repeated plan_common.ColumnCatalog columns = 5;
- repeated plan_common.ColumnOrder order_keys = 6;
+ repeated plan_common.ColumnOrder order_key = 6;
repeated uint32 dependent_relations = 8;
oneof optional_associated_source_id {
uint32 associated_source_id = 9;
}
bool is_index = 10;
uint32 index_on_id = 11;
- repeated int32 distribution_keys = 12;
+ repeated int32 distribution_key = 12;
repeated int32 pk = 13;
bool appendonly = 14;
string owner = 15;
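A note on the renamed `order_key`: it is one logical key, a list of (column, direction) pairs fixing the table's sort order. A minimal sketch of comparing rows by such a key (the `OrderType` enum and plain `i64` rows are stand-ins for illustration, not the engine's real types):

```rust
use std::cmp::Ordering;

#[derive(Clone, Copy)]
enum OrderType {
    Ascending,
    Descending,
}

/// Compare two rows column by column, following the (index, direction)
/// pairs of the order key; the first non-equal column decides.
fn cmp_by_order_key(a: &[i64], b: &[i64], order_key: &[(usize, OrderType)]) -> Ordering {
    for &(idx, dir) in order_key {
        let ord = match dir {
            OrderType::Ascending => a[idx].cmp(&b[idx]),
            OrderType::Descending => b[idx].cmp(&a[idx]),
        };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

fn main() {
    let mut rows = vec![vec![1, 9], vec![1, 3], vec![0, 5]];
    let key = [(0, OrderType::Ascending), (1, OrderType::Descending)];
    rows.sort_by(|a, b| cmp_by_order_key(a, b, &key));
    println!("{:?}", rows); // [[0, 5], [1, 9], [1, 3]]
}
```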
26 changes: 13 additions & 13 deletions proto/stream_plan.proto
@@ -44,26 +44,26 @@ message FilterNode {
// In addition, we also specify a primary key for the MV for efficient point lookup during update and deletion.
//
// The node will be used for both create mv and create index.
- // - When creating mv, `pk == distribution_keys == column_orders`.
+ // - When creating mv, `pk == distribution_key == column_orders`.
// - When creating index, `column_orders` will contain both
- //   arrange columns and pk columns, while distribution keys will be arrange columns.
+ //   arrange columns and pk columns, while distribution key will be arrange columns.
message MaterializeNode {
plan_common.TableRefId table_ref_id = 1;
plan_common.TableRefId associated_table_ref_id = 2;
// Column indexes and orders of primary key
repeated plan_common.ColumnOrder column_orders = 3;
// Column IDs of input schema
repeated int32 column_ids = 4;
- // Hash keys of the materialize node, which is a subset of pk.
- repeated uint32 distribution_keys = 5;
+ // Hash key of the materialize node, which is a subset of pk.
+ repeated uint32 distribution_key = 5;
}

// Remark by Yanghao: for both local and global we use the same node in the protobuf.
// Local and global aggregators are distinguished from each other in the PlanNode definition.
message SimpleAggNode {
repeated expr.AggCall agg_calls = 1;
// Only used for local simple agg, not used for global simple agg.
- repeated uint32 distribution_keys = 2;
+ repeated uint32 distribution_key = 2;
repeated catalog.Table internal_tables = 3;
map<uint32, int32> column_mapping = 4;
// Whether to optimize for append only stream.
@@ -72,7 +72,7 @@ message SimpleAggNode {
}

message HashAggNode {
- repeated uint32 group_keys = 1;
+ repeated uint32 group_key = 1;
repeated expr.AggCall agg_calls = 2;
repeated catalog.Table internal_tables = 3;
map<uint32, int32> column_mapping = 4;
@@ -86,7 +86,7 @@ message TopNNode {
// 0 means no limit, as a limit of 0 means this node should be optimized away
uint64 limit = 2;
uint64 offset = 3;
- repeated uint32 distribution_keys = 4;
+ repeated uint32 distribution_key = 4;
// Used for internal table states
uint32 table_id_l = 5;
uint32 table_id_m = 6;
@@ -180,7 +180,7 @@ message BatchPlanNode {
}

message ArrangementInfo {
- // Order keys of the arrangement, including order by keys and pks from the materialize
+ // Order key of the arrangement, including order by columns and pk from the materialize
// executor.
repeated plan_common.ColumnOrder arrange_key_orders = 1;
// Column descs of the arrangement
@@ -194,15 +194,15 @@ message ArrangeNode {
uint32 table_id = 2;
// Info about the arrangement
ArrangementInfo table_info = 3;
- // Hash keys of the materialize node, which is a subset of pk.
- repeated uint32 distribution_keys = 4;
+ // Hash key of the materialize node, which is a subset of pk.
+ repeated uint32 distribution_key = 4;
}

// Special node for shared state. LookupNode will join an arrangement with a stream.
message LookupNode {
- // Join keys of the arrangement side
+ // Join key of the arrangement side
repeated int32 arrange_key = 1;
- // Join keys of the stream side
+ // Join key of the stream side
repeated int32 stream_key = 2;
// Whether to join the current epoch of arrangement
bool use_current_epoch = 3;
@@ -230,7 +230,7 @@ message LookupUnionNode {

message ExpandNode {
message Subset {
- repeated uint32 keys = 1;
+ repeated uint32 column_indices = 1;
}
repeated Subset column_subsets = 1;
}
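The `LookupNode` comments above describe one join key split across two sides: `arrange_key` indexes the arrangement, `stream_key` indexes the stream. A minimal sketch of that lookup, independent of the real executor (the `Row` alias and `i64` cells are illustrative assumptions):

```rust
use std::collections::HashMap;

type Row = Vec<i64>;

/// Index the arrangement side by its `arrange_key` columns, then probe the
/// index with the `stream_key` columns of each stream row.
fn lookup_join(
    arrangement: &[Row],
    arrange_key: &[usize],
    stream: &[Row],
    stream_key: &[usize],
) -> Vec<(Row, Row)> {
    let mut index: HashMap<Vec<i64>, Vec<&Row>> = HashMap::new();
    for row in arrangement {
        let key: Vec<i64> = arrange_key.iter().map(|&i| row[i]).collect();
        index.entry(key).or_default().push(row);
    }
    let mut out = Vec::new();
    for s in stream {
        let key: Vec<i64> = stream_key.iter().map(|&i| s[i]).collect();
        if let Some(matches) = index.get(&key) {
            for &a in matches {
                out.push((s.clone(), a.clone()));
            }
        }
    }
    out
}

fn main() {
    let arrangement = vec![vec![1, 100], vec![2, 200]];
    let stream = vec![vec![10, 1], vec![11, 3]];
    // Stream column 1 joins arrangement column 0; only the first stream row matches.
    println!("{:?}", lookup_join(&arrangement, &[0], &stream, &[1]));
}
```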
8 changes: 7 additions & 1 deletion src/batch/src/executor/expand.rs
@@ -92,7 +92,13 @@ impl BoxedExecutorBuilder for ExpandExecutor {
let column_subsets = expand_node
.column_subsets
.iter()
- .map(|subset| subset.keys.iter().map(|key| *key as usize).collect_vec())
+ .map(|subset| {
+     subset
+         .column_indices
+         .iter()
+         .map(|idx| *idx as usize)
+         .collect_vec()
+ })
.collect_vec();

let child = inputs.remove(0);
8 changes: 4 additions & 4 deletions src/batch/src/executor/hash_agg.rs
@@ -69,7 +69,7 @@ impl HashAggExecutorBuilder {
identity: String,
) -> Result<BoxedExecutor> {
let group_key_columns = hash_agg_node
- .get_group_keys()
+ .get_group_key()
.iter()
.map(|x| *x as usize)
.collect_vec();
@@ -140,7 +140,7 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder {
pub(crate) struct HashAggExecutor<K> {
/// Factories to construct an aggregator for each group
agg_factories: Vec<AggStateFactory>,
- /// Column indexes of keys that specify a group
+ /// Column indexes that specify a group
group_key_columns: Vec<usize>,
/// child executor
child: BoxedExecutor,
@@ -313,7 +313,7 @@ mod tests {
};

let agg_prost = HashAggNode {
- group_keys: vec![0, 1],
+ group_key: vec![0, 1],
agg_calls: vec![agg_call],
};

@@ -378,7 +378,7 @@ mod tests {
};

let agg_prost = HashAggNode {
- group_keys: vec![],
+ group_key: vec![],
agg_calls: vec![agg_call],
};

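To make the `group_key_columns` semantics concrete: hash aggregation buckets rows by the tuple of values at those column indexes and keeps one aggregate state per bucket. A minimal sketch with COUNT as the aggregate (plain `i64` rows and a `HashMap` stand in for the real chunk and state types):

```rust
use std::collections::HashMap;

/// Group rows by the values of the `group_key` columns and count rows per
/// group; this is the skeleton of what HashAggExecutor does with real
/// aggregation states.
fn hash_count(rows: &[Vec<i64>], group_key: &[usize]) -> HashMap<Vec<i64>, u64> {
    let mut groups: HashMap<Vec<i64>, u64> = HashMap::new();
    for row in rows {
        let key: Vec<i64> = group_key.iter().map(|&i| row[i]).collect();
        *groups.entry(key).or_insert(0) += 1;
    }
    groups
}

fn main() {
    let rows = vec![vec![1, 10], vec![1, 20], vec![2, 30]];
    println!("{:?}", hash_count(&rows, &[0])); // {[1]: 2, [2]: 1}
}
```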
8 changes: 4 additions & 4 deletions src/batch/src/executor/join/hash_join.rs
@@ -42,16 +42,16 @@ use crate::task::{BatchTaskContext, TaskId};
#[derive(Default)]
pub(super) struct EquiJoinParams {
join_type: JoinType,
- /// Column indexes of left keys in equi join, e.g., the column indexes of `b1` and `b3` in `b`.
+ /// Column indexes of left key in equi join, e.g., the column indexes of `b1` and `b3` in `b`.
left_key_columns: Vec<usize>,
- /// Data types of left keys in equi join, e.g., the column types of `b1` and `b3` in `b`.
+ /// Data types of left key in equi join, e.g., the column types of `b1` and `b3` in `b`.
left_key_types: Vec<DataType>,
/// Number of columns on the left side of the equi join, e.g., 3 for `b1` `b2` `b3` in `b`.
left_col_len: usize,
- /// Column indexes of right keys in equi join, e.g., the column indexes of `a1` and `a3` in
+ /// Column indexes of right key in equi join, e.g., the column indexes of `a1` and `a3` in
/// `a`.
right_key_columns: Vec<usize>,
- /// Data types of right keys in equi join, e.g., the column types of `a1` and `a3` in `a`.
+ /// Data types of right key in equi join, e.g., the column types of `a1` and `a3` in `a`.
right_key_types: Vec<DataType>,
/// Number of columns on the right side of the equi join, e.g., 3 for `a1` `a2` `a3` in `a`.
right_col_len: usize,
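One invariant these paired fields imply: the left and right key lists must line up one-to-one with equal types, or the equi condition is ill-formed. A sketch of that check (the `DataType` enum here is a simplified stand-in, not the engine's real type system):

```rust
#[derive(PartialEq, Debug, Clone)]
enum DataType {
    Int32,
    Varchar,
}

/// Verify that left and right equi-join keys pair up with identical types.
fn check_equi_keys(left: &[DataType], right: &[DataType]) -> Result<(), String> {
    if left.len() != right.len() {
        return Err(format!("key count mismatch: {} vs {}", left.len(), right.len()));
    }
    for (i, (l, r)) in left.iter().zip(right).enumerate() {
        if l != r {
            return Err(format!("key {} type mismatch: {:?} vs {:?}", i, l, r));
        }
    }
    Ok(())
}

fn main() {
    let left = [DataType::Int32, DataType::Varchar];
    let right = [DataType::Int32, DataType::Int32];
    println!("{:?}", check_equi_keys(&left, &right)); // Err: key 1 type mismatch
}
```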
12 changes: 6 additions & 6 deletions src/batch/src/executor/join/sort_merge_join.rs
@@ -280,16 +280,16 @@ impl BoxedExecutorBuilder for SortMergeJoinExecutor {
.iter()
.map(|&idx| original_schema[idx].clone())
.collect();
- let left_keys = sort_merge_join_node.get_left_keys();
+ let left_key = sort_merge_join_node.get_left_key();
let mut probe_key_idxs = vec![];
- for key in left_keys {
-     probe_key_idxs.push(*key as usize);
+ for idx in left_key {
+     probe_key_idxs.push(*idx as usize);
}

- let right_keys = sort_merge_join_node.get_right_keys();
+ let right_key = sort_merge_join_node.get_right_key();
let mut build_key_idxs = vec![];
- for key in right_keys {
-     build_key_idxs.push(*key as usize);
+ for idx in right_key {
+     build_key_idxs.push(*idx as usize);
}
match join_type {
JoinType::Inner => {
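A side note on this hunk: the push-in-a-loop pattern for collecting the prost-encoded indices can also be written as a single iterator pipeline. A sketch, assuming the getters return `&[i32]` as the diff suggests:

```rust
/// The index-collection loops above, as one iterator pipeline.
fn to_usize_indices(key: &[i32]) -> Vec<usize> {
    key.iter().map(|&i| i as usize).collect()
}

fn main() {
    let left_key = [0i32, 2];
    assert_eq!(to_usize_indices(&left_key), vec![0, 2]);
}
```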
32 changes: 16 additions & 16 deletions src/batch/src/executor/sort_agg.rs
@@ -41,7 +41,7 @@ use crate::task::BatchTaskContext;
/// automatically because all tuples should be aggregated together.
pub struct SortAggExecutor {
agg_states: Vec<BoxedAggState>,
- group_keys: Vec<BoxedExpression>,
+ group_key: Vec<BoxedExpression>,
sorted_groupers: Vec<BoxedSortedGrouper>,
child: BoxedExecutor,
schema: Schema,
@@ -70,18 +70,18 @@ impl BoxedExecutorBuilder for SortAggExecutor {
.map(|x| AggStateFactory::new(x)?.create_agg_state())
.try_collect()?;

- let group_keys: Vec<_> = sort_agg_node
-     .get_group_keys()
+ let group_key: Vec<_> = sort_agg_node
+     .get_group_key()
.iter()
.map(build_from_prost)
.try_collect()?;

- let sorted_groupers: Vec<_> = group_keys
+ let sorted_groupers: Vec<_> = group_key
.iter()
.map(|e| create_sorted_grouper(e.return_type()))
.try_collect()?;

- let fields = group_keys
+ let fields = group_key
.iter()
.map(|e| e.return_type())
.chain(agg_states.iter().map(|e| e.return_type()))
@@ -90,7 +90,7 @@ impl BoxedExecutorBuilder for SortAggExecutor {

Ok(Box::new(Self {
agg_states,
- group_keys,
+ group_key,
sorted_groupers,
child: inputs.remove(0),
schema: Schema { fields },
@@ -119,13 +119,13 @@ impl SortAggExecutor {
async fn do_execute(mut self: Box<Self>) {
let mut left_capacity = self.output_size_limit;
let (mut group_builders, mut agg_builders) =
- SortAggExecutor::create_builders(&self.group_keys, &self.agg_states);
+ SortAggExecutor::create_builders(&self.group_key, &self.agg_states);

#[for_await]
for child_chunk in self.child.execute() {
let child_chunk = child_chunk?.compact()?;
let group_columns: Vec<_> = self
- .group_keys
+ .group_key
.iter_mut()
.map(|expr| expr.eval(&child_chunk))
.try_collect()?;
@@ -178,7 +178,7 @@ impl SortAggExecutor {

// reset builders and capacity to build next output chunk
(group_builders, agg_builders) =
- SortAggExecutor::create_builders(&self.group_keys, &self.agg_states);
+ SortAggExecutor::create_builders(&self.group_key, &self.agg_states);

left_capacity = self.output_size_limit;
}
@@ -242,10 +242,10 @@ impl SortAggExecutor {
}

fn create_builders(
- group_keys: &[BoxedExpression],
+ group_key: &[BoxedExpression],
agg_states: &[BoxedAggState],
) -> (Vec<ArrayBuilderImpl>, Vec<ArrayBuilderImpl>) {
- let group_builders = group_keys
+ let group_builders = group_key
.iter()
.map(|e| e.return_type().create_array_builder(1))
.collect();
@@ -336,7 +336,7 @@ mod tests {

let executor = Box::new(SortAggExecutor {
agg_states,
- group_keys: group_exprs,
+ group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
@@ -444,7 +444,7 @@ mod tests {

let executor = Box::new(SortAggExecutor {
agg_states,
- group_keys: group_exprs,
+ group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
@@ -552,7 +552,7 @@ mod tests {
.collect::<Vec<Field>>();
let executor = Box::new(SortAggExecutor {
agg_states,
- group_keys: vec![],
+ group_key: vec![],
sorted_groupers: vec![],
child: Box::new(child),
schema: Schema { fields },
@@ -655,7 +655,7 @@ mod tests {
let output_size_limit = 4;
let executor = Box::new(SortAggExecutor {
agg_states,
- group_keys: group_exprs,
+ group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
@@ -776,7 +776,7 @@ mod tests {

let executor = Box::new(SortAggExecutor {
agg_states,
- group_keys: group_exprs,
+ group_key: group_exprs,
sorted_groupers,
child: Box::new(child),
schema: Schema { fields },
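Why SortAgg needs no hash table: its input arrives sorted on the group key, so a group ends exactly where consecutive key values differ, and each group can be flushed as soon as the key changes. A minimal sketch of that boundary detection ((key, value) pairs and a sum aggregate are illustrative choices, not the real chunk or state types):

```rust
/// Sum `value` per run of equal keys in key-sorted input, flushing each
/// finished group as soon as the key changes.
fn sorted_sum(rows: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    if rows.is_empty() {
        return out;
    }
    let (mut cur_key, mut acc) = rows[0];
    for &(key, value) in &rows[1..] {
        if key == cur_key {
            acc += value;
        } else {
            out.push((cur_key, acc)); // key changed: flush the finished group
            cur_key = key;
            acc = value;
        }
    }
    out.push((cur_key, acc));
    out
}

fn main() {
    let rows = [(1, 10), (1, 5), (2, 7)];
    assert_eq!(sorted_sum(&rows), vec![(1, 15), (2, 7)]);
}
```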
4 changes: 2 additions & 2 deletions src/batch/src/task/hash_shuffle_channel.rs
@@ -46,9 +46,9 @@ fn generate_hash_values(chunk: &DataChunk, hash_info: &HashInfo) -> Result<Vec<u
let hash_values = chunk
.get_hash_values(
&hash_info
- .keys
+ .key
.iter()
- .map(|key| *key as usize)
+ .map(|idx| *idx as usize)
.collect::<Vec<_>>(),
hasher_builder,
)
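For context on what `generate_hash_values` feeds: hash shuffle routes each row to one of `output_count` downstream channels by hashing its key columns, so rows with equal keys always land on the same channel. A per-row sketch of that routing (`DefaultHasher` and `i64` rows are illustrative; the real code hashes whole columns with a configurable hasher builder):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick the output channel for one row by hashing its key columns and
/// taking the result modulo the channel count.
fn output_for(row: &[i64], key: &[usize], output_count: u64) -> u64 {
    let mut h = DefaultHasher::new();
    for &idx in key {
        row[idx].hash(&mut h);
    }
    h.finish() % output_count
}

fn main() {
    // Mirrors HashInfo { output_count: 4, key: [0] }.
    let row = [42, 7];
    println!("channel {}", output_for(&row, &[0], 4));
}
```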