ref(spans): JSON Kafka message with metadata #2556

Merged on Oct 4, 2023 · 27 commits

Changes from 26 commits

Commits
93ad857
wip
jjbayer Sep 28, 2023
86d6730
schema
jjbayer Sep 29, 2023
4f68528
wip
jjbayer Sep 29, 2023
fc7e8fd
ref(spans): Write sentry tags to dedicated field
jjbayer Sep 29, 2023
65b2bf8
ref(spans): Make sentry_tags a string mapping
jjbayer Sep 29, 2023
cabdf31
Merge branch 'ref/span-sentry-tags' into ref/spans-kafka-schema
jjbayer Sep 29, 2023
e1ee592
update test
jjbayer Sep 29, 2023
ff377a3
double-write
jjbayer Oct 2, 2023
1b3b5e0
Merge remote-tracking branch 'origin/master' into ref/span-sentry-tags
jjbayer Oct 2, 2023
63b91ed
remove unused function
jjbayer Oct 2, 2023
0c9c141
fix: Also for transaction span
jjbayer Oct 2, 2023
cadab70
ref: Use short tag names
jjbayer Oct 3, 2023
bcc3aa4
tests
jjbayer Oct 3, 2023
1f11d03
Merge branch 'ref/span-sentry-tags' into ref/spans-kafka-schema
jjbayer Oct 3, 2023
1e08222
ref: Validate on extraction
jjbayer Oct 3, 2023
74b281d
changelog
jjbayer Oct 3, 2023
a7429d6
simplify
jjbayer Oct 3, 2023
a7256ae
Wrap raw value
jjbayer Oct 3, 2023
da8785e
Remove outdated comment, changelog
jjbayer Oct 3, 2023
78bef44
Merge branch 'ref/span-sentry-tags' into ref/spans-kafka-schema
jjbayer Oct 3, 2023
4962ab8
fix non-processing build
jjbayer Oct 3, 2023
e27eff0
extra validation
jjbayer Oct 3, 2023
fa61e6f
fix
jjbayer Oct 4, 2023
6e027d2
validate status code
jjbayer Oct 4, 2023
b45dce8
Merge remote-tracking branch 'origin/master' into ref/spans-kafka-schema
jjbayer Oct 4, 2023
26bd175
ref: Remove span.status from sentry_tags
jjbayer Oct 4, 2023
6d66741
lint
jjbayer Oct 4, 2023
6 changes: 3 additions & 3 deletions CHANGELOG.md
@@ -15,13 +15,13 @@

**Internal**:

- Exclude more spans from metrics extraction. ([#2522](https://github.com/getsentry/relay/pull/2522))
- Exclude more spans from metrics extraction. ([#2522](https://github.com/getsentry/relay/pull/2522)), [#2525](https://github.com/getsentry/relay/pull/2525), [#2545](https://github.com/getsentry/relay/pull/2545)
- Remove filtering for Android events with missing close events. ([#2524](https://github.com/getsentry/relay/pull/2524))
- Exclude more spans from metrics extraction. ([#2522](https://github.com/getsentry/relay/pull/2522), [#2525](https://github.com/getsentry/relay/pull/2525), [#2545](https://github.com/getsentry/relay/pull/2545))
- Fix hot-loop burning CPU when upstream service is unavailable. ([#2518](https://github.com/getsentry/relay/pull/2518))
- Extract new low-cardinality transaction duration metric for statistical detectors. ([#2513](https://github.com/getsentry/relay/pull/2513))
- Introduce reservoir sampling rule. ([#2550](https://github.com/getsentry/relay/pull/2550))
- Write span tags to `span.sentry_tags`. ([#2555](https://github.com/getsentry/relay/pull/2555))
- Write span tags to `span.sentry_tags`. ([#2555](https://github.com/getsentry/relay/pull/2555))
- Use JSON instead of MsgPack for Kafka spans. ([#2556](https://github.com/getsentry/relay/pull/2556))

## 23.9.1

11 changes: 9 additions & 2 deletions relay-dynamic-config/src/defaults.rs
@@ -82,7 +82,6 @@ pub fn add_span_metrics(project_config: &mut ProjectConfig) {
("span.", "module"),
("span.", "op"),
("span.", "status_code"),
("span.", "status"),
("span.", "system"),
("", "transaction.method"),
("", "transaction.op"),
@@ -93,7 +92,15 @@
value: None,
condition: None,
})
.into(),
.into_iter()
// Tags taken directly from the span payload:
.chain(std::iter::once(TagSpec {
key: "span.status".into(),
field: Some("span.status".into()),
value: None,
condition: None,
}))
.collect(),
},
TagMapping {
metrics: vec![LazyGlob::new("d:spans/exclusive_time*@millisecond".into())],
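For orientation, the pattern in the `add_span_metrics` change above can be sketched in isolation: most tag specs are generated from a `(prefix, key)` list, and `span.status` is chained on as one extra spec whose value is read directly from the span payload. The `TagSpec` below is a simplified stand-in for the real `relay-dynamic-config` type, and the field semantics in the comments are assumptions for illustration only.

```rust
// Simplified stand-in for relay-dynamic-config's TagSpec (assumed shape, not the real type).
#[derive(Debug)]
struct TagSpec {
    key: String,
    // In this sketch, `Some(path)` marks a tag read directly from the span payload;
    // `None` marks a tag resolved by Relay's default lookup (e.g. `span.sentry_tags`).
    field: Option<String>,
}

fn span_metric_tag_specs() -> Vec<TagSpec> {
    [("span.", "op"), ("span.", "status_code"), ("", "transaction.op")]
        .into_iter()
        // Specs generated from the (prefix, key) list.
        .map(|(prefix, key)| TagSpec {
            key: format!("{prefix}{key}"),
            field: None,
        })
        // `span.status` is taken directly from the span payload instead.
        .chain(std::iter::once(TagSpec {
            key: "span.status".into(),
            field: Some("span.status".into()),
        }))
        .collect()
}

fn main() {
    for spec in span_metric_tag_specs() {
        println!("{}: field = {:?}", spec.key, spec.field);
    }
}
```

The `chain(std::iter::once(..))` keeps the generated specs untouched while appending the one special case, which is the shape the diff above introduces.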
@@ -50,7 +50,6 @@ pub enum SpanTagKey {
Action,
Domain,
System,
Status,
StatusCode,
}

@@ -79,7 +78,6 @@ impl SpanTagKey {
SpanTagKey::Action => "span.action",
SpanTagKey::Domain => "span.domain",
SpanTagKey::System => "span.system",
SpanTagKey::Status => "span.status",
SpanTagKey::StatusCode => "span.status_code",
}
}
@@ -107,7 +105,6 @@ impl SpanTagKey {
SpanTagKey::Action => "action",
SpanTagKey::Domain => "domain",
SpanTagKey::System => "system",
SpanTagKey::Status => "status",
SpanTagKey::StatusCode => "status_code",
}
}
@@ -345,10 +342,6 @@ pub(crate) fn extract_tags(span: &Span, config: &Config) -> BTreeMap<SpanTagKey,
}
}

if let Some(span_status) = span.status.value() {
span_tags.insert(SpanTagKey::Status, span_status.to_string());
}

if let Some(status_code) = http_status_code_from_span(span) {
span_tags.insert(SpanTagKey::StatusCode, status_code);
}
56 changes: 56 additions & 0 deletions relay-server/src/actors/processor.rs
@@ -2292,6 +2292,13 @@ impl EnvelopeProcessorService {
};

let mut add_span = |span: Annotated<Span>| {
let span = match self.validate_span(span) {
Ok(span) => span,
Err(e) => {
relay_log::error!("Invalid span: {e}");
return;
}
};
let span = match span.to_json() {
Ok(span) => span,
Err(e) => {
@@ -2360,6 +2367,55 @@
}
}

/// Helper for [`Self::extract_spans`].
///
/// We do not extract spans with missing fields if those fields are required on the Kafka topic.
#[cfg(feature = "processing")]
fn validate_span(&self, mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error> {
let inner = span
.value_mut()
.as_mut()
.ok_or(anyhow::anyhow!("empty span"))?;
let Span {
ref exclusive_time,
ref mut tags,
ref mut sentry_tags,
..
} = inner;
// The following required fields are already validated by the `TransactionsProcessor`:
// - `timestamp`
// - `start_timestamp`
// - `trace_id`
// - `span_id`
//
// `is_segment` is set by `extract_span`.
exclusive_time
.value()
.ok_or(anyhow::anyhow!("missing exclusive_time"))?;

if let Some(sentry_tags) = sentry_tags.value_mut() {
sentry_tags.retain(|key, value| match value.value() {
Some(s) => {
match key.as_str() {
"group" => {
// Only allow up to 16-char hex strings in group.
s.len() <= 16 && s.chars().all(|c| c.is_ascii_hexdigit())
}
"status_code" => s.parse::<u16>().is_ok(),
_ => true,
}
}
// Drop empty string values.
None => false,
});
}
if let Some(tags) = tags.value_mut() {
tags.retain(|_, value| !value.value().is_empty())
}

Ok(span)
}

/// Computes the sampling decision on the incoming event
fn run_dynamic_sampling(&self, state: &mut ProcessEnvelopeState) {
// Running dynamic sampling involves either:
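As a rough, self-contained illustration of the retention rules in `validate_span` above, the sketch below applies the same checks to a plain string map. This is a simplification by assumption: the real helper operates on `Annotated` values from `relay-event-schema`, also rejects spans without `exclusive_time`, and separately drops empty values from the regular `tags` map.

```rust
use std::collections::BTreeMap;

// Sketch of the `sentry_tags` rules from `validate_span`, applied to plain `String` values.
fn retain_valid_sentry_tags(sentry_tags: &mut BTreeMap<String, String>) {
    sentry_tags.retain(|key, value| {
        if value.is_empty() {
            // Stands in for the `None => false` arm above: drop missing/empty values.
            return false;
        }
        match key.as_str() {
            // Only allow up to 16-char hex strings in `group`.
            "group" => value.len() <= 16 && value.chars().all(|c| c.is_ascii_hexdigit()),
            // `status_code` must parse as a u16.
            "status_code" => value.parse::<u16>().is_ok(),
            _ => true,
        }
    });
}

fn main() {
    let mut tags = BTreeMap::from([
        ("group".to_owned(), "deadbeefdeadbeef".to_owned()), // kept: 16 hex chars
        ("status_code".to_owned(), "99999".to_owned()),      // dropped: not a valid u16
        ("transaction".to_owned(), String::new()),           // dropped: empty value
    ]);
    retain_valid_sentry_tags(&mut tags);
    assert!(tags.contains_key("group"));
    assert!(!tags.contains_key("status_code"));
    assert!(!tags.contains_key("transaction"));
    println!("{tags:?}");
}
```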
56 changes: 36 additions & 20 deletions relay-server/src/actors/store.rs
@@ -20,6 +20,7 @@ use relay_statsd::metric;
use relay_system::{AsyncResponse, FromMessage, Interface, Sender, Service};
use serde::ser::Error;
use serde::Serialize;
use serde_json::value::RawValue;
use uuid::Uuid;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
@@ -206,6 +207,7 @@ impl StoreService {
scoping.project_id,
event_id,
start_time,
retention,
item,
)?,
_ => {}
@@ -738,10 +740,11 @@
project_id: ProjectId,
event_id: Option<EventId>,
start_time: Instant,
retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
// Bit unfortunate that we need to parse again here, but it's the same for sessions.
let span: serde_json::Value = match serde_json::from_slice(&item.payload()) {
let payload = item.payload();
let span = match serde_json::from_slice(&payload) {
Ok(span) => span,
Err(error) => {
relay_log::error!(
@@ -751,14 +754,19 @@
return Ok(());
}
};
let message = KafkaMessage::Span(SpanKafkaMessage {
project_id,
let message = SpanKafkaMessage {
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
span,
event_id,
});

self.produce(KafkaTopic::Spans, organization_id, message)?;
organization_id,
project_id,
retention_days,
span,
};
self.produce(
KafkaTopic::Spans,
organization_id,
KafkaMessage::Span(message),
)?;

metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
@@ -1026,25 +1034,30 @@ struct CheckInKafkaMessage {
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage {
/// Raw span data. See [`relay_event_schema::protocol::Span`] for schema.
span: serde_json::Value,
/// Time at which the span was received by Relay.
struct SpanKafkaMessage<'a> {
/// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
start_time: u64,
/// The project id for the current span.
project_id: ProjectId,
/// The event id for the current span.
///
/// Once spans are truly standalone, this field will be omitted.
/// The ID of the transaction event associated to this span, if any.
#[serde(skip_serializing_if = "Option::is_none")]
event_id: Option<EventId>,
/// The numeric ID of the organization.
organization_id: u64,
/// The numeric ID of the project.
project_id: ProjectId,
/// Number of days until these data should be deleted.
retention_days: u16,
/// Fields from the original span payload.
/// See [`relay_event_schema::protocol::span::Span`] for the schema.
///
/// By using a [`RawValue`] here, we can embed the span's JSON without additional parsing.
span: &'a RawValue,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
enum KafkaMessage {
enum KafkaMessage<'a> {
Event(EventKafkaMessage),
Attachment(AttachmentKafkaMessage),
AttachmentChunk(AttachmentChunkKafkaMessage),
Expand All @@ -1060,10 +1073,10 @@ enum KafkaMessage {
ReplayEvent(ReplayEventKafkaMessage),
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
CheckIn(CheckInKafkaMessage),
Span(SpanKafkaMessage),
Span(SpanKafkaMessage<'a>),
}

impl Message for KafkaMessage {
impl Message for KafkaMessage<'_> {
fn variant(&self) -> &'static str {
match self {
KafkaMessage::Event(_) => "event",
@@ -1124,6 +1137,9 @@ impl Message for KafkaMessage {
KafkaMessage::ReplayEvent(message) => {
serde_json::to_vec(message).map_err(ClientError::InvalidJson)
}
KafkaMessage::Span(message) => {
serde_json::to_vec(message).map_err(ClientError::InvalidJson)
}
_ => rmp_serde::to_vec_named(&self).map_err(ClientError::InvalidMsgPack),
}
}
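The central change in `store.rs` is that span messages are now serialized as JSON, with the span payload embedded through `serde_json::value::RawValue` so Relay never re-parses it into a full `serde_json::Value`. A minimal sketch of that trick follows; the struct is a trimmed-down, illustrative stand-in for `SpanKafkaMessage`, and it assumes a crate with `serde` (derive feature) and `serde_json` with its `raw_value` feature enabled.

```rust
use serde::Serialize;
use serde_json::value::RawValue;

// Trimmed-down illustration of SpanKafkaMessage: a few metadata fields plus the raw span.
#[derive(Serialize)]
struct SpanMessage<'a> {
    organization_id: u64,
    project_id: u64,
    retention_days: u16,
    // Borrowed raw JSON: written to the output verbatim, never parsed into a Value.
    span: &'a RawValue,
}

fn main() -> Result<(), serde_json::Error> {
    // Stand-in for the span item payload taken from the envelope.
    let payload = r#"{"op":"http.client","exclusive_time":1.23}"#;
    let span: &RawValue = serde_json::from_str(payload)?;

    let message = SpanMessage {
        organization_id: 1,
        project_id: 42,
        retention_days: 90,
        span,
    };

    // Produces JSON with the span payload embedded as-is.
    println!("{}", serde_json::to_string(&message)?);
    Ok(())
}
```

Everything except `span` is serialized normally; the `span` bytes are spliced into the output verbatim, which is what allows the already-validated payload to be forwarded without an extra parse/serialize round trip.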
22 changes: 10 additions & 12 deletions tests/integration/fixtures/processing.py
@@ -183,15 +183,6 @@ def assert_empty(self, timeout=None):
assert rv.value() == message, rv.value()


class MsgPackConsumer(ConsumerBase):
def get_message(self, timeout=None):
message = self.poll(timeout)
assert message is not None
assert message.error() is None

return msgpack.unpackb(message.value(), raw=False, use_list=False)


@pytest.fixture
def outcomes_consumer(kafka_consumer):
return lambda timeout=None, topic=None: OutcomesConsumer(
@@ -315,9 +306,7 @@ def monitors_consumer(kafka_consumer):

@pytest.fixture
def spans_consumer(kafka_consumer):
return lambda timeout=None: MsgPackConsumer(
timeout=timeout, *kafka_consumer("spans")
)
return lambda timeout=None: SpansConsumer(timeout=timeout, *kafka_consumer("spans"))


class MetricsConsumer(ConsumerBase):
@@ -437,3 +426,12 @@ def get_check_in(self):
wrapper = msgpack.unpackb(message.value(), raw=False, use_list=False)
assert wrapper["type"] == "check_in"
return json.loads(wrapper["payload"].decode("utf8")), wrapper


class SpansConsumer(ConsumerBase):
def get_span(self):
message = self.poll()
Comment on lines +432 to +433

Contributor: Should we use the timeout here? Calling the spans_consumer with a timeout and it not being used may generate some confusion.

Member Author (jjbayer): See

    def poll(self, timeout=None):
        if timeout is None:
            timeout = self.timeout

assert message is not None
assert message.error() is None

return json.loads(message.value())
13 changes: 8 additions & 5 deletions tests/integration/test_store.py
@@ -1197,7 +1197,7 @@ def test_spans(

relay = relay_with_processing()
project_id = 42
project_config = mini_sentry.add_basic_project_config(project_id)
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"]["features"] = [
"projects:span-metrics-extraction",
"projects:span-metrics-extraction-all-modules",
@@ -1220,12 +1220,13 @@

relay.send_event(project_id, event)

child_span = spans_consumer.get_message()
child_span = spans_consumer.get_span()
del child_span["start_time"]
assert child_span == {
"type": "span",
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"project_id": 42,
"organization_id": 1,
"retention_days": 90,
"span": {
"data": {
"description.scrubbed": "GET *",
@@ -1259,11 +1260,13 @@
},
}

transaction_span = spans_consumer.get_message()
transaction_span = spans_consumer.get_span()
del transaction_span["start_time"]
assert transaction_span == {
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"project_id": 42,
"organization_id": 1,
"retention_days": 90,
"span": {
"data": {
"transaction": "hi",
@@ -1278,6 +1281,7 @@
"is_segment": True,
"op": "hi",
"segment_id": "968cff94913ebb07",
"sentry_tags": {"transaction": "hi", "transaction.op": "hi"},
"span_id": "968cff94913ebb07",
"start_timestamp": datetime.fromisoformat(event["start_timestamp"])
.replace(tzinfo=timezone.utc)
@@ -1288,7 +1292,6 @@
.timestamp(),
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
},
"type": "span",
}

spans_consumer.assert_empty()