Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quota): Rate limit attachments by item count #4377

Merged
merged 15 commits into from
Dec 16, 2024
10 changes: 6 additions & 4 deletions relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ impl ItemScoping<'_> {

/// The unit in which a data category is measured.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CategoryUnit {
pub enum CategoryUnit {
/// Counts the number of items.
Count,
/// Counts the number of bytes across items.
Bytes,
Batched,
/// Counts the accumulated times across items.
Milliseconds,
}

Expand All @@ -199,9 +201,9 @@ impl CategoryUnit {
| DataCategory::ProfileChunk
| DataCategory::Uptime
| DataCategory::MetricSecond
| DataCategory::AttachmentItem => Some(Self::Count),
| DataCategory::AttachmentItem
| DataCategory::Session => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
DataCategory::Session => Some(Self::Batched),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An aggregate session item does not support accurate counts, but this does not matter since sessions don't produce outcomes and are only rate possibly limited by setting quota to zero.

DataCategory::ProfileDuration => Some(Self::Milliseconds),

DataCategory::Unknown => None,
Expand Down
76 changes: 40 additions & 36 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use relay_quotas::DataCategory;
use relay_sampling::DynamicSamplingContext;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use smallvec::{smallvec, SmallVec};

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::extractors::{PartialMeta, RequestMeta};
Expand Down Expand Up @@ -632,6 +632,14 @@ pub struct Item {
payload: Bytes,
}

/// Expresses the purpose of counting quantities.
///
/// Sessions are counted for rate limiting enforcement but not for outcome reporting.
pub enum CountFor {
RateLimits,
Outcomes,
}

impl Item {
/// Creates a new item with the given type.
pub fn new(ty: ItemType) -> Self {
Expand Down Expand Up @@ -670,13 +678,38 @@ impl Item {
/// Returns the number used for counting towards rate limits and producing outcomes.
///
/// For attachments, we count the number of bytes. Other items are counted as 1.
pub fn quantity(&self) -> usize {
pub fn quantities(&self, purpose: CountFor) -> SmallVec<[(DataCategory, usize); 1]> {
Copy link
Member Author

@jjbayer jjbayer Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function now unites the old quantity and outcome_category functions.

We might be able to absorb the index_category into this function in the future, and also replace the EnvelopeSummary with this.

match self.ty() {
ItemType::Attachment => self.len().max(1),
// NOTE: This is semantically wrong. An otel trace contains may contain many spans,
// but we cannot easily count these before converting the trace into a series of spans.
ItemType::OtelTracesData => 1,
_ => 1,
ItemType::Event => smallvec![(DataCategory::Error, 1)],
ItemType::Transaction => smallvec![(DataCategory::Transaction, 1)],
ItemType::Security | ItemType::RawSecurity => {
smallvec![(DataCategory::Security, 1)]
}
ItemType::Nel => smallvec![],
ItemType::UnrealReport => smallvec![(DataCategory::Error, 1)],
ItemType::Attachment => smallvec![
(DataCategory::Attachment, self.len().max(1)),
(DataCategory::AttachmentItem, 1)
],
ItemType::Session | ItemType::Sessions => match purpose {
CountFor::RateLimits => smallvec![(DataCategory::Session, 1)],
CountFor::Outcomes => smallvec![],
},
ItemType::Statsd | ItemType::MetricBuckets => smallvec![],
ItemType::FormData => smallvec![],
ItemType::UserReport => smallvec![],
ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)],
ItemType::Profile => smallvec![(DataCategory::Profile, 1)],
ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => {
smallvec![(DataCategory::Replay, 1)]
}
ItemType::ClientReport => smallvec![],
ItemType::CheckIn => smallvec![(DataCategory::Monitor, 1)],
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds?
ItemType::Unknown(_) => smallvec![],
}
}

Expand All @@ -688,35 +721,6 @@ impl Item {
)
}

/// Returns the data category used for generating outcomes.
///
/// Returns `None` if outcomes are not generated for this type (e.g. sessions).
pub fn outcome_category(&self) -> Option<DataCategory> {
match self.ty() {
ItemType::Event => Some(DataCategory::Error),
ItemType::Transaction => Some(DataCategory::Transaction),
ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
ItemType::Nel => None,
ItemType::UnrealReport => Some(DataCategory::Error),
ItemType::Attachment => Some(DataCategory::Attachment),
ItemType::Session | ItemType::Sessions => None,
ItemType::Statsd | ItemType::MetricBuckets => None,
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
ItemType::Profile => Some(DataCategory::Profile),
ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => {
Some(DataCategory::Replay)
}
ItemType::ClientReport => None,
ItemType::CheckIn => Some(DataCategory::Monitor),
ItemType::Span | ItemType::OtelSpan => Some(DataCategory::Span),
ItemType::OtelTracesData => None,
ItemType::ProfileChunk => Some(DataCategory::ProfileChunk),
ItemType::Unknown(_) => None,
}
}

/// Returns `true` if this item's payload is empty.
pub fn is_empty(&self) -> bool {
self.payload.is_empty()
Expand Down
24 changes: 11 additions & 13 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use relay_sampling::config::RuleType;
use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
use relay_sampling::{DynamicSamplingContext, SamplingConfig};

use crate::envelope::ItemType;
use crate::envelope::{CountFor, ItemType};
use crate::services::outcome::Outcome;
use crate::services::processor::{
EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup,
Expand Down Expand Up @@ -100,18 +100,16 @@ pub fn drop_unsampled_items(state: &mut ProcessEnvelopeState<TransactionGroup>,
.take_items_by(|item| *item.ty() != ItemType::Profile);

for item in dropped_items {
let Some(category) = item.outcome_category() else {
continue;
};

// Dynamic sampling only drops indexed items. Upgrade the category to the index
// category if one exists for this category, for example profiles will be upgraded to profiles indexed,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, item.quantity());
for (category, quantity) in item.quantities(CountFor::Outcomes) {
// Dynamic sampling only drops indexed items. Upgrade the category to the index
// category if one exists for this category, for example profiles will be upgraded to profiles indexed,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, quantity);
}
}

// Mark all remaining items in the envelope as un-sampled.
Expand Down
14 changes: 7 additions & 7 deletions relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;

use crate::envelope::{Envelope, Item};
use crate::envelope::{CountFor, Envelope, Item};
use crate::extractors::RequestMeta;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::{Processed, ProcessingGroup};
Expand Down Expand Up @@ -52,7 +52,6 @@ pub enum ItemAction {
/// Keep the item.
Keep,
/// Drop the item and log an outcome for it.
/// The outcome will only be logged if the item has a corresponding [`Item::outcome_category()`].
Drop(Outcome),
/// Drop the item without logging an outcome.
DropSilently,
Expand Down Expand Up @@ -263,12 +262,13 @@ impl ManagedEnvelope {
ItemAction::Keep => true,
ItemAction::DropSilently => false,
ItemAction::Drop(outcome) => {
if let Some(category) = item.outcome_category() {
for (category, quantity) in item.quantities(CountFor::Outcomes) {
if let Some(indexed) = category.index_category() {
outcomes.push((outcome.clone(), indexed, item.quantity()));
outcomes.push((outcome.clone(), indexed, quantity));
};
outcomes.push((outcome, category, item.quantity()));
};
outcomes.push((outcome.clone(), category, quantity));
}

false
}
});
Expand Down Expand Up @@ -360,7 +360,7 @@ impl ManagedEnvelope {
tags.has_transactions = summary.secondary_transaction_quantity > 0,
tags.has_span_metrics = summary.secondary_span_quantity > 0,
tags.has_replays = summary.replay_quantity > 0,
tags.has_checkins = summary.checkin_quantity > 0,
tags.has_checkins = summary.monitor_quantity > 0,
tags.event_category = ?summary.event_category,
cached_summary = ?summary,
recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
Expand Down
Loading
Loading