Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward profiles of non-sampled transactions #2940

Merged
merged 98 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 92 commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
53fa699
Add support (feature flagged) to ingest non-sampled profiles
viglia Aug 2, 2023
59456ff
Remove leftover comment
viglia Aug 2, 2023
8642f7e
Fix errors
viglia Aug 2, 2023
9375cba
Undo transaction_count changes
viglia Aug 4, 2023
eb0fb13
fix(server): Allow partial update of envelope summary during processi…
jjbayer Aug 4, 2023
683a89b
ref(processor): Split envelopes by item type
olksdr Dec 14, 2023
2dfaee9
Set broken config and make sure the realy is unhealthy
olksdr Dec 15, 2023
9da7bbe
ref
olksdr Dec 15, 2023
8e52ce7
Init per-type processing pipeline
olksdr Dec 15, 2023
87b272c
Migrate last steps in proc pipe
olksdr Dec 15, 2023
d73b2ea
Add separate Attachment branch, and eliminate additional condition
olksdr Dec 16, 2023
1184dcc
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 18, 2023
6cb73f8
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 18, 2023
d5d62f4
Small cleanup
olksdr Dec 18, 2023
1be6f31
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 18, 2023
9c74ba2
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 18, 2023
46a5bb5
Merge branch 'master' into viglia/feat/ingest-non-sampled-profiles
viglia Dec 18, 2023
f5ca37e
Add debug test_relay_chain_keep_unsampled_profile
viglia Dec 19, 2023
e80febb
Collect transaction-related item separatelly from everything else
olksdr Dec 19, 2023
aa91ded
Merge branch 'ref/processing-pre-item' of github.com:getsentry/relay …
olksdr Dec 19, 2023
ff1bbcf
Debug dropping event
viglia Dec 19, 2023
1d44852
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 19, 2023
8f244f5
fix typos
olksdr Dec 19, 2023
783ba75
Add transactionMetrics config
viglia Dec 19, 2023
d3dabfa
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 20, 2023
cca0f0f
Merge remote-tracking branch 'origin/master' into ref/processing-pre-…
olksdr Dec 20, 2023
37a20be
Remove `has_event` check from some types, which do not create an event
olksdr Dec 20, 2023
2c055dd
ref: split envelopes into more defined groups
olksdr Dec 21, 2023
ac4e7c3
fix lint
olksdr Dec 21, 2023
fe83c59
Add options that we pass down from Sentry
viglia Dec 21, 2023
3e8e529
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 21, 2023
cedb222
Merge branch 'master' into ref/processing-pre-item
olksdr Dec 22, 2023
0842a72
Merge branch 'master' into ref/processing-pre-item
olksdr Jan 2, 2024
31fe575
rename event -> main_item_type
olksdr Jan 2, 2024
23fbe81
func per processible item type
olksdr Jan 2, 2024
b077694
Merge branch 'ref/processing-pre-item' of github.com:getsentry/relay …
olksdr Jan 2, 2024
6aa7f5c
Merge branch 'master' into ref/processing-pre-item
olksdr Jan 2, 2024
ec8306b
Merge branch 'master' into ref/processing-pre-item
olksdr Jan 4, 2024
9c7f19a
Address review feedback
olksdr Jan 4, 2024
11aa78d
typos and reset healthcheck to master
olksdr Jan 4, 2024
0dee0e8
Merge remote-tracking branch 'origin/master' into ref/processing-pre-…
olksdr Jan 4, 2024
f262576
improve api
olksdr Jan 4, 2024
a21e1d5
Add initial docs to the process funcions
olksdr Jan 4, 2024
72d41dc
follow the naming pattern
olksdr Jan 4, 2024
8898ae3
Set the processing group in the managed envelope context
olksdr Jan 5, 2024
ecdcb89
Merge branch 'master' into ref/processing-pre-item
olksdr Jan 5, 2024
a91f47e
Add fallback to the old impl of the process_state
olksdr Jan 5, 2024
e1f38c0
Split envelope must be the same as from envelope on the processing group
olksdr Jan 5, 2024
dc278a9
Merge remote-tracking branch 'origin/master' into ref/processing-pre-…
olksdr Jan 8, 2024
c10ca25
Use split_envelope function everywhere, and provide processing group …
olksdr Jan 8, 2024
0660430
Add an empty item to the envelope
olksdr Jan 8, 2024
f1d341b
Add item types in to the error on the Unknown processing group
olksdr Jan 8, 2024
3f76275
review comments: make the split envelope a bit easier to read
olksdr Jan 8, 2024
b65696e
Merge remote-tracking branch 'origin/master' into ref/processing-pre-…
olksdr Jan 9, 2024
a88031a
revert unknown items test
olksdr Jan 9, 2024
8207b05
review comments
olksdr Jan 9, 2024
5181480
ref(processor): Sample items, not envelopes
jjbayer Jan 11, 2024
dc36053
Apply suggestions from code review
jjbayer Jan 11, 2024
5b7862e
test: Add attachment to envelope to increase test coverage
jjbayer Jan 12, 2024
32a7662
Merge branch 'ref/ds-items' into feat/ingest-non-sampled2
jjbayer Jan 12, 2024
5cd6c1e
fix: Allow profiles without transactions
jjbayer Jan 12, 2024
6ec1e09
Add headers to ProfileKafkaMessage
viglia Jan 17, 2024
d3dedb1
Use map instead of and_then
viglia Jan 17, 2024
6b50e75
Change organizations:profiling-ingest-unsampled-profiles to projects:…
viglia Jan 19, 2024
994a957
Add profile_metrics_kill_switch option
viglia Jan 19, 2024
5ea531b
Merge remote-tracking branch 'origin/master' into feat/ingest-non-sam…
jjbayer Jan 19, 2024
f6217cc
ref: Type sampled field
jjbayer Jan 19, 2024
c962602
fix
jjbayer Jan 19, 2024
0e2d97e
fix
jjbayer Jan 19, 2024
129bd1a
feat: Respect options
jjbayer Jan 19, 2024
2807b22
fix
jjbayer Jan 19, 2024
1bf8d55
Monkeypatch GLOBAL_CONFIG to add desired options
viglia Jan 22, 2024
9b4b165
Complete test_relay_chain_keep_unsampled_profile test with asserts
viglia Jan 23, 2024
e1fa3e5
Fix linting
viglia Jan 23, 2024
cc85214
check explicitly for sampled is False in the test assertion
viglia Jan 23, 2024
56f4396
Add link to functions aggregation
viglia Jan 23, 2024
50e61a5
Capitalize options description
viglia Jan 23, 2024
99c9f76
removed unused as_bool implementation
viglia Jan 23, 2024
507fb77
Fix url to make lint happy
viglia Jan 23, 2024
be6c0f0
Update changelog
viglia Jan 23, 2024
fc9517c
Merge remote-tracking branch 'origin' into feat/ingest-non-sampled2
viglia Jan 23, 2024
9b44f5e
increase mini_sentry.captured_events.get timeout to 2 secs
viglia Jan 23, 2024
3755a74
Bump mini_sentry.captured_events.get to 5 secs
viglia Jan 23, 2024
02bd4fc
Fix test
viglia Jan 23, 2024
358592a
Merge branch 'master' into feat/ingest-non-sampled2
jjbayer Jan 24, 2024
9f5d64e
test: transaction metadata
jjbayer Jan 25, 2024
55a109d
fix: expand profile
jjbayer Jan 25, 2024
67aa46a
Merge remote-tracking branch 'origin/master' into feat/ingest-non-sam…
jjbayer Jan 25, 2024
e27a4ea
fix: default features
jjbayer Jan 25, 2024
5b20b1e
Update relay-server/src/services/processor.rs
jjbayer Jan 25, 2024
fe6c2b0
Apply suggestions from code review
jjbayer Jan 25, 2024
39befff
code review
jjbayer Jan 25, 2024
a369fbe
ref: simplify header handling
jjbayer Jan 25, 2024
60a00af
test: kafka header
jjbayer Jan 25, 2024
81c988b
fix: Don't try to be clever
jjbayer Jan 25, 2024
f31cb1d
Merge branch 'master' into feat/ingest-non-sampled2
jjbayer Jan 25, 2024
9c1a361
rename
jjbayer Jan 26, 2024
96652bd
Update CHANGELOG.md
jjbayer Jan 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Copy event measurements to span & normalize span measurements. ([#2953](https://github.com/getsentry/relay/pull/2953))
- Add `allow_negative` to `BuiltinMeasurementKey`. Filter out negative BuiltinMeasurements if `allow_negative` is false. ([#2982](https://github.com/getsentry/relay/pull/2982))
- Add possiblity to block metrics or their tags with glob-patterns. ([#2954](https://github.com/getsentry/relay/pull/2954), [#2973](https://github.com/getsentry/relay/pull/2973))
- Forward profiles of non-sampled transactions ([#2940](https://github.com/getsentry/relay/pull/2940))
jjbayer marked this conversation as resolved.
Show resolved Hide resolved

**Bug Fixes**:

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ pub enum Feature {
/// Enable the Relay cardinality limiter.
#[serde(rename = "organizations:relay-cardinality-limiter")]
CardinalityLimiter,
/// Enable processing and extracting data from profiles that would normally be dropped by dynamic sampling.
///
/// This is required for [slowest function aggregation](https://github.com/getsentry/snuba/blob/b5311b404a6bd73a9e1997a46d38e7df88e5f391/snuba/snuba_migrations/functions/0001_functions.py#L209-L256). The profile payload will be dropped on the sentry side.
#[serde(rename = "projects:profiling-ingest-unsampled-profiles")]
IngestUnsampledProfiles,

/// Deprecated, still forwarded for older downstream Relays.
#[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")]
Expand Down
19 changes: 14 additions & 5 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,19 @@ impl GlobalConfig {
#[serde(default, rename_all = "camelCase")]
#[cfg_attr(feature = "jsonschema", derive(JsonSchema))]
pub struct Options {
// Example:
// ```rs
// #[serde(default, rename = "relay.some-option.name")]
// pub some_option: Vec<u32>,
// ```
/// List of platform names for which we allow using unsampled profiles for the purpose
/// of improving profile (function) metrics
#[serde(rename = "profiling.profile_metrics.unsampled_profiles.platforms")]
pub profile_metrics_allowed_platforms: Vec<String>,

/// Sample rate for tuning the amount of unsampled profiles that we "let through"
#[serde(rename = "profiling.profile_metrics.unsampled_profiles.sample_rate")]
pub profile_metrics_sample_rate: f32,

/// Kill switch for shutting down profile metrics
#[serde(rename = "profiling.profile_metrics.unsampled_profiles.enabled")]
pub unsampled_profiles_enabled: bool,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand All @@ -75,6 +83,7 @@ mod tests {
}),
options: Some(Options {
other: HashMap::from([("relay.unknown".to_owned(), Value::Bool(true))]),
..Default::default()
}),
};

Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ mime_guess = { version = "2.0.4", optional = true }
minidump = { version = "0.15.2", optional = true }
multer = "2.0.4"
once_cell = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
relay-auth = { path = "../relay-auth" }
relay-aws-extension = { path = "../relay-aws-extension" }
Expand Down
19 changes: 19 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,14 @@ pub struct ItemHeaders {
#[serde(default, skip_serializing_if = "is_false")]
metrics_extracted: bool,

/// `false` if the sampling decision is "drop".
///
/// In the most common use case, the item is dropped when the sampling decision is "drop".
/// For profiles with the feature enabled, however, we keep all profile items and mark the ones
/// for which the transaction was dropped as `sampled: false`.
#[serde(default, skip_serializing_if = "Option::is_none")]
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
sampled: Option<bool>,

/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,
Expand Down Expand Up @@ -564,6 +572,7 @@ impl Item {
sample_rates: None,
other: BTreeMap::new(),
metrics_extracted: false,
sampled: None,
},
payload: Bytes::new(),
}
Expand Down Expand Up @@ -739,6 +748,16 @@ impl Item {
self.headers.metrics_extracted = metrics_extracted;
}

/// Gets the `sampled` flag.
pub fn sampled(&self) -> Option<bool> {
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
self.headers.sampled
}

/// Sets the `sampled` flag.
pub fn set_sampled(&mut self, sampled: bool) {
self.headers.sampled = Some(sampled);
}

/// Returns the specified header value, if present.
pub fn get_header<K>(&self, name: &K) -> Option<&Value>
where
Expand Down
21 changes: 13 additions & 8 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,11 @@ impl EnvelopeProcessorService {
self.extract_metrics(state)?;
}

dynamic_sampling::sample_envelope_items(state);
dynamic_sampling::sample_envelope_items(
state,
&self.inner.config,
&self.inner.global_config.current(),
);

if_processing!(self.inner.config, {
event::store(state, &self.inner.config, self.inner.geoip_lookup.as_ref())?;
Expand All @@ -1213,11 +1217,8 @@ impl EnvelopeProcessorService {
Ok(())
}

/// Processes standalone attachments.
fn process_standalone_attachments(
&self,
state: &mut ProcessEnvelopeState,
) -> Result<(), ProcessingError> {
/// Processes standalone items that require an event ID, but do not have an event on the same envelope.
fn process_standalone(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
profile::filter(state);

if_processing!(self.inner.config, {
Expand Down Expand Up @@ -1337,7 +1338,11 @@ impl EnvelopeProcessorService {
self.extract_metrics(state)?;
}

dynamic_sampling::sample_envelope_items(state);
dynamic_sampling::sample_envelope_items(
state,
&self.inner.config,
&self.inner.global_config.current(),
);

if_processing!(self.inner.config, {
event::store(state, &self.inner.config, self.inner.geoip_lookup.as_ref())?;
Expand Down Expand Up @@ -1379,7 +1384,7 @@ impl EnvelopeProcessorService {
ProcessingGroup::Error => self.process_errors(state)?,
ProcessingGroup::Transaction => self.process_transactions(state)?,
ProcessingGroup::Session => self.process_sessions(state)?,
ProcessingGroup::Standalone => self.process_standalone_attachments(state)?,
ProcessingGroup::Standalone => self.process_standalone(state)?,
ProcessingGroup::ClientReport => self.process_client_reports(state)?,
ProcessingGroup::Replay => self.process_replays(state)?,
ProcessingGroup::CheckIn => self.process_checkins(state)?,
Expand Down
61 changes: 51 additions & 10 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ use std::ops::ControlFlow;
use chrono::Utc;
use relay_base_schema::events::EventType;
use relay_config::Config;
use relay_dynamic_config::ErrorBoundary;
use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
use relay_event_schema::protocol::{Contexts, Event, TraceContext};
use relay_protocol::{Annotated, Empty};
use relay_sampling::config::{RuleType, SamplingMode};
use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
use relay_sampling::{DynamicSamplingContext, SamplingConfig};

use crate::envelope::ItemType;
use crate::services::outcome::Outcome;
use crate::services::processor::ProcessEnvelopeState;
use crate::utils::{self, SamplingResult};
use crate::services::processor::{profile, ProcessEnvelopeState};
use crate::utils::{self, ItemAction, SamplingResult};

/// Ensures there is a valid dynamic sampling context and corresponding project state.
///
Expand Down Expand Up @@ -96,17 +97,31 @@ pub fn run(state: &mut ProcessEnvelopeState, config: &Config) {
}

/// Apply the dynamic sampling decision from `compute_sampling_decision`.
pub fn sample_envelope_items(state: &mut ProcessEnvelopeState) {
pub fn sample_envelope_items(
state: &mut ProcessEnvelopeState,
config: &Config,
global_config: &GlobalConfig,
) {
if let SamplingResult::Match(sampling_match) = std::mem::take(&mut state.sampling_result) {
// We assume that sampling is only supposed to work on transactions.
if state.event_type() == Some(EventType::Transaction) && sampling_match.should_drop() {
let matched_rules = sampling_match.into_matched_rules();
let Some(event) = state.event.value() else {
return;
};
if event.ty.value() == Some(&EventType::Transaction) && sampling_match.should_drop() {
let unsampled_profiles_enabled = forward_unsampled_profiles(state, global_config);

let matched_rules = sampling_match.into_matched_rules();
let outcome = Outcome::FilteredSampling(matched_rules.clone());
state
.managed_envelope
.retain_items(|_| utils::ItemAction::Drop(outcome.clone()));

state.managed_envelope.retain_items(|item| {
if unsampled_profiles_enabled && item.ty() == &ItemType::Profile {
item.set_sampled(false);
// Transfer metadata to the profile before the transaction gets dropped:
let (_, item_action) = profile::expand_profile(item, event, config);
item_action
} else {
ItemAction::Drop(outcome.clone())
}
});
// The event is no longer in the envelope, so we need to handle it separately:
state.reject_event(outcome);
}
Expand Down Expand Up @@ -226,6 +241,32 @@ pub fn tag_error_with_sampling_decision(state: &mut ProcessEnvelopeState, config
}
}

/// Determines whether profiles that would otherwise be dropped by dynamic sampling should be kept.
fn forward_unsampled_profiles(state: &ProcessEnvelopeState, global_config: &GlobalConfig) -> bool {
let Some(global_options) = &global_config.options else {
return false;
};

if !global_options.unsampled_profiles_enabled {
return false;
}

let event_platform = state
.event
.value()
.and_then(|e| e.platform.as_str())
.unwrap_or("");

state
.project_state
.has_feature(Feature::IngestUnsampledProfiles)
&& global_options
.profile_metrics_allowed_platforms
.iter()
.any(|s| s == event_platform)
&& rand::random::<f32>() < global_options.profile_metrics_sample_rate
}

#[cfg(test)]
mod tests {

Expand Down
64 changes: 40 additions & 24 deletions relay-server/src/services/processor/profile.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! Profiles related processor code.

#[cfg(feature = "processing")]
use {crate::envelope::ContentType, relay_config::Config, relay_dynamic_config::Feature};
use relay_dynamic_config::Feature;

use relay_base_schema::events::EventType;
use relay_event_schema::protocol::{Contexts, ProfileContext};
use relay_profiling::ProfileError;
use relay_config::Config;
use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
use relay_profiling::{ProfileError, ProfileId};
use relay_protocol::Annotated;

use crate::envelope::ItemType;
use crate::envelope::{ContentType, Item, ItemType};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::ProcessEnvelopeState;
use crate::utils::ItemAction;
Expand All @@ -23,10 +24,14 @@ pub fn filter(state: &mut ProcessEnvelopeState) {
.count();
let mut profile_id = None;
state.managed_envelope.retain_items(|item| match item.ty() {
// Drop profile without a transaction in the same envelope.
ItemType::Profile if transaction_count == 0 => ItemAction::DropSilently,
// First profile found in the envelope, we'll keep it if metadata are valid.
ItemType::Profile if profile_id.is_none() => {
// Drop profile without a transaction in the same envelope.
let profile_allowed = transaction_count > 0 || matches!(item.sampled(), Some(false));
if !profile_allowed {
return ItemAction::DropSilently;
}

match relay_profiling::parse_metadata(&item.payload(), state.project_id) {
Ok(id) => {
profile_id = Some(id);
Expand Down Expand Up @@ -77,24 +82,9 @@ pub fn process(state: &mut ProcessEnvelopeState, config: &Config) {
let Some(event) = state.event.value() else {
return ItemAction::DropSilently;
};
match relay_profiling::expand_profile(&item.payload(), event) {
Ok((profile_id, payload)) => {
if payload.len() <= config.max_profile_size() {
found_profile_id = Some(profile_id);
item.set_payload(ContentType::Json, payload);
ItemAction::Keep
} else {
ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(
relay_profiling::ProfileError::ExceedSizeLimit,
),
)))
}
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
}
let (profile_id, action) = expand_profile(item, event, config);
found_profile_id = profile_id;
action
}
_ => ItemAction::Keep,
});
Expand All @@ -110,6 +100,32 @@ pub fn process(state: &mut ProcessEnvelopeState, config: &Config) {
}
}

/// Transfers transaction metadata to profile and check its size.
pub fn expand_profile(
item: &mut Item,
event: &Event,
config: &Config,
) -> (Option<ProfileId>, ItemAction) {
let mut profile_id = None;
let item_action = match relay_profiling::expand_profile(&item.payload(), event) {
Ok((id, payload)) => {
profile_id = Some(id);
if payload.len() <= config.max_profile_size() {
item.set_payload(ContentType::Json, payload);
ItemAction::Keep
} else {
ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
)))
}
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
};
(profile_id, item_action)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
Loading