Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support otel context propagation for databend-query #15096

Merged
merged 4 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ naive-cityhash = "0.2.0"
once_cell = { workspace = true }
opendal = { workspace = true }
opensrv-mysql = { version = "0.5.0", features = ["tls"] }
opentelemetry = { version = "0.21", features = ["trace", "logs"] }
opentelemetry-http = "0.10.0"
opentelemetry_sdk = { version = "0.21", features = ["trace", "logs", "rt-tokio"] }
parking_lot = { workspace = true }
parquet = { workspace = true }
paste = "1.0.9"
Expand Down
28 changes: 28 additions & 0 deletions src/query/service/src/servers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ use http::HeaderMap;
use http::HeaderValue;
use log::error;
use log::warn;
use opentelemetry::baggage::BaggageExt;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_http::HeaderExtractor;
use opentelemetry_sdk::propagation::BaggagePropagator;
use poem::error::Error as PoemError;
use poem::error::Result as PoemResult;
use poem::http::StatusCode;
Expand All @@ -55,6 +59,8 @@ const DEDUPLICATE_LABEL: &str = "X-DATABEND-DEDUPLICATE-LABEL";
const USER_AGENT: &str = "User-Agent";
const QUERY_ID: &str = "X-DATABEND-QUERY-ID";

const TRACE_PARENT: &str = "traceparent";

pub struct HTTPSessionMiddleware {
pub kind: HttpHandlerKind,
pub auth_manager: Arc<AuthMgr>,
Expand All @@ -66,6 +72,21 @@ impl HTTPSessionMiddleware {
}
}

fn extract_baggage_from_headers(headers: &HeaderMap) -> Option<Vec<(String, String)>> {
headers.get("baggage")?;
let propagator = BaggagePropagator::new();
let extractor = HeaderExtractor(headers);
let result: Vec<(String, String)> = {
let context = propagator.extract(&extractor);
let baggage = context.baggage();
baggage
.iter()
.map(|(key, (value, _metadata))| (key.to_string(), value.to_string()))
.collect()
};
Some(result)
}

