From 7293a50786e1477fe1f4ff7c4eba11cb704edb6c Mon Sep 17 00:00:00 2001 From: David Herberth Date: Mon, 30 Sep 2024 14:25:23 +0200 Subject: [PATCH] ref(replay): Run geo info normalization on the replay user --- relay-event-normalization/src/replay.rs | 39 ++- relay-server/src/services/processor.rs | 6 +- relay-server/src/services/processor/replay.rs | 243 ++++++++---------- 3 files changed, 138 insertions(+), 150 deletions(-) diff --git a/relay-event-normalization/src/replay.rs b/relay-event-normalization/src/replay.rs index 250a71a421c..ce557466e8f 100644 --- a/relay-event-normalization/src/replay.rs +++ b/relay-event-normalization/src/replay.rs @@ -6,9 +6,10 @@ use relay_event_schema::processor::{self, ProcessingState, Processor}; use relay_event_schema::protocol::{Contexts, IpAddr, Replay}; use relay_protocol::Annotated; +use crate::event::normalize_user_geoinfo; use crate::normalize::user_agent; -use crate::trimming; use crate::user_agent::RawUserAgentInfo; +use crate::{trimming, GeoIpLookup}; /// Replay validation or normalization error. /// @@ -82,11 +83,17 @@ pub fn validate(replay: &Replay) -> Result<(), ReplayError> { pub fn normalize( replay: &mut Annotated, client_ip: Option, - user_agent: &RawUserAgentInfo<&str>, + user_agent: RawUserAgentInfo<&str>, + geoip_lookup: Option<&GeoIpLookup>, ) { let _ = processor::apply(replay, |replay_value, meta| { normalize_platform(replay_value); normalize_ip_address(replay_value, client_ip); + if let Some(geoip_lookup) = geoip_lookup { + if let Some(user) = replay_value.user.value_mut() { + normalize_user_geoinfo(geoip_lookup, user); + } + } normalize_user_agent(replay_value, user_agent); normalize_type(replay_value); normalize_array_fields(replay_value); @@ -124,7 +131,7 @@ fn normalize_ip_address(replay: &mut Replay, ip_address: Option) { ); } -fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentInfo<&str>) { +fn normalize_user_agent(replay: &mut Replay, default_user_agent: RawUserAgentInfo<&str>) { let headers = match replay .request .value() @@ -139,11 +146,11 @@ fn normalize_user_agent(replay: &mut Replay, default_user_agent: &RawUserAgentIn let user_agent_info = if user_agent_info.is_empty() { default_user_agent } else { - &user_agent_info + user_agent_info }; let contexts = replay.contexts.get_or_insert_with(Contexts::new); - user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, user_agent_info); + user_agent::normalize_user_agent_info_generic(contexts, &replay.platform, &user_agent_info); } fn normalize_platform(replay: &mut Replay) { @@ -253,7 +260,7 @@ mod tests { let payload = include_str!("../../tests/fixtures/replay.json"); let mut replay: Annotated = Annotated::from_json(payload).unwrap(); - normalize(&mut replay, None, &RawUserAgentInfo::default()); + normalize(&mut replay, None, RawUserAgentInfo::default(), None); let contexts = get_value!(replay.contexts!); assert_eq!( @@ -290,12 +297,17 @@ mod tests { let mut replay: Annotated = Annotated::from_json(payload).unwrap(); // No user object and no ip-address was provided. - normalize(&mut replay, None, &RawUserAgentInfo::default()); + normalize(&mut replay, None, RawUserAgentInfo::default(), None); assert_eq!(get_value!(replay.user), None); // No user object but an ip-address was provided. let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); - normalize(&mut replay, Some(ip_address), &RawUserAgentInfo::default()); + normalize( + &mut replay, + Some(ip_address), + RawUserAgentInfo::default(), + None, + ); let ipaddr = get_value!(replay.user!).ip_address.as_str(); assert_eq!(Some("127.0.0.1"), ipaddr); @@ -309,7 +321,12 @@ mod tests { let payload = include_str!("../../tests/fixtures/replay_missing_user_ip_address.json"); let mut replay: Annotated = Annotated::from_json(payload).unwrap(); - normalize(&mut replay, Some(ip_address), &RawUserAgentInfo::default()); + normalize( + &mut replay, + Some(ip_address), + RawUserAgentInfo::default(), + None, + ); let ipaddr = get_value!(replay.user!).ip_address.as_str(); assert_eq!(Some("127.0.0.1"), ipaddr); @@ -320,7 +337,7 @@ mod tests { let payload = include_str!("../../tests/fixtures/replay_failure_22_08_31.json"); let mut replay: Annotated = Annotated::from_json(payload).unwrap(); - normalize(&mut replay, None, &RawUserAgentInfo::default()); + normalize(&mut replay, None, RawUserAgentInfo::default(), None); let user = get_value!(replay.user!); assert_eq!(user.ip_address.as_str(), Some("127.1.1.1")); @@ -444,7 +461,7 @@ mod tests { let json = format!(r#"{{"dist": "{}"}}"#, "0".repeat(100)); let mut replay = Annotated::::from_json(json.as_str()).unwrap(); - normalize(&mut replay, None, &RawUserAgentInfo::default()); + normalize(&mut replay, None, RawUserAgentInfo::default(), None); assert_annotated_snapshot!(replay, @r#"{ "platform": "other", "dist": "0000000000000000000000000000000000000000000000000000000000000...", diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 1d4c93f17fd..c7ebfbbefc8 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1835,7 +1835,11 @@ impl EnvelopeProcessorService { &self, state: &mut ProcessEnvelopeState, ) -> Result<(), ProcessingError> { - replay::process(state, &self.inner.global_config.current())?; + replay::process( + state, + &self.inner.global_config.current(), + self.inner.geoip_lookup.as_ref(), + )?; if_processing!(self.inner.config, { self.enforce_quotas(state)?; }); diff --git a/relay-server/src/services/processor/replay.rs b/relay-server/src/services/processor/replay.rs index 5efc6d8fe4c..62068b40a0b 100644 --- a/relay-server/src/services/processor/replay.rs +++ b/relay-server/src/services/processor/replay.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use relay_base_schema::project::ProjectId; use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig}; use relay_event_normalization::replay::{self, ReplayError}; -use relay_event_normalization::RawUserAgentInfo; +use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo}; use relay_event_schema::processor::{self, ProcessingState}; use relay_event_schema::protocol::{EventId, Replay}; use relay_pii::PiiProcessor; @@ -24,91 +24,82 @@ use crate::statsd::{RelayCounters, RelayTimers}; pub fn process( state: &mut ProcessEnvelopeState, global_config: &GlobalConfig, + geoip_lookup: Option<&GeoIpLookup>, ) -> Result<(), ProcessingError> { - let project_state = &state.project_state; - let replays_disabled = state.should_filter(Feature::SessionReplay); - let scrubbing_enabled = project_state.has_feature(Feature::SessionReplayRecordingScrubbing); - let replay_video_disabled = project_state.has_feature(Feature::SessionReplayVideoDisabled); - let project_id = project_state.project_id; - let organization_id = project_state.organization_id; - - let meta = state.envelope().meta().clone(); - let client_addr = meta.client_addr(); - let event_id = state.envelope().event_id(); - - let limit = state.config.max_replay_uncompressed_size(); - let project_config = project_state.config(); - let datascrubbing_config = project_config - .datascrubbing_settings - .pii_config() - .map_err(|e| ProcessingError::PiiConfigError(e.clone()))? - .as_ref(); - let mut scrubber = RecordingScrubber::new( - limit, - project_config.pii_config.as_ref(), - datascrubbing_config, - ); - - let user_agent = &RawUserAgentInfo { - user_agent: meta.user_agent(), - client_hints: meta.client_hints().as_deref(), - }; - let combined_envelope_items = - project_state.has_feature(Feature::SessionReplayCombinedEnvelopeItems); - // If the replay feature is not enabled drop the items silenty. - if replays_disabled { + if state.should_filter(Feature::SessionReplay) { state.managed_envelope.drop_items_silently(); return Ok(()); } // If the replay video feature is not enabled check the envelope items for a // replay video event. - if replay_video_disabled && count_replay_video_events(state) > 0 { + if state.should_filter(Feature::SessionReplayVideoDisabled) + && count_replay_video_events(state) > 0 + { state.managed_envelope.drop_items_silently(); return Ok(()); } + let rs = { + let meta = state.envelope().meta(); + + ReplayState { + config: &state.project_state.config, + global_config, + geoip_lookup, + event_id: state.envelope().event_id(), + project_id: state.project_state.project_id, + organization_id: state.project_state.organization_id, + client_addr: meta.client_addr(), + user_agent: RawUserAgentInfo { + user_agent: meta.user_agent().map(|s| s.to_owned()), + client_hints: meta.client_hints().clone(), + }, + } + }; + + let mut scrubber = if state + .project_state + .has_feature(Feature::SessionReplayRecordingScrubbing) + { + let datascrubbing_config = rs + .config + .datascrubbing_settings + .pii_config() + .map_err(|e| ProcessingError::PiiConfigError(e.clone()))? + .as_ref(); + + Some(RecordingScrubber::new( + state.config.max_replay_uncompressed_size(), + rs.config.pii_config.as_ref(), + datascrubbing_config, + )) + } else { + None + }; + for item in state.managed_envelope.envelope_mut().items_mut() { - // Set the combined payload header to the value of the combined feature. - item.set_replay_combined_payload(combined_envelope_items); + if state + .project_state + .has_feature(Feature::SessionReplayCombinedEnvelopeItems) + { + item.set_replay_combined_payload(true); + } match item.ty() { ItemType::ReplayEvent => { - let replay_event = handle_replay_event_item( - item.payload(), - &event_id, - project_config, - global_config, - client_addr, - user_agent, - project_id, - organization_id, - )?; + let replay_event = handle_replay_event_item(item.payload(), &rs)?; item.set_payload(ContentType::Json, replay_event); } ItemType::ReplayRecording => { - let replay_recording = handle_replay_recording_item( - item.payload(), - &event_id, - scrubbing_enabled, - &mut scrubber, - )?; + let replay_recording = + handle_replay_recording_item(item.payload(), scrubber.as_mut())?; item.set_payload(ContentType::OctetStream, replay_recording); } ItemType::ReplayVideo => { - let replay_video = handle_replay_video_item( - item.payload(), - &event_id, - project_config, - global_config, - client_addr, - user_agent, - scrubbing_enabled, - &mut scrubber, - project_id, - organization_id, - )?; + let replay_video = + handle_replay_video_item(item.payload(), scrubber.as_mut(), &rs)?; item.set_payload(ContentType::OctetStream, replay_video); } _ => {} @@ -118,29 +109,32 @@ pub fn process( Ok(()) } +#[derive(Debug)] +struct ReplayState<'a> { + pub config: &'a ProjectConfig, + pub global_config: &'a GlobalConfig, + pub geoip_lookup: Option<&'a GeoIpLookup>, + pub event_id: Option, + pub project_id: Option, + pub organization_id: Option, + pub client_addr: Option, + pub user_agent: RawUserAgentInfo, +} + // Replay Event Processing. -#[allow(clippy::too_many_arguments)] fn handle_replay_event_item( payload: Bytes, - event_id: &Option, - config: &ProjectConfig, - global_config: &GlobalConfig, - client_ip: Option, - user_agent: &RawUserAgentInfo<&str>, - project_id: Option, - organization_id: Option, + state: &ReplayState<'_>, ) -> Result { - let filter_settings = &config.filter_settings; - - match process_replay_event(&payload, config, client_ip, user_agent) { + match process_replay_event(&payload, state) { Ok(replay) => { if let Some(replay_type) = replay.value() { relay_filter::should_filter( replay_type, - client_ip, - filter_settings, - global_config.filters(), + state.client_addr, + &state.config.filter_settings, + state.global_config.filters(), ) .map_err(ProcessingError::ReplayFiltered)?; @@ -151,9 +145,9 @@ fn handle_replay_event_item( metric!(counter(RelayCounters::ReplayExceededSegmentLimit) += 1); relay_log::warn!( - ?event_id, - project_id = project_id.map(|v| v.value()), - organization_id = organization_id, + event_id = ?state.event_id, + project_id = state.project_id.map(|v| v.value()), + organization_id = state.organization_id, segment_id = segment_id, "replay segment-exceeded-limit" ); @@ -166,7 +160,7 @@ fn handle_replay_event_item( Err(error) => { relay_log::error!( error = &error as &dyn Error, - ?event_id, + event_id = ?state.event_id, "failed to serialize replay" ); Ok(payload) @@ -176,9 +170,9 @@ fn handle_replay_event_item( Err(error) => { relay_log::warn!( error = &error as &dyn Error, - ?event_id, - project_id = project_id.map(|v| v.value()), - organization_id = organization_id, + event_id = ?state.event_id, + project_id = state.project_id.map(|v| v.value()), + organization_id = state.organization_id, "invalid replay event" ); Err(match error { @@ -202,9 +196,7 @@ fn handle_replay_event_item( /// Validates, normalizes, and scrubs PII from a replay event. fn process_replay_event( payload: &[u8], - config: &ProjectConfig, - client_ip: Option, - user_agent: &RawUserAgentInfo<&str>, + state: &ReplayState<'_>, ) -> Result, ReplayError> { let mut replay = Annotated::::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?; @@ -214,18 +206,25 @@ fn process_replay_event( }; replay::validate(replay_value)?; - replay::normalize(&mut replay, client_ip, user_agent); + replay::normalize( + &mut replay, + state.client_addr, + state.user_agent.as_deref(), + state.geoip_lookup, + ); - if let Some(ref config) = config.pii_config { + if let Some(ref config) = state.config.pii_config { let mut processor = PiiProcessor::new(config.compiled()); processor::process_value(&mut replay, &mut processor, ProcessingState::root()) .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?; } - let pii_config = config + let pii_config = state + .config .datascrubbing_settings .pii_config() .map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?; + if let Some(config) = pii_config { let mut processor = PiiProcessor::new(config.compiled()); processor::process_value(&mut replay, &mut processor, ProcessingState::root()) @@ -239,13 +238,14 @@ fn process_replay_event( fn handle_replay_recording_item( payload: Bytes, - event_id: &Option, - scrubbing_enabled: bool, - scrubber: &mut RecordingScrubber, + scrubber: Option<&mut RecordingScrubber>, ) -> Result { // XXX: Processing is there just for data scrubbing. Skip the entire expensive // processing step if we do not need to scrub. - if !scrubbing_enabled || scrubber.is_empty() { + let Some(scrubber) = scrubber else { + return Ok(payload); + }; + if scrubber.is_empty() { return Ok(payload); } @@ -253,19 +253,11 @@ fn handle_replay_recording_item( // decompressed temporarily and then immediately re-compressed. However, to // limit memory pressure, we use the replay limit as a good overall limit for // allocations. - let parsed_recording = metric!(timer(RelayTimers::ReplayRecordingProcessing), { + metric!(timer(RelayTimers::ReplayRecordingProcessing), { scrubber.process_recording(&payload) - }); - - match parsed_recording { - Ok(recording) => Ok(recording.into()), - Err(e) => { - relay_log::warn!("replay-recording-event: {e} {event_id:?}"); - Err(ProcessingError::InvalidReplay( - DiscardReason::InvalidReplayRecordingEvent, - )) - } - } + }) + .map(Into::into) + .map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayRecordingEvent)) } // Replay Video Processing @@ -277,48 +269,23 @@ struct ReplayVideoEvent { replay_video: Bytes, } -#[allow(clippy::too_many_arguments)] fn handle_replay_video_item( payload: Bytes, - event_id: &Option, - config: &ProjectConfig, - global_config: &GlobalConfig, - client_ip: Option, - user_agent: &RawUserAgentInfo<&str>, - scrubbing_enabled: bool, - scrubber: &mut RecordingScrubber, - project_id: Option, - organization_id: Option, + scrubber: Option<&mut RecordingScrubber>, + state: &ReplayState<'_>, ) -> Result { let ReplayVideoEvent { replay_event, replay_recording, replay_video, - } = match rmp_serde::from_slice(&payload) { - Ok(result) => result, - Err(e) => { - relay_log::warn!("replay-video-event: {e} {event_id:?}"); - return Err(ProcessingError::InvalidReplay( - DiscardReason::InvalidReplayVideoEvent, - )); - } - }; + } = rmp_serde::from_slice(&payload) + .map_err(|_| ProcessingError::InvalidReplay(DiscardReason::InvalidReplayVideoEvent))?; // Process as a replay-event envelope item. - let replay_event = handle_replay_event_item( - replay_event, - event_id, - config, - global_config, - client_ip, - user_agent, - project_id, - organization_id, - )?; + let replay_event = handle_replay_event_item(replay_event, state)?; // Process as a replay-recording envelope item. - let replay_recording = - handle_replay_recording_item(replay_recording, event_id, scrubbing_enabled, scrubber)?; + let replay_recording = handle_replay_recording_item(replay_recording, scrubber)?; // Verify the replay-video payload is not empty. if replay_video.is_empty() {