Skip to content

Commit

Permalink
feat(spans): Extracts Queue tags and measurements from transaction tr…
Browse files Browse the repository at this point in the history
…ace context (#3650)

Updates the "create span from transaction" code path to extract:
- `messaging.destination.name` and `messaging.message.id` as tags from
`contexts.trace.data`
- `messaging.message.receive.latency`, `messaging.message.body.size`,
and `messaging.message.retry.count` from `contexts.trace.data`
only for Queue transactions.
  • Loading branch information
edwardgou-sentry authored May 28, 2024
1 parent d812b63 commit 8b7612f
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 2 deletions.
249 changes: 247 additions & 2 deletions relay-event-normalization/src/normalize/span/tag_extraction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,37 @@ pub fn extract_span_tags(event: &Event, spans: &mut [Annotated<Span>], max_tag_v
}
}

/// Extract segment span specific tags and measurements from the event and materialize them into the spans.
pub fn extract_segment_span_tags(event: &Event, spans: &mut [Annotated<Span>]) {
let segment_tags = extract_segment_tags(event);
let segment_measurements = extract_segment_measurements(event);

for span in spans {
let Some(span) = span.value_mut() else {
continue;
};

if !segment_measurements.is_empty() {
span.measurements
.get_or_insert_with(Default::default)
.extend(
segment_measurements
.iter()
.map(|(k, v)| (k.clone(), Annotated::new(v.clone()))),
);
}
if !segment_tags.is_empty() {
span.sentry_tags
.get_or_insert_with(Default::default)
.extend(
segment_tags.iter().map(|(k, v)| {
(k.clone().sentry_tag_key().into(), Annotated::new(v.clone()))
}),
);
}
}
}