fn get_credential(req: &Request, kind: HttpHandlerKind) -> Result<Credential> {
let std_auth_headers: Vec<_> = req.headers().get_all(AUTHORIZATION).iter().collect();
if std_auth_headers.len() > 1 {
Expand Down Expand Up @@ -204,12 +225,19 @@ impl<E> HTTPSessionEndpoint<E> {
.map(|id| id.to_str().unwrap().to_string())
.unwrap_or_else(|| Uuid::new_v4().to_string());

let trace_parent = req
.headers()
.get(TRACE_PARENT)
.map(|id| id.to_str().unwrap().to_string());
let baggage = extract_baggage_from_headers(req.headers());
Ok(HttpQueryContext::new(
session,
query_id,
node_id,
deduplicate_label,
user_agent,
trace_parent,
baggage,
req.method().to_string(),
req.uri().to_string(),
))
Expand Down
53 changes: 26 additions & 27 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,7 @@ async fn query_final_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
let trace_id = query_id_to_trace_id(&query_id);
let root = Span::root(
full_name!(),
SpanContext::new(trace_id, SpanId(rand::random())),
)
.with_properties(|| ctx.to_minitrace_properties());
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
Expand Down Expand Up @@ -257,12 +252,7 @@ async fn query_cancel_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
let trace_id = query_id_to_trace_id(&query_id);
let root = Span::root(
full_name!(),
SpanContext::new(trace_id, SpanId(rand::random())),
)
.with_properties(|| ctx.to_minitrace_properties());
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
Expand Down Expand Up @@ -296,12 +286,7 @@ async fn query_state_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
let trace_id = query_id_to_trace_id(&query_id);
let root = Span::root(
full_name!(),
SpanContext::new(trace_id, SpanId(rand::random())),
)
.with_properties(|| ctx.to_minitrace_properties());
let root = get_http_tracing_span(full_name!(), ctx, &query_id);

async {
let http_query_manager = HttpQueryManager::instance();
Expand All @@ -326,12 +311,7 @@ async fn query_page_handler(
ctx: &HttpQueryContext,
Path((query_id, page_no)): Path<(String, usize)>,
) -> PoemResult<impl IntoResponse> {
let trace_id = query_id_to_trace_id(&query_id);
let root = Span::root(
full_name!(),
SpanContext::new(trace_id, SpanId(rand::random())),
)
.with_properties(|| ctx.to_minitrace_properties());
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
Expand Down Expand Up @@ -362,9 +342,7 @@ pub(crate) async fn query_handler(
ctx: &HttpQueryContext,
Json(req): Json<HttpQueryRequest>,
) -> PoemResult<impl IntoResponse> {
let trace_id = query_id_to_trace_id(&ctx.query_id);
let root = Span::root(full_name!(), SpanContext::new(trace_id, SpanId::default()))
.with_properties(|| ctx.to_minitrace_properties());
let root = get_http_tracing_span(full_name!(), ctx, &ctx.query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
Expand Down Expand Up @@ -497,3 +475,24 @@ impl Drop for SlowRequestLogTracker {
})
}
}

// get_http_tracing_span always return a valid span for tracing
// it will try to decode w3 traceparent and if empty or failed, it will create a new root span and throw a warning
fn get_http_tracing_span(name: &'static str, ctx: &HttpQueryContext, query_id: &str) -> Span {
if let Some(parent) = ctx.trace_parent.as_ref() {
let trace = parent.as_str();
match SpanContext::decode_w3c_traceparent(trace) {
Some(span_context) => {
return Span::root(name, span_context)
.with_properties(|| ctx.to_minitrace_properties());
}
None => {
warn!("failed to decode trace parent: {}", trace);
}
}
}

let trace_id = query_id_to_trace_id(query_id);
Span::root(name, SpanContext::new(trace_id, SpanId(rand::random())))
.with_properties(|| ctx.to_minitrace_properties())
}
35 changes: 25 additions & 10 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use http::StatusCode;
Expand All @@ -30,6 +31,8 @@ pub struct HttpQueryContext {
pub node_id: String,
pub deduplicate_label: Option<String>,
pub user_agent: Option<String>,
pub trace_parent: Option<String>,
pub opentelemetry_baggage: Option<Vec<(String, String)>>,
pub http_method: String,
pub uri: String,
}
Expand All @@ -41,6 +44,8 @@ impl HttpQueryContext {
node_id: String,
deduplicate_label: Option<String>,
user_agent: Option<String>,
trace_parent: Option<String>,
open_telemetry_baggage: Option<Vec<(String, String)>>,
ZhiHanZ marked this conversation as resolved.
Show resolved Hide resolved
http_method: String,
uri: String,
) -> Self {
Expand All @@ -50,6 +55,8 @@ impl HttpQueryContext {
node_id,
deduplicate_label,
user_agent,
trace_parent,
opentelemetry_baggage: open_telemetry_baggage,
http_method,
uri,
}
Expand All @@ -64,20 +71,28 @@ impl HttpQueryContext {
Ok(self.session.clone())
}

pub fn to_minitrace_properties(&self) -> Vec<(&'static str, String)> {
let mut properties = self.session.to_minitrace_properties();
properties.extend([
("query_id", self.query_id.clone()),
("node_id", self.node_id.clone()),
pub fn to_minitrace_properties(&self) -> BTreeMap<String, String> {
let mut result = BTreeMap::new();
let properties = self.session.to_minitrace_properties();
result.extend(properties);
result.extend([
("query_id".to_string(), self.query_id.clone()),
("node_id".to_string(), self.node_id.clone()),
(
"deduplicate_label",
"deduplicate_label".to_string(),
self.deduplicate_label.clone().unwrap_or_default(),
),
("user_agent", self.user_agent.clone().unwrap_or_default()),
("http_method", self.http_method.clone()),
("uri", self.uri.clone()),
(
"user_agent".to_string(),
self.user_agent.clone().unwrap_or_default(),
),
("http_method".to_string(), self.http_method.clone()),
("uri".to_string(), self.uri.clone()),
]);
properties
if let Some(baggage) = self.opentelemetry_baggage.clone() {
result.extend(baggage);
}
result
}

pub fn set_fail(&self) {
Expand Down
15 changes: 9 additions & 6 deletions src/query/service/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,20 @@ impl Session {
}))
}

pub fn to_minitrace_properties(self: &Arc<Self>) -> Vec<(&'static str, String)> {
pub fn to_minitrace_properties(self: &Arc<Self>) -> Vec<(String, String)> {
let mut properties = vec![
("session_id", self.id.clone()),
("session_database", self.get_current_database()),
("session_tenant", self.get_current_tenant().to_string()),
("session_id".to_string(), self.id.clone()),
("session_database".to_string(), self.get_current_database()),
(
"session_tenant".to_string(),
self.get_current_tenant().to_string(),
),
];
if let Some(query_id) = self.get_current_query_id() {
properties.push(("query_id", query_id));
properties.push(("query_id".to_string(), query_id));
}
if let Some(connection_id) = self.get_mysql_conn_id() {
properties.push(("connection_id", connection_id.to_string()));
properties.push(("connection_id".to_string(), connection_id.to_string()));
}
properties
}
Expand Down
Loading