
Commit

Merge branch 'main' into issue-13815
RinChanNOWWW authored Nov 28, 2023
2 parents 66601f8 + 282b69d commit 833abda
Showing 15 changed files with 380 additions and 198 deletions.
27 changes: 23 additions & 4 deletions src/common/tracing/src/config.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

@@ -44,11 +45,13 @@ impl Config {
on: false,
level: "INFO".to_string(),
endpoint: "http://127.0.0.1:4317".to_string(),
labels: BTreeMap::new(),
},
query: QueryLogConfig {
on: false,
dir: "".to_string(),
otlp_endpoint: "".to_string(),
labels: BTreeMap::new(),
},
tracing: TracingConfig {
on: false,
@@ -127,14 +130,21 @@ pub struct OTLPConfig {
pub on: bool,
pub level: String,
pub endpoint: String,
pub labels: BTreeMap<String, String>,
}

impl Display for OTLPConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"enabled={}, level={}, endpoint={}",
self.on, self.level, self.endpoint
"enabled={}, level={}, endpoint={}, labels={}",
self.on, self.level, self.endpoint, labels
)
}
}
@@ -145,6 +155,7 @@ impl Default for OTLPConfig {
on: false,
level: "INFO".to_string(),
endpoint: "http://127.0.0.1:4317".to_string(),
labels: BTreeMap::new(),
}
}
}
@@ -154,14 +165,21 @@ pub struct QueryLogConfig {
pub on: bool,
pub dir: String,
pub otlp_endpoint: String,
pub labels: BTreeMap<String, String>,
}

impl Display for QueryLogConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"enabled={}, dir={}, otlp_endpoint={}",
self.on, self.dir, self.otlp_endpoint
"enabled={}, dir={}, otlp_endpoint={}, labels={}",
self.on, self.dir, self.otlp_endpoint, labels,
)
}
}
@@ -172,6 +190,7 @@ impl Default for QueryLogConfig {
on: false,
dir: "".to_string(),
otlp_endpoint: "".to_string(),
labels: BTreeMap::new(),
}
}
}
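
A standalone Rust sketch (not part of this commit; the struct name is illustrative) of the Display behaviour added above: labels are rendered as comma-separated key:value pairs, and using a BTreeMap keeps the output order deterministic.

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

// Illustrative stand-in for the config structs touched above.
struct LabelledEndpoint {
    on: bool,
    level: String,
    endpoint: String,
    labels: BTreeMap<String, String>,
}

impl Display for LabelledEndpoint {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Same strategy as the diff: join "key:value" pairs with commas.
        let labels = self
            .labels
            .iter()
            .map(|(k, v)| format!("{}:{}", k, v))
            .collect::<Vec<_>>()
            .join(",");
        write!(
            f,
            "enabled={}, level={}, endpoint={}, labels={}",
            self.on, self.level, self.endpoint, labels
        )
    }
}

fn main() {
    let cfg = LabelledEndpoint {
        on: true,
        level: "INFO".to_string(),
        endpoint: "http://127.0.0.1:4317".to_string(),
        labels: BTreeMap::from([
            ("cluster".to_string(), "test".to_string()),
            ("node".to_string(), "n1".to_string()),
        ]),
    };
    // Prints: enabled=true, level=INFO, endpoint=http://127.0.0.1:4317, labels=cluster:test,node:n1
    println!("{}", cfg);
}
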
2 changes: 2 additions & 0 deletions src/common/tracing/src/init.rs
@@ -161,6 +161,7 @@ pub fn init_logging(
if cfg.otlp.on {
let mut labels = labels.clone();
labels.insert("category".to_string(), "system".to_string());
labels.extend(cfg.otlp.labels.clone());
let logger = new_otlp_log_writer(&cfg.tracing.otlp_endpoint, labels);
let dispatch = fern::Dispatch::new()
.level(cfg.otlp.level.parse().unwrap_or(LevelFilter::Info))
@@ -194,6 +195,7 @@ pub fn init_logging(
if !cfg.query.otlp_endpoint.is_empty() {
let mut labels = labels.clone();
labels.insert("category".to_string(), "query".to_string());
labels.extend(cfg.query.labels.clone());
let logger = new_otlp_log_writer(&cfg.tracing.otlp_endpoint, labels);
query_logger = query_logger.chain(Box::new(logger) as Box<dyn Log>);
}
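
A small standalone sketch (not from this commit; std types only) of the merge order the two hunks above rely on: the base labels are cloned, the fixed category label is inserted, and the configured labels are extended last, so a configured label with the same key takes precedence.

use std::collections::BTreeMap;

// Mirrors the order used in init_logging above: base, then category, then config labels.
// BTreeMap::extend overwrites existing keys, so config labels win on conflict.
fn merge_labels(
    base: &BTreeMap<String, String>,
    category: &str,
    config_labels: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut labels = base.clone();
    labels.insert("category".to_string(), category.to_string());
    labels.extend(config_labels.clone());
    labels
}

fn main() {
    let base = BTreeMap::from([("service".to_string(), "example-service".to_string())]);
    let cfg = BTreeMap::from([("tenant".to_string(), "t1".to_string())]);
    let merged = merge_labels(&base, "system", &cfg);
    assert_eq!(merged.get("category").map(String::as_str), Some("system"));
    assert_eq!(merged.get("tenant").map(String::as_str), Some("t1"));
    println!("{:?}", merged);
}
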
15 changes: 15 additions & 0 deletions src/query/config/src/config.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::env;
use std::fmt;
@@ -2022,6 +2023,11 @@ pub struct OTLPLogConfig {
)]
#[serde(rename = "endpoint")]
pub otlp_endpoint: String,

/// Log Labels
#[clap(skip)]
#[serde(rename = "labels")]
pub otlp_labels: BTreeMap<String, String>,
}

impl Default for OTLPLogConfig {
@@ -2038,6 +2044,7 @@ impl TryInto<InnerOTLPLogConfig> for OTLPLogConfig {
on: self.otlp_on,
level: self.otlp_level,
endpoint: self.otlp_endpoint,
labels: self.otlp_labels,
})
}
}
@@ -2048,6 +2055,7 @@ impl From<InnerOTLPLogConfig> for OTLPLogConfig {
otlp_on: inner.on,
otlp_level: inner.level,
otlp_endpoint: inner.endpoint,
otlp_labels: inner.labels,
}
}
}
@@ -2072,6 +2080,11 @@ pub struct QueryLogConfig {
)]
#[serde(rename = "otlp_endpoint")]
pub log_query_otlp_endpoint: String,

/// Query Log Labels
#[clap(skip)]
#[serde(rename = "labels")]
pub log_query_otlp_labels: BTreeMap<String, String>,
}

impl Default for QueryLogConfig {
Expand All @@ -2088,6 +2101,7 @@ impl TryInto<InnerQueryLogConfig> for QueryLogConfig {
on: self.log_query_on,
dir: self.log_query_dir,
otlp_endpoint: self.log_query_otlp_endpoint,
labels: self.log_query_otlp_labels,
})
}
}
@@ -2098,6 +2112,7 @@ impl From<InnerQueryLogConfig> for QueryLogConfig {
log_query_on: inner.on,
log_query_dir: inner.dir,
log_query_otlp_endpoint: inner.otlp_endpoint,
log_query_otlp_labels: inner.labels,
}
}
}
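
The new labels fields above are marked #[clap(skip)], so there is no command-line flag for them; they arrive through serde under the renamed key "labels". A standalone sketch (not from this commit; assumes the serde and toml crates, with illustrative struct and field names) of how such a table maps onto a BTreeMap:

use std::collections::BTreeMap;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct OtlpSection {
    #[serde(default)]
    level: String,
    #[serde(default)]
    endpoint: String,
    // Same shape as the diff: rename the field to "labels" and collect into a map.
    #[serde(rename = "labels", default)]
    otlp_labels: BTreeMap<String, String>,
}

fn main() {
    let text = r#"
        level = "INFO"
        endpoint = "http://127.0.0.1:4317"

        [labels]
        cluster = "test"
        tenant = "t1"
    "#;
    let section: OtlpSection = toml::from_str(text).expect("valid toml");
    assert_eq!(section.otlp_labels.get("cluster").map(String::as_str), Some("test"));
    println!("{:?}", section);
}
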
12 changes: 12 additions & 0 deletions src/query/pipeline/core/src/processors/port.rs
@@ -154,6 +154,10 @@ impl InputPort {
((flags & IS_FINISHED) == IS_FINISHED) && ((flags & HAS_DATA) == 0)
}

pub fn is_need_data(&self) -> bool {
self.shared.get_flags() & NEED_DATA != 0
}

#[inline(always)]
pub fn set_need_data(&self) {
unsafe {
@@ -240,6 +244,14 @@ impl OutputPort {
(self.shared.get_flags() & IS_FINISHED) != 0
}

pub fn has_data(&self) -> bool {
(self.shared.get_flags() & HAS_DATA) != 0
}

pub fn is_need_data(&self) -> bool {
(self.shared.get_flags() & NEED_DATA) != 0
}

#[inline(always)]
pub fn can_push(&self) -> bool {
let flags = self.shared.get_flags();
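
The new has_data and is_need_data helpers test single bits of the shared flags word. A standalone sketch (not from this commit; the flag values and type names are illustrative, the real constants live in port.rs) of that pattern:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative flag values; the real constants are defined in port.rs.
const HAS_DATA: usize = 0b0001;
const NEED_DATA: usize = 0b0010;
const IS_FINISHED: usize = 0b0100;

struct SharedStatus {
    flags: AtomicUsize,
}

impl SharedStatus {
    fn get_flags(&self) -> usize {
        self.flags.load(Ordering::SeqCst)
    }

    // Same shape as the new helpers: mask out one bit and compare with zero.
    fn has_data(&self) -> bool {
        (self.get_flags() & HAS_DATA) != 0
    }

    fn is_need_data(&self) -> bool {
        (self.get_flags() & NEED_DATA) != 0
    }

    fn is_finished(&self) -> bool {
        (self.get_flags() & IS_FINISHED) != 0
    }
}

fn main() {
    let status = SharedStatus {
        flags: AtomicUsize::new(HAS_DATA | NEED_DATA),
    };
    assert!(status.has_data());
    assert!(status.is_need_data());
    assert!(!status.is_finished());
    println!("flags = {:#06b}", status.get_flags());
}
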
54 changes: 54 additions & 0 deletions src/query/service/src/pipelines/executor/executor_graph.rs
@@ -481,6 +481,8 @@ impl RunningGraph {
name: String,
state: String,
details_status: Option<String>,
inputs_status: Vec<(&'static str, &'static str, &'static str)>,
outputs_status: Vec<(&'static str, &'static str, &'static str)>,
}

impl Debug for NodeDisplay {
@@ -491,12 +493,16 @@ impl RunningGraph {
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.finish(),
Some(details_status) => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.field("details", details_status)
.finish(),
}
@@ -508,7 +514,55 @@ impl RunningGraph {
for node_index in self.0.graph.node_indices() {
unsafe {
let state = self.0.graph[node_index].state.lock().unwrap();
let inputs_status = self.0.graph[node_index]
.inputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

let outputs_status = self.0.graph[node_index]
.outputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

nodes_display.push(NodeDisplay {
inputs_status,
outputs_status,
id: self.0.graph[node_index].processor.id().index(),
name: self.0.graph[node_index].processor.name(),
details_status: self.0.graph[node_index].processor.details_status(),
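
The two closures above that build inputs_status and outputs_status are identical; a standalone sketch (not from this commit) of an equivalent helper that turns the three port checks into the same status tuple:

// Hypothetical helper equivalent to the repeated closures above.
fn port_status(
    is_finished: bool,
    has_data: bool,
    is_need_data: bool,
) -> (&'static str, &'static str, &'static str) {
    (
        if is_finished { "Finished" } else { "Unfinished" },
        if has_data { "HasData" } else { "Nodata" },
        if is_need_data { "NeedData" } else { "UnNeeded" },
    )
}

fn main() {
    // Matches the Debug output added to NodeDisplay for a port that only holds data.
    assert_eq!(
        port_status(false, true, false),
        ("Unfinished", "HasData", "UnNeeded")
    );
    println!("{:?}", port_status(true, false, false));
}
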
25 changes: 16 additions & 9 deletions src/query/sql/src/executor/physical_plans/physical_lambda.rs
@@ -107,19 +107,26 @@ impl PhysicalPlanBuilder {
.map(|arg| {
match arg {
ScalarExpr::BoundColumnRef(col) => {
let index = input_schema
.index_of(&col.column.index.to_string())
.unwrap();
let index = match input_schema.index_of(&col.column.index.to_string()) {
Ok(index) => index,
Err(_) => {
// the argument of lambda function may be another lambda function
match lambda_index_map.get(&col.column.column_name) {
Some(index) => *index,
None => {
return Err(ErrorCode::Internal(format!(
"Unable to get lambda function's argument \"{}\".",
col.column.column_name
)))
}
}
}
};
Ok(index)
}
ScalarExpr::LambdaFunction(inner_func) => {
// nested lambda function as an argument of parent lambda function
let index = lambda_index_map.get(&inner_func.display_name).unwrap();
Ok(*index)
}
_ => {
Err(ErrorCode::Internal(
"lambda function's argument must be a BoundColumnRef or LambdaFunction"
"Lambda function's argument must be a BoundColumnRef"
.to_string(),
))
}
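
A standalone sketch (not from this commit; all names and values are stand-ins) of the argument-resolution order the rewritten arm above uses: try the input schema first, fall back to the lambda index map keyed by column name, and only then return the Internal error.

use std::collections::HashMap;

fn resolve_argument_index(
    schema_index: Option<usize>,               // stand-in for input_schema.index_of(...)
    lambda_index_map: &HashMap<String, usize>, // display/column name -> output index
    column_name: &str,
) -> Result<usize, String> {
    match schema_index {
        Some(index) => Ok(index),
        None => match lambda_index_map.get(column_name) {
            Some(index) => Ok(*index),
            None => Err(format!(
                "Unable to get lambda function's argument \"{}\".",
                column_name
            )),
        },
    }
}

fn main() {
    let mut lambda_index_map = HashMap::new();
    lambda_index_map.insert("inner_lambda".to_string(), 3); // illustrative entry

    // Argument known to the input schema: resolved directly.
    assert_eq!(resolve_argument_index(Some(1), &lambda_index_map, "a"), Ok(1));
    // Argument produced by another lambda: resolved through the map.
    assert_eq!(resolve_argument_index(None, &lambda_index_map, "inner_lambda"), Ok(3));
    // Neither: surfaces the error message added in the diff.
    assert!(resolve_argument_index(None, &lambda_index_map, "missing").is_err());
}
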
40 changes: 17 additions & 23 deletions src/query/sql/src/executor/physical_plans/physical_udf.rs
@@ -99,33 +99,27 @@ impl PhysicalPlanBuilder {
let arg_indices = func
.arguments
.iter()
.map(|arg| {
match arg {
ScalarExpr::BoundColumnRef(col) => {
let index = input_schema
.index_of(&col.column.index.to_string())
.unwrap();
Ok(index)
}
ScalarExpr::UDFServerCall(inner_udf) => {
// nested udf function as an argument of parent udf function
let index = udf_index_map.get(&inner_udf.display_name).unwrap();
Ok(*index)
}
_ => {
Err(ErrorCode::Internal(
"udf function's argument must be a BoundColumnRef or UDFServerCall"
.to_string(),
))
}
.map(|arg| match arg {
ScalarExpr::BoundColumnRef(col) => {
let index =
match input_schema.index_of(&col.column.index.to_string()) {
Ok(index) => index,
Err(_) => {
return Err(ErrorCode::Internal(format!(
"Unable to get udf function's argument \"{}\".",
col.column.column_name
)));
}
};
Ok(index)
}
_ => Err(ErrorCode::Internal(
"Udf function's argument must be a BoundColumnRef".to_string(),
)),
})
.collect::<Result<Vec<_>>>()?;

udf_index_map.insert(
func.display_name.clone(),
index,
);
udf_index_map.insert(func.display_name.clone(), index);
index += 1;

let arg_exprs = func
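
The UDF path gets a similar rewrite in the hunk above, but without a nested-function fallback: a failed schema lookup now returns a descriptive Internal error instead of panicking on unwrap. A standalone sketch (not from this commit; names are stand-ins):

fn argument_index(
    schema_index: Option<usize>, // stand-in for input_schema.index_of(...)
    column_name: &str,
) -> Result<usize, String> {
    match schema_index {
        Some(index) => Ok(index),
        None => Err(format!(
            "Unable to get udf function's argument \"{}\".",
            column_name
        )),
    }
}

fn main() {
    assert_eq!(argument_index(Some(2), "a"), Ok(2));
    let err = argument_index(None, "b").unwrap_err();
    assert!(err.contains("udf function's argument"));
    println!("{}", err);
}
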
5 changes: 0 additions & 5 deletions src/query/sql/src/planner/binder/bind_context.rs
@@ -37,7 +37,6 @@ use itertools::Itertools;
use super::AggregateInfo;
use super::INTERNAL_COLUMN_FACTORY;
use crate::binder::column_binding::ColumnBinding;
use crate::binder::lambda::LambdaInfo;
use crate::binder::window::WindowInfo;
use crate::binder::ColumnBindingBuilder;
use crate::normalize_identifier;
@@ -117,8 +116,6 @@ pub struct BindContext {

pub windows: WindowInfo,

pub lambda_info: LambdaInfo,

/// If the `BindContext` is created from a CTE, record the cte name
pub cte_name: Option<String>,

@@ -170,7 +167,6 @@ impl BindContext {
bound_internal_columns: BTreeMap::new(),
aggregate_info: AggregateInfo::default(),
windows: WindowInfo::default(),
lambda_info: LambdaInfo::default(),
cte_name: None,
cte_map_ref: Box::default(),
allow_internal_columns: true,
@@ -190,7 +186,6 @@ impl BindContext {
bound_internal_columns: BTreeMap::new(),
aggregate_info: Default::default(),
windows: Default::default(),
lambda_info: Default::default(),
cte_name: parent.cte_name,
cte_map_ref: parent.cte_map_ref.clone(),
allow_internal_columns: parent.allow_internal_columns,
