Skip to content

Commit

Permalink
Merge branch 'main' into rc/expose-vnode-function-for-debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Aug 1, 2022
2 parents 91d6b54 + 0da423e commit 241c90a
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 111 deletions.
12 changes: 6 additions & 6 deletions e2e_test/streaming/string_agg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ statement ok
insert into t values ('a', 1, 2), ('b', 4, 6);

statement ok
create materialized view mv1 as select string_agg(a order by a desc) as res from t;
create materialized view mv1 as select string_agg(a, ',' order by a desc) as res from t;

statement ok
create materialized view mv2 as select string_agg(a order by b) as res from t group by c;
create materialized view mv2 as select string_agg(a, b::varchar order by b) as res from t group by c;

statement ok
flush;

query T
select * from mv1;
----
ba
b,a

query T
select * from mv2 order by res;
Expand All @@ -33,13 +33,13 @@ insert into t values ('c', 2, 2), ('d', 3, 6);
query T
select * from mv1;
----
dcba
d,c,b,a

query T
select * from mv2 order by res;
----
ac
db
a2c
d4b

statement ok
drop materialized view mv1;
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/rule/apply_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ impl Rule for ApplyScanRule {

// LogicalJoin with correlated inputs in join condition has been handled by ApplyJoin. This
// handles the ones without correlation
if let (None, None) = (right.as_logical_scan(), right.as_logical_join()) {
if let (None, None, None) = (
right.as_logical_scan(),
right.as_logical_join(),
right.as_logical_values(),
) {
return None;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,18 @@
LogicalAgg { group_key: [t2.y], aggs: [] }
LogicalScan { table: t2, columns: [y] }
LogicalScan { table: t3, columns: [x, y] }
- sql: |
create table t1 (a int, b int);
create table t2 (b int, c int);
select a, (select t1.a), c from t1, t2 where t1.b = t2.b order by c;
optimized_logical_plan: |
LogicalProject { exprs: [t1.a, t1.a, t2.c] }
LogicalJoin { type: LeftOuter, on: (t1.a = t1.a) }
LogicalJoin { type: Inner, on: (t1.b = t2.b) }
LogicalScan { table: t1, columns: [a, b] }
LogicalScan { table: t2, columns: [b, c] }
LogicalProject { exprs: [t1.a, t1.a] }
LogicalJoin { type: Inner, on: true }
LogicalAgg { group_key: [t1.a], aggs: [] }
LogicalScan { table: t1, columns: [a] }
LogicalValues { rows: [[]], schema: Schema { fields: [] } }
18 changes: 12 additions & 6 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl InnerConnectorSourceReader {
metrics: Arc<SourceMetrics>,
context: SourceContext,
) -> Result<Self> {
log::debug!(
tracing::debug!(
"Spawning new connector source inner reader with config {:?}, split {:?}",
prop,
split
Expand Down Expand Up @@ -141,7 +141,7 @@ impl InnerConnectorSourceReader {
biased;
// stop chan has high priority
_ = stop.borrow_mut() => {
log::debug!("connector reader {} stop signal received", id);
tracing::debug!("connector reader {} stop signal received", id);
break;
}

Expand All @@ -152,12 +152,12 @@ impl InnerConnectorSourceReader {

match chunk.map_err(|e| internal_error(e.to_string())) {
Err(e) => {
log::error!("connector reader {} error happened {}", id, e.to_string());
tracing::error!("connector reader {} error happened {}", id, e.to_string());
output.send(Either::Right(e)).await.ok();
break;
}
Ok(None) => {
log::warn!("connector reader {} stream stopped", id);
tracing::warn!("connector reader {} stream stopped", id);
break;
}
Ok(Some(msg)) => {
Expand Down Expand Up @@ -192,7 +192,13 @@ impl StreamSourceReader for ConnectorSourceReader {
*split_offset_mapping
.entry(msg.split_id.clone())
.or_insert_with(|| "".to_string()) = msg.offset.to_string();
events.push(self.parser.parse(content.as_ref(), &self.columns)?);
match self.parser.parse(content.as_ref(), &self.columns) {
Err(e) => {
tracing::warn!("message parsing failed {}, skipping", e.to_string());
continue;
}
Ok(result) => events.push(result),
}
}
}
let mut ops = Vec::with_capacity(events.iter().map(|e| e.ops.len()).sum());
Expand Down Expand Up @@ -323,7 +329,7 @@ impl ConnectorSource {
};
let readers =
try_join_all(to_reader_splits.into_iter().map(|split| {
log::debug!("spawning connector split reader for split {:?}", split);
tracing::debug!("spawning connector split reader for split {:?}", split);
let props = config.clone();
let columns = columns.clone();
let metrics = source_metrics.clone();
Expand Down
Loading

0 comments on commit 241c90a

Please sign in to comment.