-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathreplay.rs
153 lines (135 loc) · 5.84 KB
/
replay.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//! Replay related processor code.
use std::error::Error;
use std::net::IpAddr;
use bytes::Bytes;
use relay_config::Config;
use relay_dynamic_config::{Feature, ProjectConfig};
use relay_event_normalization::replay::{self, ReplayError};
use relay_event_normalization::RawUserAgentInfo;
use relay_event_schema::processor::{self, ProcessingState};
use relay_event_schema::protocol::Replay;
use relay_pii::PiiProcessor;
use relay_protocol::Annotated;
use relay_replays::recording::RecordingScrubber;
use relay_statsd::metric;
use crate::envelope::{ContentType, ItemType};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{ProcessEnvelopeState, ProcessingError};
use crate::statsd::RelayTimers;
use crate::utils::ItemAction;
/// Removes replays if the feature flag is not enabled.
pub fn process(state: &mut ProcessEnvelopeState, config: &Config) -> Result<(), ProcessingError> {
let project_state = &state.project_state;
let replays_enabled = project_state.has_feature(Feature::SessionReplay);
let scrubbing_enabled = project_state.has_feature(Feature::SessionReplayRecordingScrubbing);
let meta = state.envelope().meta().clone();
let client_addr = meta.client_addr();
let event_id = state.envelope().event_id();
let limit = config.max_replay_uncompressed_size();
let project_config = project_state.config();
let datascrubbing_config = project_config
.datascrubbing_settings
.pii_config()
.map_err(|e| ProcessingError::PiiConfigError(e.clone()))?
.as_ref();
let mut scrubber = RecordingScrubber::new(
limit,
project_config.pii_config.as_ref(),
datascrubbing_config,
);
let user_agent = &RawUserAgentInfo {
user_agent: meta.user_agent(),
client_hints: meta.client_hints().as_deref(),
};
state.managed_envelope.retain_items(|item| match item.ty() {
ItemType::ReplayEvent => {
if !replays_enabled {
return ItemAction::DropSilently;
}
match process_replay_event(&item.payload(), project_config, client_addr, user_agent) {
Ok(replay) => match replay.to_json() {
Ok(json) => {
item.set_payload(ContentType::Json, json);
ItemAction::Keep
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"failed to serialize replay"
);
ItemAction::Keep
}
},
Err(error) => {
relay_log::warn!(error = &error as &dyn Error, "invalid replay event");
ItemAction::Drop(Outcome::Invalid(match error {
ReplayError::NoContent => DiscardReason::InvalidReplayEventNoPayload,
ReplayError::CouldNotScrub(_) => DiscardReason::InvalidReplayEventPii,
ReplayError::CouldNotParse(_) => DiscardReason::InvalidReplayEvent,
ReplayError::InvalidPayload(_) => DiscardReason::InvalidReplayEvent,
}))
}
}
}
ItemType::ReplayRecording => {
if !replays_enabled {
return ItemAction::DropSilently;
}
// XXX: Processing is there just for data scrubbing. Skip the entire expensive
// processing step if we do not need to scrub.
if !scrubbing_enabled || scrubber.is_empty() {
return ItemAction::Keep;
}
// Limit expansion of recordings to the max replay size. The payload is
// decompressed temporarily and then immediately re-compressed. However, to
// limit memory pressure, we use the replay limit as a good overall limit for
// allocations.
let parsed_recording = metric!(timer(RelayTimers::ReplayRecordingProcessing), {
scrubber.process_recording(&item.payload())
});
match parsed_recording {
Ok(recording) => {
item.set_payload(ContentType::OctetStream, recording);
ItemAction::Keep
}
Err(e) => {
relay_log::warn!("replay-recording-event: {e} {event_id:?}");
ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidReplayRecordingEvent))
}
}
}
_ => ItemAction::Keep,
});
Ok(())
}
/// Validates, normalizes, and scrubs PII from a replay event.
fn process_replay_event(
payload: &Bytes,
config: &ProjectConfig,
client_ip: Option<IpAddr>,
user_agent: &RawUserAgentInfo<&str>,
) -> Result<Annotated<Replay>, ReplayError> {
let mut replay =
Annotated::<Replay>::from_json_bytes(payload).map_err(ReplayError::CouldNotParse)?;
if let Some(replay_value) = replay.value_mut() {
replay::validate(replay_value)?;
replay::normalize(replay_value, client_ip, user_agent);
} else {
return Err(ReplayError::NoContent);
}
if let Some(ref config) = config.pii_config {
let mut processor = PiiProcessor::new(config.compiled());
processor::process_value(&mut replay, &mut processor, ProcessingState::root())
.map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
}
let pii_config = config
.datascrubbing_settings
.pii_config()
.map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
if let Some(config) = pii_config {
let mut processor = PiiProcessor::new(config.compiled());
processor::process_value(&mut replay, &mut processor, ProcessingState::root())
.map_err(|e| ReplayError::CouldNotScrub(e.to_string()))?;
}
Ok(replay)
}