From fc39a3d115e4b4a04a7cca4b58140772ed0a9653 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Thu, 23 Jan 2025 15:39:40 -0500 Subject: [PATCH 1/6] Revert "Revert "feat(ourlogs): Allow log ingestion behind a flag" (#4463)" This reverts commit 8d47ad5962ebf38e073e2ed339981a10e7f30be5. --- CHANGELOG.md | 1 + Cargo.lock | 21 +- Cargo.toml | 3 +- py/sentry_relay/consts.py | 2 +- relay-cogs/src/lib.rs | 3 + relay-config/src/config.rs | 8 + relay-dynamic-config/src/feature.rs | 6 +- relay-event-schema/src/processor/attrs.rs | 2 + relay-event-schema/src/processor/traits.rs | 2 + relay-event-schema/src/protocol/mod.rs | 2 + relay-event-schema/src/protocol/ourlog.rs | 243 ++++++++++++++++++ relay-kafka/src/config.rs | 6 +- relay-ourlogs/Cargo.toml | 29 +++ relay-ourlogs/src/lib.rs | 13 + relay-ourlogs/src/ourlog.rs | 204 +++++++++++++++ relay-pii/src/selector.rs | 1 + relay-server/Cargo.toml | 63 ++--- relay-server/src/envelope.rs | 15 ++ relay-server/src/services/outcome.rs | 4 + relay-server/src/services/processor.rs | 48 ++++ relay-server/src/services/processor/event.rs | 2 + relay-server/src/services/processor/ourlog.rs | 105 ++++++++ relay-server/src/services/store.rs | 72 ++++++ relay-server/src/utils/managed_envelope.rs | 15 ++ relay-server/src/utils/rate_limits.rs | 74 ++++++ relay-server/src/utils/sizes.rs | 3 + requirements-dev.txt | 4 +- tests/integration/conftest.py | 1 + tests/integration/fixtures/processing.py | 25 ++ tests/integration/test_ourlogs.py | 118 +++++++++ 30 files changed, 1056 insertions(+), 39 deletions(-) create mode 100644 relay-event-schema/src/protocol/ourlog.rs create mode 100644 relay-ourlogs/Cargo.toml create mode 100644 relay-ourlogs/src/lib.rs create mode 100644 relay-ourlogs/src/ourlog.rs create mode 100644 relay-server/src/services/processor/ourlog.rs create mode 100644 tests/integration/test_ourlogs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 81502d90e20..f75fbc136f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ **Internal**: - Updates performance score calculation on spans and events to also store cdf values as measurements. ([#4438](https://github.com/getsentry/relay/pull/4438)) +- Allow log ingestion behind a flag, only for internal use currently. 
([#4448](https://github.com/getsentry/relay/pull/4448)) ## 24.12.2 diff --git a/Cargo.lock b/Cargo.lock index c56ecc08f41..93e3ef053c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2844,6 +2844,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand", + "serde_json", "thiserror", ] @@ -3757,6 +3758,20 @@ dependencies = [ "uuid", ] +[[package]] +name = "relay-ourlogs" +version = "24.12.1" +dependencies = [ + "chrono", + "hex", + "insta", + "once_cell", + "opentelemetry-proto", + "relay-event-schema", + "relay-protocol", + "serde_json", +] + [[package]] name = "relay-pattern" version = "25.1.0" @@ -3975,6 +3990,7 @@ dependencies = [ "relay-log", "relay-metrics", "relay-monitors", + "relay-ourlogs", "relay-pii", "relay-profiling", "relay-protocol", @@ -4452,15 +4468,16 @@ dependencies = [ [[package]] name = "sentry-kafka-schemas" -version = "0.1.122" +version = "0.1.129" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6636bbc9fda2c104d326386bf8fdcc36d4031bca525a74a970ad8bbecb7570d2" +checksum = "790627715d4ea0e58e252dcb657a44146fde401b5520bbbc3b6500764ef71c86" dependencies = [ "jsonschema", "serde", "serde_json", "serde_yaml", "thiserror", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 356c267556c..a5bd8f4454b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ relay-kafka = { path = "relay-kafka" } relay-log = { path = "relay-log" } relay-metrics = { path = "relay-metrics" } relay-monitors = { path = "relay-monitors" } +relay-ourlogs = { path = "relay-ourlogs" } relay-pattern = { path = "relay-pattern" } relay-pii = { path = "relay-pii" } relay-profiling = { path = "relay-profiling" } @@ -150,7 +151,7 @@ reqwest = "0.12.9" rmp-serde = "1.3.0" sentry = "0.34.0" sentry-core = "0.34.0" -sentry-kafka-schemas = { version = "0.1.122", default-features = false } +sentry-kafka-schemas = { version = "0.1.129", default-features = false } sentry-release-parser = { version = "1.3.2", default-features = false } sentry-types = "0.34.0" semver = "1.0.23" diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index b3f4e850cbb..6b6af192c57 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -8,7 +8,7 @@ class DataCategory(IntEnum): - # begin generated + # start generated DEFAULT = 0 ERROR = 1 TRANSACTION = 2 diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index 46c13bf46b1..d29f8fa392b 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -117,6 +117,8 @@ pub enum AppFeature { Transactions, /// Errors. Errors, + /// Logs. + Logs, /// Spans. Spans, /// Sessions. @@ -159,6 +161,7 @@ impl AppFeature { Self::Transactions => "transactions", Self::Errors => "errors", Self::Spans => "spans", + Self::Logs => "our_logs", Self::Sessions => "sessions", Self::ClientReports => "client_reports", Self::CheckIns => "check_ins", diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 93c8b152a28..fa41ef8b1dd 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -617,6 +617,8 @@ pub struct Limits { /// The maximum payload size for a profile pub max_profile_size: ByteSize, /// The maximum payload size for a span. + pub max_log_size: ByteSize, + /// The maximum payload size for a span. pub max_span_size: ByteSize, /// The maximum payload size for a statsd metric. 
pub max_statsd_size: ByteSize, @@ -683,6 +685,7 @@ impl Default for Limits { max_api_file_upload_size: ByteSize::mebibytes(40), max_api_chunk_upload_size: ByteSize::mebibytes(100), max_profile_size: ByteSize::mebibytes(50), + max_log_size: ByteSize::mebibytes(1), max_span_size: ByteSize::mebibytes(1), max_statsd_size: ByteSize::mebibytes(1), max_metric_buckets_size: ByteSize::mebibytes(1), @@ -2213,6 +2216,11 @@ impl Config { self.values.limits.max_check_in_size.as_bytes() } + /// Returns the maximum payload size of a log in bytes. + pub fn max_log_size(&self) -> usize { + self.values.limits.max_log_size.as_bytes() + } + /// Returns the maximum payload size of a span in bytes. pub fn max_span_size(&self) -> usize { self.values.limits.max_span_size.as_bytes() diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 003fa65c574..2210a1b1e7a 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -102,7 +102,11 @@ pub enum Feature { /// Serialized as `organizations:ingest-spans-in-eap` #[serde(rename = "organizations:ingest-spans-in-eap")] IngestSpansInEap, - + /// Enable log ingestion for our log product (this is not internal logging). + /// + /// Serialized as `organizations:ourlogs-ingestion`. + #[serde(rename = "organizations:ourlogs-ingestion")] + OurLogsIngestion, /// This feature has graduated and is hard-coded for external Relays. #[doc(hidden)] #[serde(rename = "projects:profiling-ingest-unsampled-profiles")] diff --git a/relay-event-schema/src/processor/attrs.rs b/relay-event-schema/src/processor/attrs.rs index b8772a09dcf..563b1e65ace 100644 --- a/relay-event-schema/src/processor/attrs.rs +++ b/relay-event-schema/src/processor/attrs.rs @@ -46,6 +46,7 @@ pub enum ValueType { Message, Thread, Breadcrumb, + OurLog, Span, ClientSdkInfo, @@ -84,6 +85,7 @@ relay_common::derive_fromstr_and_display!(ValueType, UnknownValueTypeError, { ValueType::Message => "message", ValueType::Thread => "thread", ValueType::Breadcrumb => "breadcrumb", + ValueType::OurLog => "ourlog", ValueType::Span => "span", ValueType::ClientSdkInfo => "sdk", ValueType::Minidump => "minidump", diff --git a/relay-event-schema/src/processor/traits.rs b/relay-event-schema/src/processor/traits.rs index ac1a78728d7..9c5491872d4 100644 --- a/relay-event-schema/src/processor/traits.rs +++ b/relay-event-schema/src/processor/traits.rs @@ -108,10 +108,12 @@ pub trait Processor: Sized { process_method!(process_breadcrumb, crate::protocol::Breadcrumb); process_method!(process_template_info, crate::protocol::TemplateInfo); process_method!(process_header_name, crate::protocol::HeaderName); + process_method!(process_ourlog, crate::protocol::OurLog); process_method!(process_span, crate::protocol::Span); process_method!(process_trace_context, crate::protocol::TraceContext); process_method!(process_native_image_path, crate::protocol::NativeImagePath); process_method!(process_contexts, crate::protocol::Contexts); + process_method!(process_attribute_value, crate::protocol::AttributeValue); fn process_other( &mut self, diff --git a/relay-event-schema/src/protocol/mod.rs b/relay-event-schema/src/protocol/mod.rs index 7827447ce58..fc1c9307787 100644 --- a/relay-event-schema/src/protocol/mod.rs +++ b/relay-event-schema/src/protocol/mod.rs @@ -18,6 +18,7 @@ mod mechanism; mod metrics; mod metrics_summary; mod nel; +mod ourlog; mod relay_info; mod replay; mod request; @@ -54,6 +55,7 @@ pub use self::mechanism::*; pub use self::metrics::*; pub use self::metrics_summary::*; 
pub use self::nel::*; +pub use self::ourlog::*; pub use self::relay_info::*; pub use self::replay::*; pub use self::request::*; diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs new file mode 100644 index 00000000000..e4eea9882ff --- /dev/null +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -0,0 +1,243 @@ +use relay_protocol::{ + Annotated, Empty, Error, FromValue, IntoValue, Object, SkipSerialization, Value, +}; + +use serde::ser::SerializeMap; + +use crate::processor::ProcessValue; +use crate::protocol::{SpanId, TraceId}; + +#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] +#[metastructure(process_func = "process_ourlog", value_type = "OurLog")] +pub struct OurLog { + /// Time when the event occurred. + #[metastructure(required = true, trim = false)] + pub timestamp_nanos: Annotated, + + /// Time when the event was observed. + #[metastructure(required = true, trim = false)] + pub observed_timestamp_nanos: Annotated, + + /// The ID of the trace the log belongs to. + #[metastructure(required = false, trim = false)] + pub trace_id: Annotated, + /// The Span id. + /// + #[metastructure(required = false, trim = false)] + pub span_id: Annotated, + + /// Trace flag bitfield. + #[metastructure(required = false)] + pub trace_flags: Annotated, + + /// This is the original string representation of the severity as it is known at the source + #[metastructure(required = false, max_chars = 32, pii = "true", trim = false)] + pub severity_text: Annotated, + + /// Numerical representation of the severity level + #[metastructure(required = false)] + pub severity_number: Annotated, + + /// Log body. + #[metastructure(required = true, pii = "true", trim = false)] + pub body: Annotated, + + /// Arbitrary attributes on a log. + #[metastructure(pii = "true", trim = false)] + pub attributes: Annotated>, + + /// Additional arbitrary fields for forwards compatibility. + #[metastructure(additional_properties, retain = true, pii = "maybe", trim = false)] + pub other: Object, +} + +#[derive(Debug, Clone, PartialEq, ProcessValue)] +pub enum AttributeValue { + #[metastructure(field = "string_value", pii = "true")] + StringValue(String), + #[metastructure(field = "int_value", pii = "true")] + IntValue(i64), + #[metastructure(field = "double_value", pii = "true")] + DoubleValue(f64), + #[metastructure(field = "bool_value", pii = "true")] + BoolValue(bool), + /// Any other unknown attribute value. + /// + /// This exists to ensure other attribute values such as array and object can be added in the future. 
+ Unknown(String), +} + +impl IntoValue for AttributeValue { + fn into_value(self) -> Value { + let mut map = Object::new(); + match self { + AttributeValue::StringValue(v) => { + map.insert("string_value".to_string(), Annotated::new(Value::String(v))); + } + AttributeValue::IntValue(v) => { + map.insert("int_value".to_string(), Annotated::new(Value::I64(v))); + } + AttributeValue::DoubleValue(v) => { + map.insert("double_value".to_string(), Annotated::new(Value::F64(v))); + } + AttributeValue::BoolValue(v) => { + map.insert("bool_value".to_string(), Annotated::new(Value::Bool(v))); + } + AttributeValue::Unknown(v) => { + map.insert("unknown".to_string(), Annotated::new(Value::String(v))); + } + } + Value::Object(map) + } + + fn serialize_payload(&self, s: S, _behavior: SkipSerialization) -> Result + where + Self: Sized, + S: serde::Serializer, + { + let mut map = s.serialize_map(None)?; + match self { + AttributeValue::StringValue(v) => { + map.serialize_entry("string_value", v)?; + } + AttributeValue::IntValue(v) => { + map.serialize_entry("int_value", v)?; + } + AttributeValue::DoubleValue(v) => { + map.serialize_entry("double_value", v)?; + } + AttributeValue::BoolValue(v) => { + map.serialize_entry("bool_value", v)?; + } + AttributeValue::Unknown(v) => { + map.serialize_entry("unknown", v)?; + } + } + map.end() + } +} + +impl AttributeValue { + pub fn string_value(&self) -> Option<&String> { + match self { + AttributeValue::StringValue(s) => Some(s), + _ => None, + } + } + pub fn int_value(&self) -> Option { + match self { + AttributeValue::IntValue(i) => Some(*i), + _ => None, + } + } + pub fn double_value(&self) -> Option { + match self { + AttributeValue::DoubleValue(d) => Some(*d), + _ => None, + } + } + pub fn bool_value(&self) -> Option { + match self { + AttributeValue::BoolValue(b) => Some(*b), + _ => None, + } + } +} + +impl Empty for AttributeValue { + #[inline] + fn is_empty(&self) -> bool { + matches!(self, Self::Unknown(_)) + } +} + +impl FromValue for AttributeValue { + fn from_value(value: Annotated) -> Annotated { + match value { + Annotated(Some(Value::String(value)), meta) => { + Annotated(Some(AttributeValue::StringValue(value)), meta) + } + Annotated(Some(Value::I64(value)), meta) => { + Annotated(Some(AttributeValue::IntValue(value)), meta) + } + Annotated(Some(Value::F64(value)), meta) => { + Annotated(Some(AttributeValue::DoubleValue(value)), meta) + } + Annotated(Some(Value::Bool(value)), meta) => { + Annotated(Some(AttributeValue::BoolValue(value)), meta) + } + Annotated(Some(value), mut meta) => { + meta.add_error(Error::expected( + "a valid attribute value (string, int, double, bool)", + )); + meta.set_original_value(Some(value)); + Annotated(None, meta) + } + Annotated(None, meta) => Annotated(None, meta), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ourlog_serialization() { + let json = r#"{ + "timestamp_nanos": 1544712660300000000, + "observed_timestamp_nanos": 1544712660300000000, + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b174", + "severity_text": "Information", + "severity_number": 10, + "body": "Example log record", + "attributes": { + "boolean.attribute": { + "bool_value": true + }, + "double.attribute": { + "double_value": 637.704 + }, + "int.attribute": { + "int_value": 10 + }, + "string.attribute": { + "string_value": "some string" + } + } +}"#; + + let mut attributes = Object::new(); + attributes.insert( + "string.attribute".into(), + 
Annotated::new(AttributeValue::StringValue("some string".into())), + ); + attributes.insert( + "boolean.attribute".into(), + Annotated::new(AttributeValue::BoolValue(true)), + ); + attributes.insert( + "int.attribute".into(), + Annotated::new(AttributeValue::IntValue(10)), + ); + attributes.insert( + "double.attribute".into(), + Annotated::new(AttributeValue::DoubleValue(637.704)), + ); + + let log = Annotated::new(OurLog { + timestamp_nanos: Annotated::new(1544712660300000000), + observed_timestamp_nanos: Annotated::new(1544712660300000000), + severity_number: Annotated::new(10), + severity_text: Annotated::new("Information".to_string()), + trace_id: Annotated::new(TraceId("5b8efff798038103d269b633813fc60c".into())), + span_id: Annotated::new(SpanId("eee19b7ec3c1b174".into())), + body: Annotated::new("Example log record".to_string()), + attributes: Annotated::new(attributes), + ..Default::default() + }); + + assert_eq!(json, log.to_json_pretty().unwrap()); + } +} diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 7b9e8411241..faaa2b69a00 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -45,6 +45,8 @@ pub enum KafkaTopic { ReplayRecordings, /// Monitor check-ins. Monitors, + /// Logs (our log product). + OurLogs, /// Standalone spans without a transaction. Spans, /// Feedback events topic. @@ -56,7 +58,7 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 13] = [ + static TOPICS: [KafkaTopic; 14] = [ Events, Attachments, Transactions, @@ -68,6 +70,7 @@ impl KafkaTopic { ReplayEvents, ReplayRecordings, Monitors, + OurLogs, Spans, Feedback, ]; @@ -128,6 +131,7 @@ define_topic_assignments! { profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"), replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."), replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."), + ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."), monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."), spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."), feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."), diff --git a/relay-ourlogs/Cargo.toml b/relay-ourlogs/Cargo.toml new file mode 100644 index 00000000000..c132967b924 --- /dev/null +++ b/relay-ourlogs/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "relay-ourlogs" +authors = ["Sentry "] +description = "Log normalization and processing" +homepage = "https://getsentry.github.io/relay/" +repository = "https://github.com/getsentry/relay" +version = "24.12.1" +edition = "2021" +license-file = "../LICENSE" +publish = false + +[lints] +workspace = true + +[dependencies] +chrono = { workspace = true } +hex = { workspace = true } +once_cell = { workspace = true } +opentelemetry-proto = { workspace = true, features = [ + "gen-tonic", + "with-serde", + "logs", +] } +relay-event-schema = { workspace = true } +relay-protocol = { workspace = true } +serde_json = { workspace = true } + +[dev-dependencies] +insta = { workspace = true } diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs new file mode 100644 index 00000000000..38130a4a4c6 --- /dev/null +++ b/relay-ourlogs/src/lib.rs @@ -0,0 +1,13 @@ +//! Structs and functions needed to ingest OpenTelemetry logs. 
+ +#![warn(missing_docs)] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png", + html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" +)] + +pub use crate::ourlog::otel_to_sentry_log; + +pub use opentelemetry_proto::tonic::logs::v1::LogRecord as OtelLog; + +mod ourlog; diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs new file mode 100644 index 00000000000..9662a822a11 --- /dev/null +++ b/relay-ourlogs/src/ourlog.rs @@ -0,0 +1,204 @@ +use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; + +use crate::OtelLog; +use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; +use relay_protocol::{Annotated, Object}; + +/// Transform an OtelLog to a Sentry log. +pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { + let OtelLog { + severity_number, + severity_text, + body, + attributes, + trace_id, + span_id, + .. + } = otel_log; + + let span_id = hex::encode(span_id); + let trace_id = hex::encode(trace_id); + + let body = body + .and_then(|v| v.value) + .and_then(|v| match v { + OtelValue::StringValue(s) => Some(s), + _ => None, + }) + .unwrap_or_else(String::new); + + let mut attribute_data = Object::new(); + + for attribute in attributes.into_iter() { + if let Some(value) = attribute.value.and_then(|v| v.value) { + let key = attribute.key; + match value { + OtelValue::ArrayValue(_) => {} + OtelValue::BoolValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::BoolValue(v))); + } + OtelValue::BytesValue(v) => { + if let Ok(v) = String::from_utf8(v) { + attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); + } + } + OtelValue::DoubleValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::DoubleValue(v))); + } + OtelValue::IntValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::IntValue(v))); + } + OtelValue::KvlistValue(_) => {} + OtelValue::StringValue(v) => { + attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); + } + } + } + } + + OurLog { + timestamp_nanos: Annotated::new(otel_log.time_unix_nano), + observed_timestamp_nanos: Annotated::new(otel_log.observed_time_unix_nano), + trace_id: TraceId(trace_id).into(), + span_id: Annotated::new(SpanId(span_id)), + trace_flags: Annotated::new(0.0), + severity_text: severity_text.into(), + severity_number: Annotated::new(severity_number as i64), + attributes: attribute_data.into(), + body: Annotated::new(body), + ..Default::default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use relay_protocol::{get_path, get_value}; + + #[test] + fn parse_log() { + // https://github.com/open-telemetry/opentelemetry-proto/blob/c4214b8168d0ce2a5236185efb8a1c8950cccdd6/examples/logs.json + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": { + "stringValue": "Example log record" + }, + "attributes": [ + { + "key": "string.attribute", + "value": { + "stringValue": "some string" + } + }, + { + "key": "boolean.attribute", + "value": { + "boolValue": true + } + }, + { + "key": "int.attribute", + "value": { + "intValue": "10" + } + }, + { + "key": "double.attribute", + "value": { + "doubleValue": 637.704 + } + }, + { + "key": "array.attribute", + "value": { + "arrayValue": { + "values": [ + { + 
"stringValue": "many" + }, + { + "stringValue": "values" + } + ] + } + } + }, + { + "key": "map.attribute", + "value": { + "kvlistValue": { + "values": [ + { + "key": "some.map.key", + "value": { + "stringValue": "some value" + } + } + ] + } + } + } + ] + }"#; + + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log: OurLog = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Example log record".into())) + ); + } + + #[test] + fn parse_log_with_db_attributes() { + let json = r#"{ + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": { + "stringValue": "Database query executed" + }, + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "database" + } + }, + { + "key": "db.type", + "value": { + "stringValue": "sql" + } + }, + { + "key": "db.statement", + "value": { + "stringValue": "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s" + } + } + ] + }"#; + let otel_log: OtelLog = serde_json::from_str(json).unwrap(); + let our_log = otel_to_sentry_log(otel_log); + let annotated_log: Annotated = Annotated::new(our_log); + + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Database query executed".into())) + ); + assert_eq!( + get_value!(annotated_log.attributes["db.statement"]!).string_value(), + Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) + ); + } +} diff --git a/relay-pii/src/selector.rs b/relay-pii/src/selector.rs index c0481107bf2..880a71044c4 100644 --- a/relay-pii/src/selector.rs +++ b/relay-pii/src/selector.rs @@ -135,6 +135,7 @@ impl SelectorPathItem { | ValueType::Message | ValueType::Thread | ValueType::Breadcrumb + | ValueType::OurLog | ValueType::Span | ValueType::Minidump | ValueType::HeapMemory diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index ea6aa647688..89bbd03b9e5 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -13,16 +13,16 @@ publish = false [features] default = [] processing = [ - "dep:minidump", - "dep:symbolic-common", - "dep:symbolic-unreal", - "relay-cardinality/redis", - "relay-config/processing", - "relay-kafka/producer", - "relay-metrics/redis", - "relay-quotas/redis", - "relay-redis/impl", - "relay-sampling/redis", + "dep:minidump", + "dep:symbolic-common", + "dep:symbolic-unreal", + "relay-cardinality/redis", + "relay-config/processing", + "relay-kafka/producer", + "relay-metrics/redis", + "relay-quotas/redis", + "relay-redis/impl", + "relay-sampling/redis", ] [lints] @@ -75,6 +75,7 @@ relay-event-schema = { workspace = true } relay-filter = { workspace = true } relay-kafka = { workspace = true, optional = true } relay-log = { workspace = true, features = ["sentry"] } +relay-ourlogs = { workspace = true } relay-metrics = { workspace = true } relay-monitors = { workspace = true } relay-pii = { workspace = true } @@ -88,10 +89,10 @@ relay-spans = { workspace = true } relay-statsd = { workspace = true } relay-system = { workspace = true } reqwest = { workspace = true, features = [ - "gzip", - "hickory-dns", - "stream", - "native-tls-vendored", + "gzip", + "hickory-dns", + "stream", + "native-tls-vendored", ] } rmp-serde = { workspace = true } serde = { workspace = true } @@ -100,14 +101,14 @@ serde_json = { workspace = true } 
smallvec = { workspace = true, features = ["drain_filter"] } socket2 = { workspace = true } sqlx = { workspace = true, features = [ - "macros", - "migrate", - "sqlite", - "runtime-tokio", + "macros", + "migrate", + "sqlite", + "runtime-tokio", ], default-features = false } symbolic-common = { workspace = true, optional = true, default-features = false } symbolic-unreal = { workspace = true, optional = true, default-features = false, features = [ - "serde", + "serde", ] } sysinfo = { workspace = true } thiserror = { workspace = true } @@ -115,18 +116,18 @@ tokio = { workspace = true, features = ["sync", "time"] } tokio-util = { workspace = true, default-features = false } tower = { workspace = true, default-features = false, features = ["limit"] } tower-http = { workspace = true, default-features = false, features = [ - "catch-panic", - "compression-br", - "compression-deflate", - "compression-gzip", - "compression-zstd", - "cors", - "decompression-br", - "decompression-deflate", - "decompression-gzip", - "decompression-zstd", - "set-header", - "trace", + "catch-panic", + "compression-br", + "compression-deflate", + "compression-gzip", + "compression-zstd", + "cors", + "decompression-br", + "decompression-deflate", + "decompression-gzip", + "decompression-zstd", + "set-header", + "trace", ] } url = { workspace = true, features = ["serde"] } uuid = { workspace = true, features = ["v5"] } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 8a099628a60..1561082be07 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -117,6 +117,10 @@ pub enum ItemType { ReplayVideo, /// Monitor check-in encoded as JSON. CheckIn, + /// A log from the [OTEL Log format](https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition) + OtelLog, + /// A log for the log product, not internal logs. + Log, /// A standalone span. Span, /// A standalone OpenTelemetry span serialized as JSON. 
@@ -174,6 +178,8 @@ impl ItemType { Self::ReplayRecording => "replay_recording", Self::ReplayVideo => "replay_video", Self::CheckIn => "check_in", + Self::Log => "log", + Self::OtelLog => "otel_log", Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", @@ -227,6 +233,8 @@ impl std::str::FromStr for ItemType { "replay_recording" => Self::ReplayRecording, "replay_video" => Self::ReplayVideo, "check_in" => Self::CheckIn, + "log" => Self::Log, + "otel_log" => Self::OtelLog, "span" => Self::Span, "otel_span" => Self::OtelSpan, "otel_traces_data" => Self::OtelTracesData, @@ -696,6 +704,10 @@ impl Item { CountFor::Outcomes => smallvec![], }, ItemType::Statsd | ItemType::MetricBuckets => smallvec![], + ItemType::Log | ItemType::OtelLog => smallvec![ + (DataCategory::LogByte, self.len().max(1)), + (DataCategory::LogItem, 1) + ], ItemType::FormData => smallvec![], ItemType::UserReport => smallvec![], ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)], @@ -953,6 +965,8 @@ impl Item { | ItemType::Profile | ItemType::CheckIn | ItemType::Span + | ItemType::Log + | ItemType::OtelLog | ItemType::OtelSpan | ItemType::OtelTracesData | ItemType::ProfileChunk => false, @@ -987,6 +1001,7 @@ impl Item { ItemType::Profile => true, ItemType::CheckIn => false, ItemType::Span => false, + ItemType::Log | ItemType::OtelLog => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, ItemType::ProfileChunk => false, diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 756dff63c29..5986de51540 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -455,6 +455,9 @@ pub enum DiscardReason { /// (Relay) Profiling related discard reasons Profiling(&'static str), + /// (Relay) A log that is not valid after normalization. + InvalidLog, + /// (Relay) A span is not valid after normalization. InvalidSpan, @@ -504,6 +507,7 @@ impl DiscardReason { DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording", DiscardReason::InvalidReplayVideoEvent => "invalid_replay_video", DiscardReason::Profiling(reason) => reason, + DiscardReason::InvalidLog => "invalid_log", DiscardReason::InvalidSpan => "invalid_span", DiscardReason::FeatureDisabled(_) => "feature_disabled", } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3417f5ed08a..9418e9065c6 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -86,6 +86,7 @@ mod attachment; mod dynamic_sampling; mod event; mod metrics; +mod ourlog; mod profile; mod profile_chunk; mod replay; @@ -193,6 +194,7 @@ processing_group!(StandaloneGroup, Standalone); processing_group!(ClientReportGroup, ClientReport); processing_group!(ReplayGroup, Replay); processing_group!(CheckInGroup, CheckIn); +processing_group!(LogGroup, Log); processing_group!(SpanGroup, Span); impl Sampling for SpanGroup { @@ -240,6 +242,8 @@ pub enum ProcessingGroup { Replay, /// Crons. CheckIn, + /// Logs. + Log, /// Spans. Span, /// Metrics. @@ -303,6 +307,7 @@ impl ProcessingGroup { &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData ) }); + if !span_items.is_empty() { grouped_envelopes.push(( ProcessingGroup::Span, @@ -310,6 +315,17 @@ impl ProcessingGroup { )) } + // Extract logs. 
+ let logs_items = + envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLog)); + + if !logs_items.is_empty() { + grouped_envelopes.push(( + ProcessingGroup::Log, + Envelope::from_parts(headers.clone(), logs_items), + )) + } + // Extract all metric items. // // Note: Should only be relevant in proxy mode. In other modes we send metrics through @@ -411,6 +427,7 @@ impl ProcessingGroup { ProcessingGroup::ClientReport => "client_report", ProcessingGroup::Replay => "replay", ProcessingGroup::CheckIn => "check_in", + ProcessingGroup::Log => "log", ProcessingGroup::Span => "span", ProcessingGroup::Metrics => "metrics", ProcessingGroup::ProfileChunk => "profile_chunk", @@ -430,6 +447,7 @@ impl From for AppFeature { ProcessingGroup::ClientReport => AppFeature::ClientReports, ProcessingGroup::Replay => AppFeature::Replays, ProcessingGroup::CheckIn => AppFeature::CheckIns, + ProcessingGroup::Log => AppFeature::Logs, ProcessingGroup::Span => AppFeature::Spans, ProcessingGroup::Metrics => AppFeature::UnattributedMetrics, ProcessingGroup::ProfileChunk => AppFeature::Profiles, @@ -2000,6 +2018,35 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } + /// Process logs + /// + fn process_logs( + &self, + managed_envelope: &mut TypedEnvelope, + project_info: Arc, + #[allow(unused_variables)] rate_limits: Arc, + ) -> Result, ProcessingError> { + #[allow(unused_mut)] + let mut extracted_metrics = ProcessingExtractedMetrics::new(); + + ourlog::filter( + managed_envelope, + self.inner.config.clone(), + project_info.clone(), + ); + if_processing!(self.inner.config, { + self.enforce_quotas( + managed_envelope, + Annotated::empty(), + &mut extracted_metrics, + project_info.clone(), + rate_limits, + )?; + ourlog::process(managed_envelope, project_info.clone()); + }); + Ok(Some(extracted_metrics)) + } + /// Processes standalone spans. /// /// This function does *not* run for spans extracted from transactions. @@ -2155,6 +2202,7 @@ impl EnvelopeProcessorService { ProcessingGroup::CheckIn => { run!(process_checkins, project_id, project_info, rate_limits) } + ProcessingGroup::Log => run!(process_logs, project_info, rate_limits), ProcessingGroup::Span => run!( process_standalone_spans, self.inner.config.clone(), diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index d6032e3e98c..0769e5c8b8f 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -468,6 +468,8 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::ReplayRecording => false, ItemType::ReplayVideo => false, ItemType::CheckIn => false, + ItemType::Log => false, + ItemType::OtelLog => false, ItemType::Span => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs new file mode 100644 index 00000000000..d4a6ad8bbfa --- /dev/null +++ b/relay-server/src/services/processor/ourlog.rs @@ -0,0 +1,105 @@ +//! Log processing code. 
+use std::sync::Arc; + +use crate::services::processor::LogGroup; +use relay_config::Config; +use relay_dynamic_config::Feature; + +use crate::services::processor::should_filter; +use crate::services::projects::project::ProjectInfo; +use crate::utils::{ItemAction, TypedEnvelope}; + +#[cfg(feature = "processing")] +use { + crate::envelope::ContentType, + crate::envelope::{Item, ItemType}, + crate::services::outcome::{DiscardReason, Outcome}, + crate::services::processor::ProcessingError, + relay_dynamic_config::ProjectConfig, + relay_event_schema::processor::{process_value, ProcessingState}, + relay_event_schema::protocol::OurLog, + relay_ourlogs::OtelLog, + relay_pii::PiiProcessor, + relay_protocol::Annotated, +}; + +/// Removes logs from the envelope if the feature is not enabled. +pub fn filter( + managed_envelope: &mut TypedEnvelope, + config: Arc, + project_info: Arc, +) { + let logging_disabled = should_filter(&config, &project_info, Feature::OurLogsIngestion); + managed_envelope.retain_items(|_| { + if logging_disabled { + ItemAction::DropSilently + } else { + ItemAction::Keep + } + }); +} + +/// Processes logs. +#[cfg(feature = "processing")] +pub fn process(managed_envelope: &mut TypedEnvelope, project_info: Arc) { + managed_envelope.retain_items(|item| { + let mut annotated_log = match item.ty() { + ItemType::OtelLog => match serde_json::from_slice::(&item.payload()) { + Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), + Err(err) => { + relay_log::debug!("failed to parse OTel Log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); + } + }, + ItemType::Log => match Annotated::::from_json_bytes(&item.payload()) { + Ok(our_log) => our_log, + Err(err) => { + relay_log::debug!("failed to parse Sentry Log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); + } + }, + + _ => return ItemAction::Keep, + }; + + if let Err(e) = scrub(&mut annotated_log, &project_info.config) { + relay_log::error!("failed to scrub pii from log: {}", e); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + } + + let mut new_item = Item::new(ItemType::Log); + let payload = match annotated_log.to_json() { + Ok(payload) => payload, + Err(err) => { + relay_log::debug!("failed to serialize log: {}", err); + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + } + }; + new_item.set_payload(ContentType::Json, payload); + + *item = new_item; + + ItemAction::Keep + }); +} + +#[cfg(feature = "processing")] +fn scrub( + annotated_log: &mut Annotated, + project_config: &ProjectConfig, +) -> Result<(), ProcessingError> { + if let Some(ref config) = project_config.pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + process_value(annotated_log, &mut processor, ProcessingState::root())?; + } + let pii_config = project_config + .datascrubbing_settings + .pii_config() + .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?; + if let Some(config) = pii_config { + let mut processor = PiiProcessor::new(config.compiled()); + process_value(annotated_log, &mut processor, ProcessingState::root())?; + } + + Ok(()) +} diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 81649cb42df..0ec81e31253 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -289,6 +289,7 @@ impl StoreService { ItemType::Span => { self.produce_span(scoping, received_at, event_id, retention, item)? 
} + ItemType::Log => self.produce_log(scoping, received_at, retention, item)?, ItemType::ProfileChunk => self.produce_profile_chunk( scoping.organization_id, scoping.project_id, @@ -939,6 +940,52 @@ impl StoreService { scoping, timestamp: received_at, }); + Ok(()) + } + + fn produce_log( + &self, + scoping: Scoping, + received_at: DateTime, + retention_days: u16, + item: &Item, + ) -> Result<(), StoreError> { + relay_log::trace!("Producing log"); + let payload = item.payload(); + let payload_len = payload.len(); + + let message = KafkaMessage::Log { + headers: BTreeMap::from([("project_id".to_string(), scoping.project_id.to_string())]), + message: LogKafkaMessage { + payload, + organization_id: scoping.organization_id.value(), + project_id: scoping.project_id.value(), + retention_days, + received: safe_timestamp(received_at), + }, + }; + + self.produce(KafkaTopic::OurLogs, message)?; + + // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogItem, + event_id: None, + outcome: Outcome::Accepted, + quantity: 1, + remote_addr: None, + scoping, + timestamp: received_at, + }); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogByte, + event_id: None, + outcome: Outcome::Accepted, + quantity: payload_len as u32, + remote_addr: None, + scoping, + timestamp: received_at, + }); Ok(()) } @@ -1305,6 +1352,17 @@ struct SpanKafkaMessage<'a> { platform: Cow<'a, str>, // We only use this for logging for now } +#[derive(Debug, Deserialize, Serialize)] +struct LogKafkaMessage { + /// Raw log payload. + payload: Bytes, + organization_id: u64, + project_id: u64, + /// Number of days until these data should be deleted. + retention_days: u16, + received: u64, +} + fn none_or_empty_object(value: &Option<&RawValue>) -> bool { match value { None => true, @@ -1346,6 +1404,12 @@ enum KafkaMessage<'a> { #[serde(flatten)] message: SpanKafkaMessage<'a>, }, + Log { + #[serde(skip)] + headers: BTreeMap, + #[serde(flatten)] + message: LogKafkaMessage, + }, ProfileChunk(ProfileChunkKafkaMessage), } @@ -1368,6 +1432,7 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayEvent(_) => "replay_event", KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked", KafkaMessage::CheckIn(_) => "check_in", + KafkaMessage::Log { .. } => "log", KafkaMessage::Span { .. } => "span", KafkaMessage::ProfileChunk(_) => "profile_chunk", } @@ -1391,6 +1456,7 @@ impl Message for KafkaMessage<'_> { // Random partitioning Self::Profile(_) | Self::Span { .. } + | Self::Log { .. } | Self::ReplayRecordingNotChunked(_) | Self::ProfileChunk(_) => Uuid::nil(), @@ -1419,6 +1485,12 @@ impl Message for KafkaMessage<'_> { } None } + KafkaMessage::Log { headers, .. } => { + if !headers.is_empty() { + return Some(headers); + } + None + } KafkaMessage::Span { headers, .. 
} => { if !headers.is_empty() { return Some(headers); diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index ad9c6234643..0ca927b3d87 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -414,6 +414,21 @@ impl ManagedEnvelope { ); } + if self.context.summary.log_item_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::LogItem, + self.context.summary.log_item_quantity, + ); + } + if self.context.summary.log_byte_quantity > 0 { + self.track_outcome( + outcome.clone(), + DataCategory::LogByte, + self.context.summary.log_byte_quantity, + ); + } + // Track outcomes for attached secondary transactions, e.g. extracted from metrics. // // Primary transaction count is already tracked through the event category diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 7eb91c1ca86..308c94a0bda 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -128,6 +128,8 @@ fn infer_event_category(item: &Item) -> Option { ItemType::ReplayVideo => None, ItemType::ClientReport => None, ItemType::CheckIn => None, + ItemType::Log => None, + ItemType::OtelLog => None, ItemType::Span => None, ItemType::OtelSpan => None, ItemType::OtelTracesData => None, @@ -167,6 +169,12 @@ pub struct EnvelopeSummary { /// The number of monitor check-ins. pub monitor_quantity: usize, + /// The number of log for the log product sent. + pub log_item_quantity: usize, + + /// The number of log bytes for the log product sent, in bytes + pub log_byte_quantity: usize, + /// Secondary number of transactions. /// /// This is 0 for envelopes which contain a transaction, @@ -224,6 +232,7 @@ impl EnvelopeSummary { } summary.payload_size += item.len(); + for (category, quantity) in item.quantities(CountFor::RateLimits) { summary.add_quantity(category, quantity); } @@ -242,6 +251,8 @@ impl EnvelopeSummary { DataCategory::ReplayVideo => &mut self.replay_video_quantity, DataCategory::Monitor => &mut self.monitor_quantity, DataCategory::Span => &mut self.span_quantity, + DataCategory::LogItem => &mut self.log_item_quantity, + DataCategory::LogByte => &mut self.log_byte_quantity, DataCategory::ProfileChunk => &mut self.profile_chunk_quantity, // TODO: This catch-all return looks dangerous _ => return, @@ -349,6 +360,10 @@ pub struct Enforcement { pub replay_videos: CategoryLimit, /// The combined check-in item rate limit. pub check_ins: CategoryLimit, + /// The combined logs (our product logs) rate limit. + pub log_items: CategoryLimit, + /// The combined logs (our product logs) rate limit. + pub log_bytes: CategoryLimit, /// The combined spans rate limit. pub spans: CategoryLimit, /// The rate limit for the indexed span category. 
@@ -391,6 +406,8 @@ impl Enforcement { replays, replay_videos, check_ins, + log_items, + log_bytes, spans, spans_indexed, user_reports_v2, @@ -407,6 +424,8 @@ impl Enforcement { replays, replay_videos, check_ins, + log_items, + log_bytes, spans, spans_indexed, user_reports_v2, @@ -495,6 +514,9 @@ impl Enforcement { ItemType::ReplayVideo => !self.replay_videos.is_active(), ItemType::ReplayRecording => !self.replays.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), + ItemType::OtelLog | ItemType::Log => { + !(self.log_items.is_active() || self.log_bytes.is_active()) + } ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { !self.spans_indexed.is_active() } @@ -707,6 +729,28 @@ where rate_limits.merge(session_limits); } + // Handle logs. + if summary.log_item_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogItem); + let log_limits = self.check.apply(item_scoping, summary.log_item_quantity)?; + enforcement.log_items = CategoryLimit::new( + DataCategory::LogItem, + summary.log_item_quantity, + log_limits.longest(), + ); + rate_limits.merge(log_limits); + } + if summary.log_byte_quantity > 0 { + let item_scoping = scoping.item(DataCategory::LogByte); + let log_limits = self.check.apply(item_scoping, summary.log_byte_quantity)?; + enforcement.log_bytes = CategoryLimit::new( + DataCategory::LogByte, + summary.log_byte_quantity, + log_limits.longest(), + ); + rate_limits.merge(log_limits); + } + // Handle profiles. if enforcement.is_event_active() { enforcement.profiles = enforcement @@ -1652,4 +1696,34 @@ mod tests { assert_eq!(summary.profile_quantity, 2); assert_eq!(summary.secondary_transaction_quantity, 7); } + + #[test] + fn test_enforce_limit_logs_count() { + let mut envelope = envelope![Log, Log]; + + let mut mock = MockLimiter::default().deny(DataCategory::LogItem); + let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); + + assert!(limits.is_limited()); + assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogItem, 2); + mock.assert_call(DataCategory::LogByte, 20); + + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]); + } + + #[test] + fn test_enforce_limit_logs_bytes() { + let mut envelope = envelope![Log, Log]; + + let mut mock = MockLimiter::default().deny(DataCategory::LogByte); + let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); + + assert!(limits.is_limited()); + assert_eq!(envelope.envelope().len(), 0); + mock.assert_call(DataCategory::LogItem, 2); + mock.assert_call(DataCategory::LogByte, 20); + + assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]); + } } diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 6c03420c2df..96ec13e3de3 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -15,6 +15,7 @@ use crate::utils::{ItemAction, ManagedEnvelope}; /// - `max_attachments_size` /// - `max_check_in_size` /// - `max_event_size` +/// - `max_log_size` /// - `max_metric_buckets_size` /// - `max_profile_size` /// - `max_replay_compressed_size` @@ -61,6 +62,8 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> Resul ItemType::UserReport => NO_LIMIT, ItemType::Statsd => config.max_statsd_size(), ItemType::MetricBuckets => config.max_metric_buckets_size(), + ItemType::Log => config.max_log_size(), + ItemType::OtelLog => config.max_log_size(), ItemType::Span | ItemType::OtelSpan => config.max_span_size(), ItemType::OtelTracesData => 
config.max_event_size(), // a spans container similar to `Transaction` ItemType::ProfileChunk => config.max_profile_size(), diff --git a/requirements-dev.txt b/requirements-dev.txt index 1476a8deae7..126d6f9e57a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,13 +10,13 @@ devservices==1.0.10 flake8==7.0.0 confluent-kafka==2.1.1 flask==3.0.3 -msgpack==1.0.7 +msgpack==1.1.0 opentelemetry-proto==1.22.0 pytest-localserver==0.8.1 pytest-sentry==0.3.0 pytest-xdist==3.5.0 pytest==7.4.3 -PyYAML==6.0.1 +PyYAML==6.0.2 redis==4.5.4 requests==2.32.2 sentry_sdk==2.10.0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 97cad120413..a89f638780e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -34,6 +34,7 @@ replay_events_consumer, monitors_consumer, spans_consumer, + ourlogs_consumer, profiles_consumer, feedback_consumer, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 98efbefee38..2a1b19b849f 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -60,6 +60,7 @@ def inner(options=None): "metrics_generic": metrics_topic, "replay_events": get_topic_name("replay_events"), "replay_recordings": get_topic_name("replay_recordings"), + "ourlogs": get_topic_name("ourlogs"), "monitors": get_topic_name("monitors"), "spans": get_topic_name("spans"), "profiles": get_topic_name("profiles"), @@ -348,6 +349,11 @@ def spans_consumer(consumer_fixture): yield from consumer_fixture(SpansConsumer, "spans") +@pytest.fixture +def ourlogs_consumer(consumer_fixture): + yield from consumer_fixture(OurLogsConsumer, "ourlogs") + + @pytest.fixture def profiles_consumer(consumer_fixture): yield from consumer_fixture(ProfileConsumer, "profiles") @@ -508,6 +514,25 @@ def get_spans(self, timeout=None, n=None): return spans +class OurLogsConsumer(ConsumerBase): + def get_ourlog(self): + message = self.poll() + assert message is not None + assert message.error() is None + + message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) + return json.loads(message_dict["payload"].decode("utf8")), message_dict + + def get_ourlogs(self): + ourlogs = [] + for message in self.poll_many(): + assert message is not None + assert message.error() is None + message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) + ourlogs.append(json.loads(message_dict["payload"].decode("utf8"))) + return ourlogs + + class ProfileConsumer(ConsumerBase): def get_profile(self): message = self.poll() diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py new file mode 100644 index 00000000000..6652f9bb486 --- /dev/null +++ b/tests/integration/test_ourlogs.py @@ -0,0 +1,118 @@ +import json +from datetime import datetime, timedelta, timezone + +from sentry_sdk.envelope import Envelope, Item, PayloadRef + + +TEST_CONFIG = { + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + } +} + + +def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: + envelope = Envelope() + envelope.add_item( + Item( + type="otel_log", + payload=PayloadRef( + bytes=json.dumps( + { + "timeUnixNano": str(int(start.timestamp() * 1e9)), + "observedTimeUnixNano": str(int(end.timestamp() * 1e9)), + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": {"stringValue": "Example log record"}, + "attributes": [ + { + "key": "string.attribute", 
+ "value": {"stringValue": "some string"}, + }, + {"key": "boolean.attribute", "value": {"boolValue": True}}, + {"key": "int.attribute", "value": {"intValue": "10"}}, + { + "key": "double.attribute", + "value": {"doubleValue": 637.704}, + }, + ], + } + ).encode() + ), + ) + ) + return envelope + + +def test_ourlog_extraction( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + ] + + relay = relay_with_processing(options=TEST_CONFIG) + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + # Send OTel log and sentry log via envelope + envelope = envelope_with_ourlogs(start, end) + relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + assert len(ourlogs) == 1 + expected_0 = { + "timestamp_nanos": int(start.timestamp() * 1e9), + "observed_timestamp_nanos": int(end.timestamp() * 1e9), + "trace_id": "5b8efff798038103d269b633813fc60c", + "body": "Example log record", + "trace_flags": 0.0, + "span_id": "eee19b7ec3c1b174", + "severity_text": "Information", + "severity_number": 10, + "attributes": { + "string.attribute": {"string_value": "some string"}, + "boolean.attribute": {"bool_value": True}, + "int.attribute": {"int_value": 10}, + "double.attribute": {"double_value": 637.704}, + }, + } + assert ourlogs[0] == expected_0 + + ourlogs_consumer.assert_empty() + + +def test_ourlog_extraction_is_disabled_without_feature( + mini_sentry, + relay_with_processing, + ourlogs_consumer, +): + ourlogs_consumer = ourlogs_consumer() + relay = relay_with_processing(options=TEST_CONFIG) + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [] + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + envelope = envelope_with_ourlogs(start, end) + relay.send_envelope(project_id, envelope) + + ourlogs = ourlogs_consumer.get_ourlogs() + assert len(ourlogs) == 0 + + ourlogs_consumer.assert_empty() From cda733e3c52f4402c09dd1c8296d96ea6c917fc0 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Thu, 23 Jan 2025 16:42:33 -0500 Subject: [PATCH 2/6] Update CHANGELOG.md --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f75fbc136f2..1d277731ebd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471)) + **Features**: - Add configuration option to limit the amount of concurrent http connections. ([#4453](https://github.com/getsentry/relay/pull/4453)) @@ -21,7 +23,6 @@ **Internal**: - Updates performance score calculation on spans and events to also store cdf values as measurements. ([#4438](https://github.com/getsentry/relay/pull/4438)) -- Allow log ingestion behind a flag, only for internal use currently. 
([#4448](https://github.com/getsentry/relay/pull/4448)) ## 24.12.2 From 57f82947ff21ce4ab58a46983b43ff2fbd3f9c2b Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Thu, 23 Jan 2025 16:43:10 -0500 Subject: [PATCH 3/6] Update consts.py --- py/sentry_relay/consts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index 6b6af192c57..b3f4e850cbb 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -8,7 +8,7 @@ class DataCategory(IntEnum): - # start generated + # begin generated DEFAULT = 0 ERROR = 1 TRANSACTION = 2 From 06eb1b91460bc76933ff8d4951ba50636a22b590 Mon Sep 17 00:00:00 2001 From: Kev Date: Tue, 28 Jan 2025 19:21:44 -0500 Subject: [PATCH 4/6] Switch back to flat json structure --- relay-server/src/services/store.rs | 74 +++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 12 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 0ec81e31253..1067cd1bee6 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -954,15 +954,44 @@ impl StoreService { let payload = item.payload(); let payload_len = payload.len(); + let d = &mut Deserializer::from_slice(&payload); + + let mut log: LogKafkaMessage = match serde_path_to_error::deserialize(d) { + Ok(log) => log, + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to parse log" + ); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogItem, + event_id: None, + outcome: Outcome::Invalid(DiscardReason::InvalidLog), + quantity: 1, + remote_addr: None, + scoping, + timestamp: received_at, + }); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::LogByte, + event_id: None, + outcome: Outcome::Invalid(DiscardReason::InvalidLog), + quantity: payload.len() as u32, + remote_addr: None, + scoping, + timestamp: received_at, + }); + return Ok(()); + } + }; + + log.organization_id = scoping.organization_id.value(); + log.project_id = scoping.project_id.value(); + log.retention_days = retention_days; + log.received = safe_timestamp(received_at); let message = KafkaMessage::Log { headers: BTreeMap::from([("project_id".to_string(), scoping.project_id.to_string())]), - message: LogKafkaMessage { - payload, - organization_id: scoping.organization_id.value(), - project_id: scoping.project_id.value(), - retention_days, - received: safe_timestamp(received_at), - }, + message: log, }; self.produce(KafkaTopic::OurLogs, message)?; @@ -1353,14 +1382,32 @@ struct SpanKafkaMessage<'a> { } #[derive(Debug, Deserialize, Serialize)] -struct LogKafkaMessage { - /// Raw log payload. - payload: Bytes, +struct LogKafkaMessage<'a> { + #[serde(default)] organization_id: u64, + #[serde(default)] project_id: u64, - /// Number of days until these data should be deleted. 
+ #[serde(default)] + timestamp_nanos: u64, + #[serde(default)] + observed_timestamp_nanos: u64, + #[serde(default)] retention_days: u16, + #[serde(default)] received: u64, + body: &'a RawValue, + + trace_id: EventId, + #[serde(default, skip_serializing_if = "Option::is_none")] + span_id: Option<&'a str>, + #[serde(default, skip_serializing_if = "Option::is_none")] + severity_text: Option<&'a str>, + #[serde(default, skip_serializing_if = "Option::is_none")] + severity_number: Option, + #[serde(default, skip_serializing_if = "none_or_empty_object")] + attributes: Option<&'a RawValue>, + #[serde(default, skip_serializing_if = "Option::is_none")] + flags: Option, } fn none_or_empty_object(value: &Option<&RawValue>) -> bool { @@ -1408,7 +1455,7 @@ enum KafkaMessage<'a> { #[serde(skip)] headers: BTreeMap, #[serde(flatten)] - message: LogKafkaMessage, + message: LogKafkaMessage<'a>, }, ProfileChunk(ProfileChunkKafkaMessage), } @@ -1513,6 +1560,9 @@ impl Message for KafkaMessage<'_> { KafkaMessage::Span { message, .. } => serde_json::to_vec(message) .map(Cow::Owned) .map_err(ClientError::InvalidJson), + KafkaMessage::Log { message, .. } => serde_json::to_vec(message) + .map(Cow::Owned) + .map_err(ClientError::InvalidJson), _ => rmp_serde::to_vec_named(&self) .map(Cow::Owned) .map_err(ClientError::InvalidMsgPack), From dc48d3abe3f9cb4947fb4c38e3b59c0ec69c5a50 Mon Sep 17 00:00:00 2001 From: Kev Date: Tue, 28 Jan 2025 19:51:26 -0500 Subject: [PATCH 5/6] Integration test updated, switch flags from f64 to u64 and fixed naming --- relay-event-schema/src/protocol/ourlog.rs | 2 +- relay-ourlogs/src/ourlog.rs | 2 +- relay-server/src/services/store.rs | 2 +- tests/integration/fixtures/processing.py | 6 ++---- tests/integration/test_ourlogs.py | 11 ++++++++--- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index e4eea9882ff..753493e2cda 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -28,7 +28,7 @@ pub struct OurLog { /// Trace flag bitfield. 
#[metastructure(required = false)] - pub trace_flags: Annotated, + pub trace_flags: Annotated, /// This is the original string representation of the severity as it is known at the source #[metastructure(required = false, max_chars = 32, pii = "true", trim = false)] diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs index 9662a822a11..189f6649658 100644 --- a/relay-ourlogs/src/ourlog.rs +++ b/relay-ourlogs/src/ourlog.rs @@ -61,7 +61,7 @@ pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { observed_timestamp_nanos: Annotated::new(otel_log.observed_time_unix_nano), trace_id: TraceId(trace_id).into(), span_id: Annotated::new(SpanId(span_id)), - trace_flags: Annotated::new(0.0), + trace_flags: Annotated::new(0), severity_text: severity_text.into(), severity_number: Annotated::new(severity_number as i64), attributes: attribute_data.into(), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1067cd1bee6..d0309fcbc87 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1407,7 +1407,7 @@ struct LogKafkaMessage<'a> { #[serde(default, skip_serializing_if = "none_or_empty_object")] attributes: Option<&'a RawValue>, #[serde(default, skip_serializing_if = "Option::is_none")] - flags: Option, + trace_flags: Option, } fn none_or_empty_object(value: &Option<&RawValue>) -> bool { diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 2a1b19b849f..f2d0a3faa90 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -520,16 +520,14 @@ def get_ourlog(self): assert message is not None assert message.error() is None - message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) - return json.loads(message_dict["payload"].decode("utf8")), message_dict + return json.loads(message.value()) def get_ourlogs(self): ourlogs = [] for message in self.poll_many(): assert message is not None assert message.error() is None - message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) - ourlogs.append(json.loads(message_dict["payload"].decode("utf8"))) + ourlogs.append(json.loads(message.value())) return ourlogs diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py index 6652f9bb486..e373898f643 100644 --- a/tests/integration/test_ourlogs.py +++ b/tests/integration/test_ourlogs.py @@ -72,12 +72,15 @@ def test_ourlog_extraction( ourlogs = ourlogs_consumer.get_ourlogs() assert len(ourlogs) == 1 - expected_0 = { + expected = { + "organization_id": 1, + "project_id": 42, + "retention_days": 90, "timestamp_nanos": int(start.timestamp() * 1e9), "observed_timestamp_nanos": int(end.timestamp() * 1e9), "trace_id": "5b8efff798038103d269b633813fc60c", "body": "Example log record", - "trace_flags": 0.0, + "trace_flags": 0, "span_id": "eee19b7ec3c1b174", "severity_text": "Information", "severity_number": 10, @@ -88,7 +91,9 @@ def test_ourlog_extraction( "double.attribute": {"double_value": 637.704}, }, } - assert ourlogs[0] == expected_0 + + del ourlogs[0]["received"] + assert ourlogs[0] == expected ourlogs_consumer.assert_empty() From 756d605e295b885fec9a082e53c3ba59bcfa7280 Mon Sep 17 00:00:00 2001 From: Kev <6111995+k-fish@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:06:43 -0500 Subject: [PATCH 6/6] Update relay-server/src/services/store.rs Co-authored-by: Joris Bayer --- relay-server/src/services/store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index d0309fcbc87..e0f6fa8a9d7 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1400,8 +1400,8 @@ struct LogKafkaMessage<'a> { trace_id: EventId, #[serde(default, skip_serializing_if = "Option::is_none")] span_id: Option<&'a str>, - #[serde(default, skip_serializing_if = "Option::is_none")] - severity_text: Option<&'a str>, + #[serde(borrow, default, skip_serializing_if = "Option::is_none")] + severity_text: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] severity_number: Option, #[serde(default, skip_serializing_if = "none_or_empty_object")]
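Usage note (illustrative sketch, not part of the commits above): the integration tests in this series only exercise the `otel_log` item type, while the processor also accepts a Sentry-native `log` item whose payload is parsed as the flat OurLog JSON shown in `test_ourlog_serialization`. A minimal sketch of building such an envelope with the same `sentry_sdk.envelope` helpers the tests already use — the helper name and the concrete payload values are assumptions for illustration only:

    import json
    from sentry_sdk.envelope import Envelope, Item, PayloadRef

    def envelope_with_sentry_log(timestamp_nanos: int) -> Envelope:
        # Sketch only: assumes the `log` item carries the flat OurLog JSON
        # shape from relay-event-schema's test_ourlog_serialization.
        envelope = Envelope()
        envelope.add_item(
            Item(
                type="log",
                payload=PayloadRef(
                    bytes=json.dumps(
                        {
                            "timestamp_nanos": timestamp_nanos,
                            "observed_timestamp_nanos": timestamp_nanos,
                            "trace_id": "5b8efff798038103d269b633813fc60c",
                            "span_id": "eee19b7ec3c1b174",
                            "severity_text": "Information",
                            "severity_number": 10,
                            "body": "Example log record",
                            "attributes": {
                                "string.attribute": {"string_value": "some string"}
                            },
                        }
                    ).encode()
                ),
            )
        )
        return envelope

As with `otel_log`, Relay only keeps these items when the project has the `organizations:ourlogs-ingestion` feature enabled; otherwise `ourlog::filter` drops them silently.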