From 8d47ad5962ebf38e073e2ed339981a10e7f30be5 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 21 Jan 2025 13:37:54 +0100 Subject: [PATCH] Revert "feat(ourlogs): Allow log ingestion behind a flag" (#4463) Relay crashed because of a missing Kafka topic. --- CHANGELOG.md | 1 - Cargo.lock | 21 +- Cargo.toml | 3 +- py/sentry_relay/consts.py | 2 +- relay-cabi/include/relay.h | 4 +- relay-cogs/src/lib.rs | 3 - relay-config/src/config.rs | 8 - relay-dynamic-config/src/feature.rs | 6 +- relay-event-schema/src/processor/attrs.rs | 2 - relay-event-schema/src/processor/traits.rs | 2 - relay-event-schema/src/protocol/mod.rs | 2 - relay-event-schema/src/protocol/ourlog.rs | 243 ------------------ relay-kafka/src/config.rs | 6 +- relay-ourlogs/Cargo.toml | 29 --- relay-ourlogs/src/lib.rs | 13 - relay-ourlogs/src/ourlog.rs | 204 --------------- relay-pii/src/selector.rs | 1 - relay-server/Cargo.toml | 63 +++-- relay-server/src/envelope.rs | 15 -- relay-server/src/services/outcome.rs | 4 - relay-server/src/services/processor.rs | 48 ---- relay-server/src/services/processor/event.rs | 2 - relay-server/src/services/processor/ourlog.rs | 105 -------- relay-server/src/services/store.rs | 72 ------ relay-server/src/utils/managed_envelope.rs | 15 -- relay-server/src/utils/rate_limits.rs | 74 ------ relay-server/src/utils/sizes.rs | 3 - requirements-dev.txt | 4 +- tests/integration/conftest.py | 1 - tests/integration/fixtures/processing.py | 25 -- tests/integration/test_ourlogs.py | 118 --------- 31 files changed, 41 insertions(+), 1058 deletions(-) delete mode 100644 relay-event-schema/src/protocol/ourlog.rs delete mode 100644 relay-ourlogs/Cargo.toml delete mode 100644 relay-ourlogs/src/lib.rs delete mode 100644 relay-ourlogs/src/ourlog.rs delete mode 100644 relay-server/src/services/processor/ourlog.rs delete mode 100644 tests/integration/test_ourlogs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a939124030..ed236fb2a33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,6 @@ **Internal**: - Updates performance score calculation on spans and events to also store cdf values as measurements. ([#4438](https://github.com/getsentry/relay/pull/4438)) -- Allow log ingestion behind a flag, only for internal use currently. 
([#4448](https://github.com/getsentry/relay/pull/4448)) ## 24.12.2 diff --git a/Cargo.lock b/Cargo.lock index 5cf8c455cb0..5802be6b590 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2844,7 +2844,6 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand", - "serde_json", "thiserror", ] @@ -3758,20 +3757,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "relay-ourlogs" -version = "24.12.1" -dependencies = [ - "chrono", - "hex", - "insta", - "once_cell", - "opentelemetry-proto", - "relay-event-schema", - "relay-protocol", - "serde_json", -] - [[package]] name = "relay-pattern" version = "25.1.0" @@ -3989,7 +3974,6 @@ dependencies = [ "relay-log", "relay-metrics", "relay-monitors", - "relay-ourlogs", "relay-pii", "relay-profiling", "relay-protocol", @@ -4467,16 +4451,15 @@ dependencies = [ [[package]] name = "sentry-kafka-schemas" -version = "0.1.129" +version = "0.1.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "790627715d4ea0e58e252dcb657a44146fde401b5520bbbc3b6500764ef71c86" +checksum = "6636bbc9fda2c104d326386bf8fdcc36d4031bca525a74a970ad8bbecb7570d2" dependencies = [ "jsonschema", "serde", "serde_json", "serde_yaml", "thiserror", - "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a5bd8f4454b..356c267556c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ relay-kafka = { path = "relay-kafka" } relay-log = { path = "relay-log" } relay-metrics = { path = "relay-metrics" } relay-monitors = { path = "relay-monitors" } -relay-ourlogs = { path = "relay-ourlogs" } relay-pattern = { path = "relay-pattern" } relay-pii = { path = "relay-pii" } relay-profiling = { path = "relay-profiling" } @@ -151,7 +150,7 @@ reqwest = "0.12.9" rmp-serde = "1.3.0" sentry = "0.34.0" sentry-core = "0.34.0" -sentry-kafka-schemas = { version = "0.1.129", default-features = false } +sentry-kafka-schemas = { version = "0.1.122", default-features = false } sentry-release-parser = { version = "1.3.2", default-features = false } sentry-types = "0.34.0" semver = "1.0.23" diff --git a/py/sentry_relay/consts.py b/py/sentry_relay/consts.py index b841567ac3d..825f7e0a119 100644 --- a/py/sentry_relay/consts.py +++ b/py/sentry_relay/consts.py @@ -8,7 +8,7 @@ class DataCategory(IntEnum): - # start generated + # begin generated DEFAULT = 0 ERROR = 1 TRANSACTION = 2 diff --git a/relay-cabi/include/relay.h b/relay-cabi/include/relay.h index a191829ed6a..db50243b8f0 100644 --- a/relay-cabi/include/relay.h +++ b/relay-cabi/include/relay.h @@ -143,14 +143,14 @@ enum RelayDataCategory { */ RELAY_DATA_CATEGORY_ATTACHMENT_ITEM = 22, /** - * LogItem + * LogCount * * This is the category for logs for which we store the count log events for users for measuring * missing breadcrumbs, and count of logs for rate limiting purposes. */ RELAY_DATA_CATEGORY_LOG_ITEM = 23, /** - * LogByte + * LogBytes * * This is the category for logs for which we store log event total bytes for users. */ diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index d29f8fa392b..46c13bf46b1 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -117,8 +117,6 @@ pub enum AppFeature { Transactions, /// Errors. Errors, - /// Logs. - Logs, /// Spans. Spans, /// Sessions. 
@@ -161,7 +159,6 @@ impl AppFeature { Self::Transactions => "transactions", Self::Errors => "errors", Self::Spans => "spans", - Self::Logs => "our_logs", Self::Sessions => "sessions", Self::ClientReports => "client_reports", Self::CheckIns => "check_ins", diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index fa41ef8b1dd..93c8b152a28 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -617,8 +617,6 @@ pub struct Limits { /// The maximum payload size for a profile pub max_profile_size: ByteSize, /// The maximum payload size for a span. - pub max_log_size: ByteSize, - /// The maximum payload size for a span. pub max_span_size: ByteSize, /// The maximum payload size for a statsd metric. pub max_statsd_size: ByteSize, @@ -685,7 +683,6 @@ impl Default for Limits { max_api_file_upload_size: ByteSize::mebibytes(40), max_api_chunk_upload_size: ByteSize::mebibytes(100), max_profile_size: ByteSize::mebibytes(50), - max_log_size: ByteSize::mebibytes(1), max_span_size: ByteSize::mebibytes(1), max_statsd_size: ByteSize::mebibytes(1), max_metric_buckets_size: ByteSize::mebibytes(1), @@ -2216,11 +2213,6 @@ impl Config { self.values.limits.max_check_in_size.as_bytes() } - /// Returns the maximum payload size of a log in bytes. - pub fn max_log_size(&self) -> usize { - self.values.limits.max_log_size.as_bytes() - } - /// Returns the maximum payload size of a span in bytes. pub fn max_span_size(&self) -> usize { self.values.limits.max_span_size.as_bytes() diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 8144aa9fd6f..2609729e3d2 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -102,11 +102,7 @@ pub enum Feature { /// Serialized as `organizations:ingest-spans-in-eap` #[serde(rename = "organizations:ingest-spans-in-eap")] IngestSpansInEap, - /// Enable log ingestion for our log product (this is not internal logging). - /// - /// Serialized as `organizations:ourlogs-ingestion`. - #[serde(rename = "organizations:ourlogs-ingestion")] - OurLogsIngestion, + /// This feature has graduated and is hard-coded for external Relays. 
#[doc(hidden)] #[serde(rename = "projects:profiling-ingest-unsampled-profiles")] diff --git a/relay-event-schema/src/processor/attrs.rs b/relay-event-schema/src/processor/attrs.rs index 563b1e65ace..b8772a09dcf 100644 --- a/relay-event-schema/src/processor/attrs.rs +++ b/relay-event-schema/src/processor/attrs.rs @@ -46,7 +46,6 @@ pub enum ValueType { Message, Thread, Breadcrumb, - OurLog, Span, ClientSdkInfo, @@ -85,7 +84,6 @@ relay_common::derive_fromstr_and_display!(ValueType, UnknownValueTypeError, { ValueType::Message => "message", ValueType::Thread => "thread", ValueType::Breadcrumb => "breadcrumb", - ValueType::OurLog => "ourlog", ValueType::Span => "span", ValueType::ClientSdkInfo => "sdk", ValueType::Minidump => "minidump", diff --git a/relay-event-schema/src/processor/traits.rs b/relay-event-schema/src/processor/traits.rs index 9c5491872d4..ac1a78728d7 100644 --- a/relay-event-schema/src/processor/traits.rs +++ b/relay-event-schema/src/processor/traits.rs @@ -108,12 +108,10 @@ pub trait Processor: Sized { process_method!(process_breadcrumb, crate::protocol::Breadcrumb); process_method!(process_template_info, crate::protocol::TemplateInfo); process_method!(process_header_name, crate::protocol::HeaderName); - process_method!(process_ourlog, crate::protocol::OurLog); process_method!(process_span, crate::protocol::Span); process_method!(process_trace_context, crate::protocol::TraceContext); process_method!(process_native_image_path, crate::protocol::NativeImagePath); process_method!(process_contexts, crate::protocol::Contexts); - process_method!(process_attribute_value, crate::protocol::AttributeValue); fn process_other( &mut self, diff --git a/relay-event-schema/src/protocol/mod.rs b/relay-event-schema/src/protocol/mod.rs index fc1c9307787..7827447ce58 100644 --- a/relay-event-schema/src/protocol/mod.rs +++ b/relay-event-schema/src/protocol/mod.rs @@ -18,7 +18,6 @@ mod mechanism; mod metrics; mod metrics_summary; mod nel; -mod ourlog; mod relay_info; mod replay; mod request; @@ -55,7 +54,6 @@ pub use self::mechanism::*; pub use self::metrics::*; pub use self::metrics_summary::*; pub use self::nel::*; -pub use self::ourlog::*; pub use self::relay_info::*; pub use self::replay::*; pub use self::request::*; diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs deleted file mode 100644 index e4eea9882ff..00000000000 --- a/relay-event-schema/src/protocol/ourlog.rs +++ /dev/null @@ -1,243 +0,0 @@ -use relay_protocol::{ - Annotated, Empty, Error, FromValue, IntoValue, Object, SkipSerialization, Value, -}; - -use serde::ser::SerializeMap; - -use crate::processor::ProcessValue; -use crate::protocol::{SpanId, TraceId}; - -#[derive(Clone, Debug, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)] -#[metastructure(process_func = "process_ourlog", value_type = "OurLog")] -pub struct OurLog { - /// Time when the event occurred. - #[metastructure(required = true, trim = false)] - pub timestamp_nanos: Annotated<u64>, - - /// Time when the event was observed. - #[metastructure(required = true, trim = false)] - pub observed_timestamp_nanos: Annotated<u64>, - - /// The ID of the trace the log belongs to. - #[metastructure(required = false, trim = false)] - pub trace_id: Annotated<TraceId>, - /// The Span id. - /// - #[metastructure(required = false, trim = false)] - pub span_id: Annotated<SpanId>, - - /// Trace flag bitfield. 
- #[metastructure(required = false)] - pub trace_flags: Annotated<f64>, - - /// This is the original string representation of the severity as it is known at the source - #[metastructure(required = false, max_chars = 32, pii = "true", trim = false)] - pub severity_text: Annotated<String>, - - /// Numerical representation of the severity level - #[metastructure(required = false)] - pub severity_number: Annotated<i64>, - - /// Log body. - #[metastructure(required = true, pii = "true", trim = false)] - pub body: Annotated<String>, - - /// Arbitrary attributes on a log. - #[metastructure(pii = "true", trim = false)] - pub attributes: Annotated<Object<AttributeValue>>, - - /// Additional arbitrary fields for forwards compatibility. - #[metastructure(additional_properties, retain = true, pii = "maybe", trim = false)] - pub other: Object<Value>, -} - -#[derive(Debug, Clone, PartialEq, ProcessValue)] -pub enum AttributeValue { - #[metastructure(field = "string_value", pii = "true")] - StringValue(String), - #[metastructure(field = "int_value", pii = "true")] - IntValue(i64), - #[metastructure(field = "double_value", pii = "true")] - DoubleValue(f64), - #[metastructure(field = "bool_value", pii = "true")] - BoolValue(bool), - /// Any other unknown attribute value. - /// - /// This exists to ensure other attribute values such as array and object can be added in the future. - Unknown(String), -} - -impl IntoValue for AttributeValue { - fn into_value(self) -> Value { - let mut map = Object::new(); - match self { - AttributeValue::StringValue(v) => { - map.insert("string_value".to_string(), Annotated::new(Value::String(v))); - } - AttributeValue::IntValue(v) => { - map.insert("int_value".to_string(), Annotated::new(Value::I64(v))); - } - AttributeValue::DoubleValue(v) => { - map.insert("double_value".to_string(), Annotated::new(Value::F64(v))); - } - AttributeValue::BoolValue(v) => { - map.insert("bool_value".to_string(), Annotated::new(Value::Bool(v))); - } - AttributeValue::Unknown(v) => { - map.insert("unknown".to_string(), Annotated::new(Value::String(v))); - } - } - Value::Object(map) - } - - fn serialize_payload<S>(&self, s: S, _behavior: SkipSerialization) -> Result<S::Ok, S::Error> - where - Self: Sized, - S: serde::Serializer, - { - let mut map = s.serialize_map(None)?; - match self { - AttributeValue::StringValue(v) => { - map.serialize_entry("string_value", v)?; - } - AttributeValue::IntValue(v) => { - map.serialize_entry("int_value", v)?; - } - AttributeValue::DoubleValue(v) => { - map.serialize_entry("double_value", v)?; - } - AttributeValue::BoolValue(v) => { - map.serialize_entry("bool_value", v)?; - } - AttributeValue::Unknown(v) => { - map.serialize_entry("unknown", v)?; - } - } - map.end() - } -} - -impl AttributeValue { - pub fn string_value(&self) -> Option<&String> { - match self { - AttributeValue::StringValue(s) => Some(s), - _ => None, - } - } - pub fn int_value(&self) -> Option<i64> { - match self { - AttributeValue::IntValue(i) => Some(*i), - _ => None, - } - } - pub fn double_value(&self) -> Option<f64> { - match self { - AttributeValue::DoubleValue(d) => Some(*d), - _ => None, - } - } - pub fn bool_value(&self) -> Option<bool> { - match self { - AttributeValue::BoolValue(b) => Some(*b), - _ => None, - } - } -} - -impl Empty for AttributeValue { - #[inline] - fn is_empty(&self) -> bool { - matches!(self, Self::Unknown(_)) - } -} - -impl FromValue for AttributeValue { - fn from_value(value: Annotated<Value>) -> Annotated<Self> { - match value { - Annotated(Some(Value::String(value)), meta) => { - Annotated(Some(AttributeValue::StringValue(value)), meta) - } - 
Annotated(Some(Value::I64(value)), meta) => { - Annotated(Some(AttributeValue::IntValue(value)), meta) - } - Annotated(Some(Value::F64(value)), meta) => { - Annotated(Some(AttributeValue::DoubleValue(value)), meta) - } - Annotated(Some(Value::Bool(value)), meta) => { - Annotated(Some(AttributeValue::BoolValue(value)), meta) - } - Annotated(Some(value), mut meta) => { - meta.add_error(Error::expected( - "a valid attribute value (string, int, double, bool)", - )); - meta.set_original_value(Some(value)); - Annotated(None, meta) - } - Annotated(None, meta) => Annotated(None, meta), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_ourlog_serialization() { - let json = r#"{ - "timestamp_nanos": 1544712660300000000, - "observed_timestamp_nanos": 1544712660300000000, - "trace_id": "5b8efff798038103d269b633813fc60c", - "span_id": "eee19b7ec3c1b174", - "severity_text": "Information", - "severity_number": 10, - "body": "Example log record", - "attributes": { - "boolean.attribute": { - "bool_value": true - }, - "double.attribute": { - "double_value": 637.704 - }, - "int.attribute": { - "int_value": 10 - }, - "string.attribute": { - "string_value": "some string" - } - } -}"#; - - let mut attributes = Object::new(); - attributes.insert( - "string.attribute".into(), - Annotated::new(AttributeValue::StringValue("some string".into())), - ); - attributes.insert( - "boolean.attribute".into(), - Annotated::new(AttributeValue::BoolValue(true)), - ); - attributes.insert( - "int.attribute".into(), - Annotated::new(AttributeValue::IntValue(10)), - ); - attributes.insert( - "double.attribute".into(), - Annotated::new(AttributeValue::DoubleValue(637.704)), - ); - - let log = Annotated::new(OurLog { - timestamp_nanos: Annotated::new(1544712660300000000), - observed_timestamp_nanos: Annotated::new(1544712660300000000), - severity_number: Annotated::new(10), - severity_text: Annotated::new("Information".to_string()), - trace_id: Annotated::new(TraceId("5b8efff798038103d269b633813fc60c".into())), - span_id: Annotated::new(SpanId("eee19b7ec3c1b174".into())), - body: Annotated::new("Example log record".to_string()), - attributes: Annotated::new(attributes), - ..Default::default() - }); - - assert_eq!(json, log.to_json_pretty().unwrap()); - } -} diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index faaa2b69a00..7b9e8411241 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -45,8 +45,6 @@ pub enum KafkaTopic { ReplayRecordings, /// Monitor check-ins. Monitors, - /// Logs (our log product). - OurLogs, /// Standalone spans without a transaction. Spans, /// Feedback events topic. @@ -58,7 +56,7 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 14] = [ + static TOPICS: [KafkaTopic; 13] = [ Events, Attachments, Transactions, @@ -70,7 +68,6 @@ impl KafkaTopic { ReplayEvents, ReplayRecordings, Monitors, - OurLogs, Spans, Feedback, ]; @@ -131,7 +128,6 @@ define_topic_assignments! 
{ profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"), replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."), replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."), - ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."), monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."), spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."), feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."), diff --git a/relay-ourlogs/Cargo.toml b/relay-ourlogs/Cargo.toml deleted file mode 100644 index c132967b924..00000000000 --- a/relay-ourlogs/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "relay-ourlogs" -authors = ["Sentry <oss@sentry.io>"] -description = "Log normalization and processing" -homepage = "https://getsentry.github.io/relay/" -repository = "https://github.com/getsentry/relay" -version = "24.12.1" -edition = "2021" -license-file = "../LICENSE" -publish = false - -[lints] -workspace = true - -[dependencies] -chrono = { workspace = true } -hex = { workspace = true } -once_cell = { workspace = true } -opentelemetry-proto = { workspace = true, features = [ - "gen-tonic", - "with-serde", - "logs", -] } -relay-event-schema = { workspace = true } -relay-protocol = { workspace = true } -serde_json = { workspace = true } - -[dev-dependencies] -insta = { workspace = true } diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs deleted file mode 100644 index 38130a4a4c6..00000000000 --- a/relay-ourlogs/src/lib.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! Structs and functions needed to ingest OpenTelemetry logs. - -#![warn(missing_docs)] -#![doc( - html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png", - html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" -)] - -pub use crate::ourlog::otel_to_sentry_log; - -pub use opentelemetry_proto::tonic::logs::v1::LogRecord as OtelLog; - -mod ourlog; diff --git a/relay-ourlogs/src/ourlog.rs b/relay-ourlogs/src/ourlog.rs deleted file mode 100644 index 9662a822a11..00000000000 --- a/relay-ourlogs/src/ourlog.rs +++ /dev/null @@ -1,204 +0,0 @@ -use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; - -use crate::OtelLog; -use relay_event_schema::protocol::{AttributeValue, OurLog, SpanId, TraceId}; -use relay_protocol::{Annotated, Object}; - -/// Transform an OtelLog to a Sentry log. -pub fn otel_to_sentry_log(otel_log: OtelLog) -> OurLog { - let OtelLog { - severity_number, - severity_text, - body, - attributes, - trace_id, - span_id, - .. 
} = otel_log; - - let span_id = hex::encode(span_id); - let trace_id = hex::encode(trace_id); - - let body = body - .and_then(|v| v.value) - .and_then(|v| match v { - OtelValue::StringValue(s) => Some(s), - _ => None, - }) - .unwrap_or_else(String::new); - - let mut attribute_data = Object::new(); - - for attribute in attributes.into_iter() { - if let Some(value) = attribute.value.and_then(|v| v.value) { - let key = attribute.key; - match value { - OtelValue::ArrayValue(_) => {} - OtelValue::BoolValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::BoolValue(v))); - } - OtelValue::BytesValue(v) => { - if let Ok(v) = String::from_utf8(v) { - attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); - } - } - OtelValue::DoubleValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::DoubleValue(v))); - } - OtelValue::IntValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::IntValue(v))); - } - OtelValue::KvlistValue(_) => {} - OtelValue::StringValue(v) => { - attribute_data.insert(key, Annotated::new(AttributeValue::StringValue(v))); - } - } - } - } - - OurLog { - timestamp_nanos: Annotated::new(otel_log.time_unix_nano), - observed_timestamp_nanos: Annotated::new(otel_log.observed_time_unix_nano), - trace_id: TraceId(trace_id).into(), - span_id: Annotated::new(SpanId(span_id)), - trace_flags: Annotated::new(0.0), - severity_text: severity_text.into(), - severity_number: Annotated::new(severity_number as i64), - attributes: attribute_data.into(), - body: Annotated::new(body), - ..Default::default() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use relay_protocol::{get_path, get_value}; - - #[test] - fn parse_log() { - // https://github.com/open-telemetry/opentelemetry-proto/blob/c4214b8168d0ce2a5236185efb8a1c8950cccdd6/examples/logs.json - let json = r#"{ - "timeUnixNano": "1544712660300000000", - "observedTimeUnixNano": "1544712660300000000", - "severityNumber": 10, - "severityText": "Information", - "traceId": "5B8EFFF798038103D269B633813FC60C", - "spanId": "EEE19B7EC3C1B174", - "body": { - "stringValue": "Example log record" - }, - "attributes": [ - { - "key": "string.attribute", - "value": { - "stringValue": "some string" - } - }, - { - "key": "boolean.attribute", - "value": { - "boolValue": true - } - }, - { - "key": "int.attribute", - "value": { - "intValue": "10" - } - }, - { - "key": "double.attribute", - "value": { - "doubleValue": 637.704 - } - }, - { - "key": "array.attribute", - "value": { - "arrayValue": { - "values": [ - { - "stringValue": "many" - }, - { - "stringValue": "values" - } - ] - } - } - }, - { - "key": "map.attribute", - "value": { - "kvlistValue": { - "values": [ - { - "key": "some.map.key", - "value": { - "stringValue": "some value" - } - } - ] - } - } - } - ] - }"#; - - let otel_log: OtelLog = serde_json::from_str(json).unwrap(); - let our_log: OurLog = otel_to_sentry_log(otel_log); - let annotated_log: Annotated<OurLog> = Annotated::new(our_log); - assert_eq!( - get_path!(annotated_log.body), - Some(&Annotated::new("Example log record".into())) - ); - } - - #[test] - fn parse_log_with_db_attributes() { - let json = r#"{ - "timeUnixNano": "1544712660300000000", - "observedTimeUnixNano": "1544712660300000000", - "severityNumber": 10, - "severityText": "Information", - "traceId": "5B8EFFF798038103D269B633813FC60C", - "spanId": "EEE19B7EC3C1B174", - "body": { - "stringValue": "Database query executed" - }, - "attributes": [ - { - "key": "db.name", - "value": { - "stringValue": "database" - } 
- }, - { - "key": "db.type", - "value": { - "stringValue": "sql" - } - }, - { - "key": "db.statement", - "value": { - "stringValue": "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s" - } - } - ] - }"#; - let otel_log: OtelLog = serde_json::from_str(json).unwrap(); - let our_log = otel_to_sentry_log(otel_log); - let annotated_log: Annotated<OurLog> = Annotated::new(our_log); - - assert_eq!( - get_path!(annotated_log.body), - Some(&Annotated::new("Database query executed".into())) - ); - assert_eq!( - get_value!(annotated_log.attributes["db.statement"]!).string_value(), - Some(&"SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s".into()) - ); - } -} diff --git a/relay-pii/src/selector.rs b/relay-pii/src/selector.rs index 880a71044c4..c0481107bf2 100644 --- a/relay-pii/src/selector.rs +++ b/relay-pii/src/selector.rs @@ -135,7 +135,6 @@ impl SelectorPathItem { | ValueType::Message | ValueType::Thread | ValueType::Breadcrumb - | ValueType::OurLog | ValueType::Span | ValueType::Minidump | ValueType::HeapMemory diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 89bbd03b9e5..ea6aa647688 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -13,16 +13,16 @@ publish = false [features] default = [] processing = [ - "dep:minidump", - "dep:symbolic-common", - "dep:symbolic-unreal", - "relay-cardinality/redis", - "relay-config/processing", - "relay-kafka/producer", - "relay-metrics/redis", - "relay-quotas/redis", - "relay-redis/impl", - "relay-sampling/redis", + "dep:minidump", + "dep:symbolic-common", + "dep:symbolic-unreal", + "relay-cardinality/redis", + "relay-config/processing", + "relay-kafka/producer", + "relay-metrics/redis", + "relay-quotas/redis", + "relay-redis/impl", + "relay-sampling/redis", ] [lints] @@ -75,7 +75,6 @@ relay-event-schema = { workspace = true } relay-filter = { workspace = true } relay-kafka = { workspace = true, optional = true } relay-log = { workspace = true, features = ["sentry"] } -relay-ourlogs = { workspace = true } relay-metrics = { workspace = true } relay-monitors = { workspace = true } relay-pii = { workspace = true } @@ -89,10 +88,10 @@ relay-spans = { workspace = true } relay-statsd = { workspace = true } relay-system = { workspace = true } reqwest = { workspace = true, features = [ - "gzip", - "hickory-dns", - "stream", - "native-tls-vendored", + "gzip", + "hickory-dns", + "stream", + "native-tls-vendored", ] } rmp-serde = { workspace = true } serde = { workspace = true } @@ -101,14 +100,14 @@ serde_json = { workspace = true } smallvec = { workspace = true, features = ["drain_filter"] } socket2 = { workspace = true } sqlx = { workspace = true, features = [ - "macros", - "migrate", - "sqlite", - "runtime-tokio", + "macros", + "migrate", + "sqlite", + "runtime-tokio", ], default-features = false } symbolic-common = { workspace = true, optional = true, default-features = false } symbolic-unreal = { workspace = true, optional = true, default-features = false, features = [ - "serde", + "serde", ] } sysinfo = { workspace = true } thiserror = { workspace = true } @@ -116,18 +115,18 @@ tokio = { workspace = true, features = ["sync", "time"] } tokio-util = { workspace = true, default-features = false } tower = { workspace = true, default-features = false, features = ["limit"] } tower-http = { workspace = true, default-features = false, features = [ - "catch-panic", - "compression-br", - "compression-deflate", - "compression-gzip", - "compression-zstd", - "cors", - "decompression-br", - 
"decompression-deflate", - "decompression-gzip", - "decompression-zstd", - "set-header", - "trace", + "catch-panic", + "compression-br", + "compression-deflate", + "compression-gzip", + "compression-zstd", + "cors", + "decompression-br", + "decompression-deflate", + "decompression-gzip", + "decompression-zstd", + "set-header", + "trace", ] } url = { workspace = true, features = ["serde"] } uuid = { workspace = true, features = ["v5"] } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 6a3d6101eea..e29444199c5 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -117,10 +117,6 @@ pub enum ItemType { ReplayVideo, /// Monitor check-in encoded as JSON. CheckIn, - /// A log from the [OTEL Log format](https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition) - OtelLog, - /// A log for the log product, not internal logs. - Log, /// A standalone span. Span, /// A standalone OpenTelemetry span serialized as JSON. @@ -178,8 +174,6 @@ impl ItemType { Self::ReplayRecording => "replay_recording", Self::ReplayVideo => "replay_video", Self::CheckIn => "check_in", - Self::Log => "log", - Self::OtelLog => "otel_log", Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", @@ -233,8 +227,6 @@ impl std::str::FromStr for ItemType { "replay_recording" => Self::ReplayRecording, "replay_video" => Self::ReplayVideo, "check_in" => Self::CheckIn, - "log" => Self::Log, - "otel_log" => Self::OtelLog, "span" => Self::Span, "otel_span" => Self::OtelSpan, "otel_traces_data" => Self::OtelTracesData, @@ -704,10 +696,6 @@ impl Item { CountFor::Outcomes => smallvec![], }, ItemType::Statsd | ItemType::MetricBuckets => smallvec![], - ItemType::Log | ItemType::OtelLog => smallvec![ - (DataCategory::LogByte, self.len().max(1)), - (DataCategory::LogItem, 1) - ], ItemType::FormData => smallvec![], ItemType::UserReport => smallvec![], ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)], @@ -964,8 +952,6 @@ impl Item { | ItemType::Profile | ItemType::CheckIn | ItemType::Span - | ItemType::Log - | ItemType::OtelLog | ItemType::OtelSpan | ItemType::OtelTracesData | ItemType::ProfileChunk => false, @@ -1000,7 +986,6 @@ impl Item { ItemType::Profile => true, ItemType::CheckIn => false, ItemType::Span => false, - ItemType::Log | ItemType::OtelLog => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, ItemType::ProfileChunk => false, diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 6b869fe8197..d11f1f9a97c 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -457,9 +457,6 @@ pub enum DiscardReason { /// (Relay) Profiling related discard reasons Profiling(&'static str), - /// (Relay) A log that is not valid after normalization. - InvalidLog, - /// (Relay) A span is not valid after normalization. 
InvalidSpan, @@ -509,7 +506,6 @@ impl DiscardReason { DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording", DiscardReason::InvalidReplayVideoEvent => "invalid_replay_video", DiscardReason::Profiling(reason) => reason, - DiscardReason::InvalidLog => "invalid_log", DiscardReason::InvalidSpan => "invalid_span", DiscardReason::FeatureDisabled(_) => "feature_disabled", } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 9418e9065c6..3417f5ed08a 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -86,7 +86,6 @@ mod attachment; mod dynamic_sampling; mod event; mod metrics; -mod ourlog; mod profile; mod profile_chunk; mod replay; @@ -194,7 +193,6 @@ processing_group!(StandaloneGroup, Standalone); processing_group!(ClientReportGroup, ClientReport); processing_group!(ReplayGroup, Replay); processing_group!(CheckInGroup, CheckIn); -processing_group!(LogGroup, Log); processing_group!(SpanGroup, Span); impl Sampling for SpanGroup { @@ -242,8 +240,6 @@ pub enum ProcessingGroup { Replay, /// Crons. CheckIn, - /// Logs. - Log, /// Spans. Span, /// Metrics. @@ -307,7 +303,6 @@ impl ProcessingGroup { &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData ) }); - if !span_items.is_empty() { grouped_envelopes.push(( ProcessingGroup::Span, )) } - // Extract logs. - let logs_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLog)); - - if !logs_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::Log, - Envelope::from_parts(headers.clone(), logs_items), - )) - } - // Extract all metric items. // // Note: Should only be relevant in proxy mode. In other modes we send metrics through @@ -427,7 +411,6 @@ impl ProcessingGroup { ProcessingGroup::ClientReport => "client_report", ProcessingGroup::Replay => "replay", ProcessingGroup::CheckIn => "check_in", - ProcessingGroup::Log => "log", ProcessingGroup::Span => "span", ProcessingGroup::Metrics => "metrics", ProcessingGroup::ProfileChunk => "profile_chunk", @@ -447,7 +430,6 @@ impl From<ProcessingGroup> for AppFeature { ProcessingGroup::Error => AppFeature::Errors, ProcessingGroup::Session => AppFeature::Sessions, ProcessingGroup::Standalone => AppFeature::UnattributedEnvelopes, ProcessingGroup::ClientReport => AppFeature::ClientReports, ProcessingGroup::Replay => AppFeature::Replays, ProcessingGroup::CheckIn => AppFeature::CheckIns, - ProcessingGroup::Log => AppFeature::Logs, ProcessingGroup::Span => AppFeature::Spans, ProcessingGroup::Metrics => AppFeature::UnattributedMetrics, ProcessingGroup::ProfileChunk => AppFeature::Profiles, @@ -2018,35 +2000,6 @@ impl EnvelopeProcessorService { Ok(Some(extracted_metrics)) } - /// Process logs - /// - fn process_logs( - &self, - managed_envelope: &mut TypedEnvelope<LogGroup>, - project_info: Arc<ProjectInfo>, - #[allow(unused_variables)] rate_limits: Arc<RateLimits>, - ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> { - #[allow(unused_mut)] - let mut extracted_metrics = ProcessingExtractedMetrics::new(); - - ourlog::filter( - managed_envelope, - self.inner.config.clone(), - project_info.clone(), - ); - if_processing!(self.inner.config, { - self.enforce_quotas( - managed_envelope, - Annotated::empty(), - &mut extracted_metrics, - project_info.clone(), - rate_limits, - )?; - ourlog::process(managed_envelope, project_info.clone()); - }); - Ok(Some(extracted_metrics)) - } - /// Processes standalone spans. /// /// This function does *not* run for spans extracted from transactions. 
@@ -2202,7 +2155,6 @@ impl EnvelopeProcessorService { ProcessingGroup::CheckIn => { run!(process_checkins, project_id, project_info, rate_limits) } - ProcessingGroup::Log => run!(process_logs, project_info, rate_limits), ProcessingGroup::Span => run!( process_standalone_spans, self.inner.config.clone(), diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 0769e5c8b8f..d6032e3e98c 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -468,8 +468,6 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::ReplayRecording => false, ItemType::ReplayVideo => false, ItemType::CheckIn => false, - ItemType::Log => false, - ItemType::OtelLog => false, ItemType::Span => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs deleted file mode 100644 index d4a6ad8bbfa..00000000000 --- a/relay-server/src/services/processor/ourlog.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! Log processing code. -use std::sync::Arc; - -use crate::services::processor::LogGroup; -use relay_config::Config; -use relay_dynamic_config::Feature; - -use crate::services::processor::should_filter; -use crate::services::projects::project::ProjectInfo; -use crate::utils::{ItemAction, TypedEnvelope}; - -#[cfg(feature = "processing")] -use { - crate::envelope::ContentType, - crate::envelope::{Item, ItemType}, - crate::services::outcome::{DiscardReason, Outcome}, - crate::services::processor::ProcessingError, - relay_dynamic_config::ProjectConfig, - relay_event_schema::processor::{process_value, ProcessingState}, - relay_event_schema::protocol::OurLog, - relay_ourlogs::OtelLog, - relay_pii::PiiProcessor, - relay_protocol::Annotated, -}; - -/// Removes logs from the envelope if the feature is not enabled. -pub fn filter( - managed_envelope: &mut TypedEnvelope<LogGroup>, - config: Arc<Config>, - project_info: Arc<ProjectInfo>, -) { - let logging_disabled = should_filter(&config, &project_info, Feature::OurLogsIngestion); - managed_envelope.retain_items(|_| { - if logging_disabled { - ItemAction::DropSilently - } else { - ItemAction::Keep - } - }); -} - -/// Processes logs. 
-#[cfg(feature = "processing")] -pub fn process(managed_envelope: &mut TypedEnvelope<LogGroup>, project_info: Arc<ProjectInfo>) { - managed_envelope.retain_items(|item| { - let mut annotated_log = match item.ty() { - ItemType::OtelLog => match serde_json::from_slice::<OtelLog>(&item.payload()) { - Ok(otel_log) => Annotated::new(relay_ourlogs::otel_to_sentry_log(otel_log)), - Err(err) => { - relay_log::debug!("failed to parse OTel Log: {}", err); - return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); - } - }, - ItemType::Log => match Annotated::<OurLog>::from_json_bytes(&item.payload()) { - Ok(our_log) => our_log, - Err(err) => { - relay_log::debug!("failed to parse Sentry Log: {}", err); - return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidLog)); - } - }, - - _ => return ItemAction::Keep, - }; - - if let Err(e) = scrub(&mut annotated_log, &project_info.config) { - relay_log::error!("failed to scrub pii from log: {}", e); - return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); - } - - let mut new_item = Item::new(ItemType::Log); - let payload = match annotated_log.to_json() { - Ok(payload) => payload, - Err(err) => { - relay_log::debug!("failed to serialize log: {}", err); - return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); - } - }; - new_item.set_payload(ContentType::Json, payload); - - *item = new_item; - - ItemAction::Keep - }); -} - -#[cfg(feature = "processing")] -fn scrub( - annotated_log: &mut Annotated<OurLog>, - project_config: &ProjectConfig, -) -> Result<(), ProcessingError> { - if let Some(ref config) = project_config.pii_config { - let mut processor = PiiProcessor::new(config.compiled()); - process_value(annotated_log, &mut processor, ProcessingState::root())?; - } - let pii_config = project_config - .datascrubbing_settings - .pii_config() - .map_err(|e| ProcessingError::PiiConfigError(e.clone()))?; - if let Some(config) = pii_config { - let mut processor = PiiProcessor::new(config.compiled()); - process_value(annotated_log, &mut processor, ProcessingState::root())?; - } - - Ok(()) -} diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 6f1ea14d680..68bdee36106 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -290,7 +290,6 @@ impl StoreService { ItemType::Span => { self.produce_span(scoping, received_at, event_id, retention, item)? } - ItemType::Log => self.produce_log(scoping, received_at, retention, item)?, ItemType::ProfileChunk => self.produce_profile_chunk( scoping.organization_id, scoping.project_id, @@ -941,52 +940,6 @@ impl StoreService { scoping, timestamp: received_at, }); - Ok(()) - } - - fn produce_log( - &self, - scoping: Scoping, - received_at: DateTime<Utc>, - retention_days: u16, - item: &Item, - ) -> Result<(), StoreError> { - relay_log::trace!("Producing log"); - let payload = item.payload(); - let payload_len = payload.len(); - - let message = KafkaMessage::Log { - headers: BTreeMap::from([("project_id".to_string(), scoping.project_id.to_string())]), - message: LogKafkaMessage { - payload, - organization_id: scoping.organization_id.value(), - project_id: scoping.project_id.value(), - retention_days, - received: safe_timestamp(received_at), - }, - }; - - self.produce(KafkaTopic::OurLogs, message)?; - - // We need to track the count and bytes separately for possible rate limits and quotas on both counts and bytes. 
- self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogItem, - event_id: None, - outcome: Outcome::Accepted, - quantity: 1, - remote_addr: None, - scoping, - timestamp: received_at, - }); - self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::LogByte, - event_id: None, - outcome: Outcome::Accepted, - quantity: payload_len as u32, - remote_addr: None, - scoping, - timestamp: received_at, - }); Ok(()) } @@ -1353,17 +1306,6 @@ struct SpanKafkaMessage<'a> { platform: Cow<'a, str>, // We only use this for logging for now } -#[derive(Debug, Deserialize, Serialize)] -struct LogKafkaMessage { - /// Raw log payload. - payload: Bytes, - organization_id: u64, - project_id: u64, - /// Number of days until these data should be deleted. - retention_days: u16, - received: u64, -} - fn none_or_empty_object(value: &Option<&RawValue>) -> bool { match value { None => true, @@ -1405,12 +1347,6 @@ enum KafkaMessage<'a> { #[serde(flatten)] message: SpanKafkaMessage<'a>, }, - Log { - #[serde(skip)] - headers: BTreeMap<String, String>, - #[serde(flatten)] - message: LogKafkaMessage, - }, ProfileChunk(ProfileChunkKafkaMessage), } @@ -1433,7 +1369,6 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayEvent(_) => "replay_event", KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked", KafkaMessage::CheckIn(_) => "check_in", - KafkaMessage::Log { .. } => "log", KafkaMessage::Span { .. } => "span", KafkaMessage::ProfileChunk(_) => "profile_chunk", } @@ -1457,7 +1392,6 @@ impl Message for KafkaMessage<'_> { // Random partitioning Self::Profile(_) | Self::Span { .. } - | Self::Log { .. } | Self::ReplayRecordingNotChunked(_) | Self::ProfileChunk(_) => Uuid::nil(), @@ -1486,12 +1420,6 @@ impl Message for KafkaMessage<'_> { } None } - KafkaMessage::Log { headers, .. } => { - if !headers.is_empty() { - return Some(headers); - } - None - } KafkaMessage::Span { headers, .. } => { if !headers.is_empty() { return Some(headers); diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index 0ca927b3d87..ad9c6234643 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -414,21 +414,6 @@ impl ManagedEnvelope { ); } - if self.context.summary.log_item_quantity > 0 { - self.track_outcome( - outcome.clone(), - DataCategory::LogItem, - self.context.summary.log_item_quantity, - ); - } - if self.context.summary.log_byte_quantity > 0 { - self.track_outcome( - outcome.clone(), - DataCategory::LogByte, - self.context.summary.log_byte_quantity, - ); - } - // Track outcomes for attached secondary transactions, e.g. extracted from metrics. // // Primary transaction count is already tracked through the event category diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 54347e7ccf5..c2d1ac98547 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -128,8 +126,6 @@ fn infer_event_category(item: &Item) -> Option<DataCategory> { ItemType::ReplayVideo => None, ItemType::ClientReport => None, ItemType::CheckIn => None, - ItemType::Log => None, - ItemType::OtelLog => None, ItemType::Span => None, ItemType::OtelSpan => None, ItemType::OtelTracesData => None, @@ -166,12 +164,6 @@ pub struct EnvelopeSummary { /// The number of monitor check-ins. pub monitor_quantity: usize, - /// The number of log for the log product sent. 
- pub log_item_quantity: usize, - - /// The number of log bytes for the log product sent, in bytes - pub log_byte_quantity: usize, - /// Secondary number of transactions. /// /// This is 0 for envelopes which contain a transaction, @@ -229,7 +221,6 @@ impl EnvelopeSummary { } summary.payload_size += item.len(); - for (category, quantity) in item.quantities(CountFor::RateLimits) { summary.add_quantity(category, quantity); } @@ -248,8 +239,6 @@ impl EnvelopeSummary { DataCategory::ReplayVideo => &mut self.replay_quantity, DataCategory::Monitor => &mut self.monitor_quantity, DataCategory::Span => &mut self.span_quantity, - DataCategory::LogItem => &mut self.log_item_quantity, - DataCategory::LogByte => &mut self.log_byte_quantity, DataCategory::ProfileChunk => &mut self.profile_chunk_quantity, // TODO: This catch-all return looks dangerous _ => return, @@ -355,10 +344,6 @@ pub struct Enforcement { pub replays: CategoryLimit, /// The combined check-in item rate limit. pub check_ins: CategoryLimit, - /// The combined logs (our product logs) rate limit. - pub log_items: CategoryLimit, - /// The combined logs (our product logs) rate limit. - pub log_bytes: CategoryLimit, /// The combined spans rate limit. pub spans: CategoryLimit, /// The rate limit for the indexed span category. @@ -400,8 +385,6 @@ impl Enforcement { profiles_indexed, replays, check_ins, - log_items, - log_bytes, spans, spans_indexed, user_reports_v2, @@ -417,8 +400,6 @@ impl Enforcement { profiles_indexed, replays, check_ins, - log_items, - log_bytes, spans, spans_indexed, user_reports_v2, @@ -507,9 +488,6 @@ impl Enforcement { ItemType::ReplayVideo => !self.replays.is_active(), ItemType::ReplayRecording => !self.replays.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), - ItemType::OtelLog | ItemType::Log => { - !(self.log_items.is_active() || self.log_bytes.is_active()) - } ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { !self.spans_indexed.is_active() } @@ -722,28 +700,6 @@ where rate_limits.merge(session_limits); } - // Handle logs. - if summary.log_item_quantity > 0 { - let item_scoping = scoping.item(DataCategory::LogItem); - let log_limits = self.check.apply(item_scoping, summary.log_item_quantity)?; - enforcement.log_items = CategoryLimit::new( - DataCategory::LogItem, - summary.log_item_quantity, - log_limits.longest(), - ); - rate_limits.merge(log_limits); - } - if summary.log_byte_quantity > 0 { - let item_scoping = scoping.item(DataCategory::LogByte); - let log_limits = self.check.apply(item_scoping, summary.log_byte_quantity)?; - enforcement.log_bytes = CategoryLimit::new( - DataCategory::LogByte, - summary.log_byte_quantity, - log_limits.longest(), - ); - rate_limits.merge(log_limits); - } - // Handle profiles. 
if enforcement.is_event_active() { enforcement.profiles = enforcement @@ -1656,34 +1612,4 @@ mod tests { assert_eq!(summary.profile_quantity, 2); assert_eq!(summary.secondary_transaction_quantity, 7); } - - #[test] - fn test_enforce_limit_logs_count() { - let mut envelope = envelope![Log, Log]; - - let mut mock = MockLimiter::default().deny(DataCategory::LogItem); - let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); - - assert!(limits.is_limited()); - assert_eq!(envelope.envelope().len(), 0); - mock.assert_call(DataCategory::LogItem, 2); - mock.assert_call(DataCategory::LogByte, 20); - - assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]); - } - - #[test] - fn test_enforce_limit_logs_bytes() { - let mut envelope = envelope![Log, Log]; - - let mut mock = MockLimiter::default().deny(DataCategory::LogByte); - let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None); - - assert!(limits.is_limited()); - assert_eq!(envelope.envelope().len(), 0); - mock.assert_call(DataCategory::LogItem, 2); - mock.assert_call(DataCategory::LogByte, 20); - - assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]); - } } diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 96ec13e3de3..6c03420c2df 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -15,7 +15,6 @@ use crate::utils::{ItemAction, ManagedEnvelope}; /// - `max_attachments_size` /// - `max_check_in_size` /// - `max_event_size` -/// - `max_log_size` /// - `max_metric_buckets_size` /// - `max_profile_size` /// - `max_replay_compressed_size` @@ -62,8 +61,6 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> Resul ItemType::UserReport => NO_LIMIT, ItemType::Statsd => config.max_statsd_size(), ItemType::MetricBuckets => config.max_metric_buckets_size(), - ItemType::Log => config.max_log_size(), - ItemType::OtelLog => config.max_log_size(), ItemType::Span | ItemType::OtelSpan => config.max_span_size(), ItemType::OtelTracesData => config.max_event_size(), // a spans container similar to `Transaction` ItemType::ProfileChunk => config.max_profile_size(), diff --git a/requirements-dev.txt b/requirements-dev.txt index 942dfd3ee4d..a3f517ab67a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,13 +10,13 @@ devservices==1.0.8 flake8==7.0.0 confluent-kafka==2.1.1 flask==3.0.3 -msgpack==1.1.0 +msgpack==1.0.7 opentelemetry-proto==1.22.0 pytest-localserver==0.8.1 pytest-sentry==0.3.0 pytest-xdist==3.5.0 pytest==7.4.3 -PyYAML==6.0.2 +PyYAML==6.0.1 redis==4.5.4 requests==2.32.2 sentry_sdk==2.10.0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a89f638780e..97cad120413 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -34,7 +34,6 @@ replay_events_consumer, monitors_consumer, spans_consumer, - ourlogs_consumer, profiles_consumer, feedback_consumer, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 2a1b19b849f..98efbefee38 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -60,7 +60,6 @@ def inner(options=None): "metrics_generic": metrics_topic, "replay_events": get_topic_name("replay_events"), "replay_recordings": get_topic_name("replay_recordings"), - "ourlogs": get_topic_name("ourlogs"), "monitors": get_topic_name("monitors"), "spans": get_topic_name("spans"), "profiles": get_topic_name("profiles"), @@ -349,11 
+348,6 @@ def spans_consumer(consumer_fixture): yield from consumer_fixture(SpansConsumer, "spans") -@pytest.fixture -def ourlogs_consumer(consumer_fixture): - yield from consumer_fixture(OurLogsConsumer, "ourlogs") - - @pytest.fixture def profiles_consumer(consumer_fixture): yield from consumer_fixture(ProfileConsumer, "profiles") @@ -514,25 +508,6 @@ def get_spans(self, timeout=None, n=None): return spans -class OurLogsConsumer(ConsumerBase): - def get_ourlog(self): - message = self.poll() - assert message is not None - assert message.error() is None - - message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) - return json.loads(message_dict["payload"].decode("utf8")), message_dict - - def get_ourlogs(self): - ourlogs = [] - for message in self.poll_many(): - assert message is not None - assert message.error() is None - message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False) - ourlogs.append(json.loads(message_dict["payload"].decode("utf8"))) - return ourlogs - - class ProfileConsumer(ConsumerBase): def get_profile(self): message = self.poll() diff --git a/tests/integration/test_ourlogs.py b/tests/integration/test_ourlogs.py deleted file mode 100644 index 6652f9bb486..00000000000 --- a/tests/integration/test_ourlogs.py +++ /dev/null @@ -1,118 +0,0 @@ -import json -from datetime import datetime, timedelta, timezone - -from sentry_sdk.envelope import Envelope, Item, PayloadRef - - -TEST_CONFIG = { - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - } -} - - -def envelope_with_ourlogs(start: datetime, end: datetime) -> Envelope: - envelope = Envelope() - envelope.add_item( - Item( - type="otel_log", - payload=PayloadRef( - bytes=json.dumps( - { - "timeUnixNano": str(int(start.timestamp() * 1e9)), - "observedTimeUnixNano": str(int(end.timestamp() * 1e9)), - "severityNumber": 10, - "severityText": "Information", - "traceId": "5B8EFFF798038103D269B633813FC60C", - "spanId": "EEE19B7EC3C1B174", - "body": {"stringValue": "Example log record"}, - "attributes": [ - { - "key": "string.attribute", - "value": {"stringValue": "some string"}, - }, - {"key": "boolean.attribute", "value": {"boolValue": True}}, - {"key": "int.attribute", "value": {"intValue": "10"}}, - { - "key": "double.attribute", - "value": {"doubleValue": 637.704}, - }, - ], - } - ).encode() - ), - ) - ) - return envelope - - -def test_ourlog_extraction( - mini_sentry, - relay_with_processing, - ourlogs_consumer, -): - ourlogs_consumer = ourlogs_consumer() - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["features"] = [ - "organizations:ourlogs-ingestion", - ] - - relay = relay_with_processing(options=TEST_CONFIG) - - duration = timedelta(milliseconds=500) - now = datetime.now(timezone.utc) - end = now - timedelta(seconds=1) - start = end - duration - - # Send OTel log and sentry log via envelope - envelope = envelope_with_ourlogs(start, end) - relay.send_envelope(project_id, envelope) - - ourlogs = ourlogs_consumer.get_ourlogs() - assert len(ourlogs) == 1 - expected_0 = { - "timestamp_nanos": int(start.timestamp() * 1e9), - "observed_timestamp_nanos": int(end.timestamp() * 1e9), - "trace_id": "5b8efff798038103d269b633813fc60c", - "body": "Example log record", - "trace_flags": 0.0, - "span_id": "eee19b7ec3c1b174", - "severity_text": "Information", - "severity_number": 10, - "attributes": { - "string.attribute": {"string_value": "some string"}, - "boolean.attribute": {"bool_value": True}, - "int.attribute": {"int_value": 
10}, - "double.attribute": {"double_value": 637.704}, - }, - } - assert ourlogs[0] == expected_0 - - ourlogs_consumer.assert_empty() - - -def test_ourlog_extraction_is_disabled_without_feature( - mini_sentry, - relay_with_processing, - ourlogs_consumer, -): - ourlogs_consumer = ourlogs_consumer() - relay = relay_with_processing(options=TEST_CONFIG) - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["features"] = [] - - duration = timedelta(milliseconds=500) - now = datetime.now(timezone.utc) - end = now - timedelta(seconds=1) - start = end - duration - - envelope = envelope_with_ourlogs(start, end) - relay.send_envelope(project_id, envelope) - - ourlogs = ourlogs_consumer.get_ourlogs() - assert len(ourlogs) == 0 - - ourlogs_consumer.assert_empty()
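A note on the reason given in the commit message: the removed define_topic_assignments! entry mapped KafkaTopic::OurLogs to the default topic name "snuba-ourlogs", and produce_log sent to that topic whenever the organizations:ourlogs-ingestion feature let a log item through to a processing Relay. On a Kafka cluster where that topic was never provisioned and automatic topic creation is disabled, as is common in production, the produce call fails. Below is a minimal sketch of the override such a deployment would have needed in Relay's config.yml while the feature was still in tree; the processing.topics section is standard Relay configuration, but the broker address shown is an illustrative assumption:

    processing:
      enabled: true
      kafka_config:
        - name: "bootstrap.servers"
          value: "kafka:9092"  # illustrative; point at the real cluster
      topics:
        # Route log payloads explicitly; "snuba-ourlogs" was the compiled-in
        # default from the removed define_topic_assignments! entry.
        ourlogs: snuba-ourlogs

Provisioning the topic, or overriding the assignment to an existing one, would have been the forward fix; this revert removes the producer path instead.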