/// Extracts tags shared by every span.
fn extract_shared_tags(event: &Event) -> BTreeMap<SpanTagKey, String> {
let mut tags = BTreeMap::new();
Expand Down Expand Up @@ -304,6 +335,80 @@ fn extract_shared_tags(event: &Event) -> BTreeMap<SpanTagKey, String> {
tags
}

/// Extracts measurements that should only be saved on segment spans.
fn extract_segment_measurements(event: &Event) -> BTreeMap<String, Measurement> {
let mut measurements = BTreeMap::new();

if let Some(trace_context) = event.context::<TraceContext>() {
if let Some(op) = extract_transaction_op(trace_context) {
if op == "queue.publish" || op == "queue.process" {
if let Some(data) = trace_context.data.value() {
for (field, key, unit) in [
(
&data.messaging_message_retry_count,
"messaging.message.retry.count",
MetricUnit::None,
),
(
&data.messaging_message_receive_latency,
"messaging.message.receive.latency",
MetricUnit::Duration(DurationUnit::MilliSecond),
),
(
&data.messaging_message_body_size,
"messaging.message.body.size",
MetricUnit::Information(InformationUnit::Byte),
),
] {
if let Some(value) = value_to_f64(field.value()) {
measurements.insert(
key.into(),
Measurement {
value: value.into(),
unit: unit.into(),
},
);
}
}
}
}
}
}

measurements
}

/// Extract tags that should only be saved on segment spans.
fn extract_segment_tags(event: &Event) -> BTreeMap<SpanTagKey, String> {
let mut tags = BTreeMap::new();

if let Some(trace_context) = event.context::<TraceContext>() {
if let Some(op) = extract_transaction_op(trace_context) {
if op == "queue.publish" || op == "queue.process" {
if let Some(destination_name) = trace_context
.data
.value()
.and_then(|data| data.messaging_destination_name.as_str())
{
tags.insert(
SpanTagKey::MessagingDestinationName,
destination_name.into(),
);
}
if let Some(message_id) = trace_context
.data
.value()
.and_then(|data| data.messaging_message_id.as_str())
{
tags.insert(SpanTagKey::MessagingMessageId, message_id.into());
}
}
}
}

tags
}

/// Writes fields into [`Span::sentry_tags`].
///
/// Generating new span data fields is based on a combination of looking at
Expand Down Expand Up @@ -736,18 +841,21 @@ pub fn extract_measurements(span: &mut Span, is_mobile: bool) {

if span_op.starts_with("queue.") {
if let Some(data) = span.data.value() {
for (field, key) in [
for (field, key, unit) in [
(
&data.messaging_message_retry_count,
"messaging.message.retry.count",
MetricUnit::None,
),
(
&data.messaging_message_receive_latency,
"messaging.message.receive.latency",
MetricUnit::Duration(DurationUnit::MilliSecond),
),
(
&data.messaging_message_body_size,
"messaging.message.body.size",
MetricUnit::Information(InformationUnit::Byte),
),
] {
if let Some(value) = value_to_f64(field.value()) {
Expand All @@ -756,7 +864,7 @@ pub fn extract_measurements(span: &mut Span, is_mobile: bool) {
key.into(),
Measurement {
value: value.into(),
unit: MetricUnit::None.into(),
unit: unit.into(),
}
.into(),
);
Expand Down Expand Up @@ -2019,6 +2127,143 @@ LIMIT 1
Some(&"abc123".to_string())
);
}

#[test]
fn test_extract_segment_queue_tags_and_measurement_from_transaction() {
let json = r#"
{
"type": "transaction",
"platform": "python",
"start_timestamp": "2021-04-26T07:59:01+0100",
"timestamp": "2021-04-26T08:00:00+0100",
"transaction": "foo",
"contexts": {
"trace": {
"op": "queue.process",
"status": "ok",
"data": {
"messaging.destination.name": "default",
"messaging.message.id": "abc123",
"messaging.message.receive.latency": 456,
"messaging.message.body.size": 100,
"messaging.message.retry.count": 3
}
}
}
}
"#;

let event = Annotated::<Event>::from_json(json)
.unwrap()
.into_value()
.unwrap();
let mut spans = [Span::from(&event).into()];

extract_segment_span_tags(&event, &mut spans);

let segment_span: &Annotated<Span> = &spans[0];
let tags = segment_span.value().unwrap().sentry_tags.value().unwrap();
let measurements = segment_span.value().unwrap().measurements.value().unwrap();

assert_eq!(
tags.get("messaging.destination.name"),
Some(&Annotated::new("default".to_string()))
);

assert_eq!(
tags.get("messaging.message.id"),
Some(&Annotated::new("abc123".to_string()))
);

assert_debug_snapshot!(measurements, @r###"
Measurements(
{
"messaging.message.body.size": Measurement {
value: 100.0,
unit: Information(
Byte,
),
},
"messaging.message.receive.latency": Measurement {
value: 456.0,
unit: Duration(
MilliSecond,
),
},
"messaging.message.retry.count": Measurement {
value: 3.0,
unit: None,
},
},
)
"###);
}

#[test]
fn test_does_not_extract_segment_tags_and_measurements_on_child_spans() {
let json = r#"
{
"type": "transaction",
"platform": "python",
"start_timestamp": "2021-04-26T07:59:01+0100",
"timestamp": "2021-04-26T08:00:00+0100",
"transaction": "foo",
"contexts": {
"trace": {
"op": "queue.process",
"status": "ok",
"data": {
"messaging.destination.name": "default",
"messaging.message.id": "abc123",
"messaging.message.receive.latency": 456,
"messaging.message.body.size": 100,
"messaging.message.retry.count": 3
}
}
},
"spans": [
{
"op": "queue.process",
"span_id": "bd429c44b67a3eb1",
"start_timestamp": 1597976300.0000000,
"timestamp": 1597976302.0000000,
"trace_id": "ff62a8b040f340bda5d830223def1d81",
"data": {
"messaging.message.body.size": 200
}
}
]
}
"#;

let mut event = Annotated::<Event>::from_json(json)
.unwrap()
.into_value()
.unwrap();

extract_span_tags_from_event(&mut event, 200);

let span = &event.spans.value().unwrap()[0];
let tags = span.value().unwrap().sentry_tags.value().unwrap();
let measurements = span.value().unwrap().measurements.value().unwrap();

assert_eq!(tags.get("messaging.destination.name"), None);
assert_eq!(tags.get("messaging.message.id"), None);

assert_debug_snapshot!(measurements, @r###"
Measurements(
{
"messaging.message.body.size": Measurement {
value: 200.0,
unit: Information(
Byte,
),
},
},
)
"###);
}

#[test]
fn extract_span_status_into_sentry_tags() {
let json = r#"
Expand Down
30 changes: 30 additions & 0 deletions relay-event-schema/src/protocol/contexts/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,36 @@ pub struct Data {
#[metastructure(field = "previousRoute", pii = "maybe", skip_serialization = "empty")]
pub previous_route: Annotated<Route>,

/// The destination name (ie queue/topic) that a producer/consumer acts on.
///
/// Set by backend SDKs with messaging integration.
#[metastructure(field = "messaging.destination.name")]
pub messaging_destination_name: Annotated<String>,

/// The id of the message in the messaging event.
///
/// Set by backend SDKs with messaging integration.
#[metastructure(field = "messaging.message.id")]
pub messaging_message_id: Annotated<String>,

/// The time duration that a message waited in queue before being received.
///
/// Set by backend SDKs with messaging integration.
#[metastructure(field = "messaging.message.receive.latency")]
pub messaging_message_receive_latency: Annotated<Value>,

/// The number of times a message was redelivered.
///
/// Set by backend SDKs with messaging integration.
#[metastructure(field = "messaging.message.retry.count")]
pub messaging_message_retry_count: Annotated<Value>,

/// The size of the message body in bytes.
///
/// Set by backend SDKs with messaging integration.
#[metastructure(field = "messaging.message.body.size")]
pub messaging_message_body_size: Annotated<Value>,

/// Additional arbitrary fields for forwards compatibility.
#[metastructure(
additional_properties,
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/processor/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub fn extract_transaction_span(event: &Event, max_tag_value_size: usize) -> Opt
let mut spans = [Span::from(event).into()];

tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size);
tag_extraction::extract_segment_span_tags(event, &mut spans);

spans.into_iter().next().and_then(Annotated::into_value)
}
28 changes: 28 additions & 0 deletions relay-server/tests/snapshots/test_fixtures__event_schema.snap
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,34 @@ expression: "relay_event_schema::protocol::event_json_schema()"
{
"type": "object",
"properties": {
"messaging.destination.name": {
"description": " The destination name (ie queue/topic) that a producer/consumer acts on.\n\n Set by backend SDKs with messaging integration.",
"default": null,
"type": [
"string",
"null"
]
},
"messaging.message.body.size": {
"description": " The size of the message body in bytes.\n\n Set by backend SDKs with messaging integration.",
"default": null
},
"messaging.message.id": {
"description": " The id of the message in the messaging event.\n\n Set by backend SDKs with messaging integration.",
"default": null,
"type": [
"string",
"null"
]
},
"messaging.message.receive.latency": {
"description": " The time duration that a message waited in queue before being received.\n\n Set by backend SDKs with messaging integration.",
"default": null
},
"messaging.message.retry.count": {
"description": " The number of times a message was redelivered.\n\n Set by backend SDKs with messaging integration.",
"default": null
},
"previousRoute": {
"description": " The previous route in the application\n\n Set by React Native SDK.",
"default": null,
Expand Down

0 comments on commit 8b7612f

Please sign in to comment.