Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quota): Rate limit attachments by item count #4377

Merged
merged 15 commits into from
Dec 16, 2024
10 changes: 6 additions & 4 deletions relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ impl ItemScoping<'_> {

/// The unit in which a data category is measured.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CategoryUnit {
pub enum CategoryUnit {
/// Counts the number of items.
Count,
/// Counts the number of bytes across items.
Bytes,
Batched,
/// Counts the accumulated times across items.
Milliseconds,
}

Expand All @@ -199,9 +201,9 @@ impl CategoryUnit {
| DataCategory::ProfileChunk
| DataCategory::Uptime
| DataCategory::MetricSecond
| DataCategory::AttachmentItem => Some(Self::Count),
| DataCategory::AttachmentItem
| DataCategory::Session => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
DataCategory::Session => Some(Self::Batched),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An aggregate session item does not support accurate counts, but this does not matter since sessions don't produce outcomes and are only rate possibly limited by setting quota to zero.

DataCategory::ProfileDuration => Some(Self::Milliseconds),

DataCategory::Unknown => None,
Expand Down
76 changes: 40 additions & 36 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use relay_quotas::DataCategory;
use relay_sampling::DynamicSamplingContext;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use smallvec::{smallvec, SmallVec};

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::extractors::{PartialMeta, RequestMeta};
Expand Down Expand Up @@ -632,6 +632,14 @@ pub struct Item {
payload: Bytes,
}

/// Expresses the purpose of counting quantities.
///
/// Sessions are counted for rate limiting enforcement but not for outcome reporting.
pub enum CountFor {
RateLimits,
Outcomes,
}

impl Item {
/// Creates a new item with the given type.
pub fn new(ty: ItemType) -> Self {
Expand Down Expand Up @@ -670,13 +678,38 @@ impl Item {
/// Returns the number used for counting towards rate limits and producing outcomes.
///
/// For attachments, we count the number of bytes. Other items are counted as 1.
pub fn quantity(&self) -> usize {
pub fn quantities(&self, purpose: CountFor) -> SmallVec<[(DataCategory, usize); 1]> {
Copy link
Member Author

@jjbayer jjbayer Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function now unites the old quantity and outcome_category functions.

We might be able to absorb the index_category into this function in the future, and also replace the EnvelopeSummary with this.

match self.ty() {
ItemType::Attachment => self.len().max(1),
// NOTE: This is semantically wrong. An otel trace contains may contain many spans,
// but we cannot easily count these before converting the trace into a series of spans.
ItemType::OtelTracesData => 1,
_ => 1,
ItemType::Event => smallvec![(DataCategory::Error, 1)],
ItemType::Transaction => smallvec![(DataCategory::Transaction, 1)],
ItemType::Security | ItemType::RawSecurity => {
smallvec![(DataCategory::Security, 1)]
}
ItemType::Nel => smallvec![],
ItemType::UnrealReport => smallvec![(DataCategory::Error, 1)],
ItemType::Attachment => smallvec![
(DataCategory::Attachment, self.len().max(1)),
(DataCategory::AttachmentItem, 1)
],
ItemType::Session | ItemType::Sessions => match purpose {
CountFor::RateLimits => smallvec![(DataCategory::Session, 1)],
CountFor::Outcomes => smallvec![],
},
ItemType::Statsd | ItemType::MetricBuckets => smallvec![],
ItemType::FormData => smallvec![],
ItemType::UserReport => smallvec![],
ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)],
ItemType::Profile => smallvec![(DataCategory::Profile, 1)],
ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => {
smallvec![(DataCategory::Replay, 1)]
}
ItemType::ClientReport => smallvec![],
ItemType::CheckIn => smallvec![(DataCategory::Monitor, 1)],
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds?
ItemType::Unknown(_) => smallvec![],
}
}

Expand All @@ -688,35 +721,6 @@ impl Item {
)
}

/// Returns the data category used for generating outcomes.
///
/// Returns `None` if outcomes are not generated for this type (e.g. sessions).
pub fn outcome_category(&self) -> Option<DataCategory> {
match self.ty() {
ItemType::Event => Some(DataCategory::Error),
ItemType::Transaction => Some(DataCategory::Transaction),
ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
ItemType::Nel => None,
ItemType::UnrealReport => Some(DataCategory::Error),
ItemType::Attachment => Some(DataCategory::Attachment),
ItemType::Session | ItemType::Sessions => None,
ItemType::Statsd | ItemType::MetricBuckets => None,
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
ItemType::Profile => Some(DataCategory::Profile),
ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => {
Some(DataCategory::Replay)
}
ItemType::ClientReport => None,
ItemType::CheckIn => Some(DataCategory::Monitor),
ItemType::Span | ItemType::OtelSpan => Some(DataCategory::Span),
ItemType::OtelTracesData => None,
ItemType::ProfileChunk => Some(DataCategory::ProfileChunk),
ItemType::Unknown(_) => None,
}
}

/// Returns `true` if this item's payload is empty.
pub fn is_empty(&self) -> bool {
self.payload.is_empty()
Expand Down
24 changes: 11 additions & 13 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use relay_sampling::config::RuleType;
use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
use relay_sampling::{DynamicSamplingContext, SamplingConfig};

use crate::envelope::ItemType;
use crate::envelope::{CountFor, ItemType};
use crate::services::outcome::Outcome;
use crate::services::processor::{
EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup,
Expand Down Expand Up @@ -100,18 +100,16 @@ pub fn drop_unsampled_items(state: &mut ProcessEnvelopeState<TransactionGroup>,
.take_items_by(|item| *item.ty() != ItemType::Profile);

for item in dropped_items {
let Some(category) = item.outcome_category() else {
continue;
};

// Dynamic sampling only drops indexed items. Upgrade the category to the index
// category if one exists for this category, for example profiles will be upgraded to profiles indexed,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, item.quantity());
for (category, quantity) in item.quantities(CountFor::Outcomes) {
// Dynamic sampling only drops indexed items. Upgrade the category to the index
// category if one exists for this category, for example profiles will be upgraded to profiles indexed,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, quantity);
}
}

// Mark all remaining items in the envelope as un-sampled.
Expand Down
14 changes: 7 additions & 7 deletions relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;

use crate::envelope::{Envelope, Item};
use crate::envelope::{CountFor, Envelope, Item};
use crate::extractors::RequestMeta;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::{Processed, ProcessingGroup};
Expand Down Expand Up @@ -52,7 +52,6 @@ pub enum ItemAction {
/// Keep the item.
Keep,
/// Drop the item and log an outcome for it.
/// The outcome will only be logged if the item has a corresponding [`Item::outcome_category()`].
Drop(Outcome),
/// Drop the item without logging an outcome.
DropSilently,
Expand Down Expand Up @@ -263,12 +262,13 @@ impl ManagedEnvelope {
ItemAction::Keep => true,
ItemAction::DropSilently => false,
ItemAction::Drop(outcome) => {
if let Some(category) = item.outcome_category() {
for (category, quantity) in item.quantities(CountFor::Outcomes) {
if let Some(indexed) = category.index_category() {
outcomes.push((outcome.clone(), indexed, item.quantity()));
outcomes.push((outcome.clone(), indexed, quantity));
};
outcomes.push((outcome, category, item.quantity()));
};
outcomes.push((outcome.clone(), category, quantity));
}

false
}
});
Expand Down Expand Up @@ -360,7 +360,7 @@ impl ManagedEnvelope {
tags.has_transactions = summary.secondary_transaction_quantity > 0,
tags.has_span_metrics = summary.secondary_span_quantity > 0,
tags.has_replays = summary.replay_quantity > 0,
tags.has_checkins = summary.checkin_quantity > 0,
tags.has_checkins = summary.monitor_quantity > 0,
tags.event_category = ?summary.event_category,
cached_summary = ?summary,
recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
Expand Down
81 changes: 59 additions & 22 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use relay_quotas::{
ReasonCode, Scoping,
};

use crate::envelope::{Envelope, Item, ItemType};
use crate::envelope::{CountFor, Envelope, Item, ItemType};
use crate::services::outcome::Outcome;
use crate::utils::ManagedEnvelope;

Expand Down Expand Up @@ -149,6 +149,9 @@ pub struct EnvelopeSummary {
/// The quantity of all attachments combined in bytes.
pub attachment_quantity: usize,

/// The number of attachments.
pub attachment_item_quantity: usize,

/// The number of all session updates.
pub session_quantity: usize,

Expand All @@ -159,7 +162,7 @@ pub struct EnvelopeSummary {
pub replay_quantity: usize,

/// The number of monitor check-ins.
pub checkin_quantity: usize,
pub monitor_quantity: usize,

/// Secondary number of transactions.
///
Expand Down Expand Up @@ -218,28 +221,29 @@ impl EnvelopeSummary {
}

summary.payload_size += item.len();
summary.set_quantity(item);
for (category, quantity) in item.quantities(CountFor::RateLimits) {
summary.add_quantity(category, quantity);
}
}

summary
}

fn set_quantity(&mut self, item: &Item) {
let target_quantity = match item.ty() {
ItemType::Attachment => &mut self.attachment_quantity,
ItemType::Session => &mut self.session_quantity,
ItemType::Profile => &mut self.profile_quantity,
ItemType::ReplayEvent => &mut self.replay_quantity,
ItemType::ReplayRecording => &mut self.replay_quantity,
ItemType::ReplayVideo => &mut self.replay_quantity,
ItemType::CheckIn => &mut self.checkin_quantity,
ItemType::OtelTracesData => &mut self.span_quantity,
ItemType::OtelSpan => &mut self.span_quantity,
ItemType::Span => &mut self.span_quantity,
ItemType::ProfileChunk => &mut self.profile_chunk_quantity,
fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
let target_quantity = match category {
DataCategory::Attachment => &mut self.attachment_quantity,
DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
DataCategory::Session => &mut self.session_quantity,
DataCategory::Profile => &mut self.profile_quantity,
DataCategory::Replay => &mut self.replay_quantity,
DataCategory::ReplayVideo => &mut self.replay_quantity,
DataCategory::Monitor => &mut self.monitor_quantity,
DataCategory::Span => &mut self.span_quantity,
DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
// TODO: This catch-all return looks dangerous
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to remove the add_quantity function in a future PR, that will hopefully also resolve this TODO.

_ => return,
};
*target_quantity += item.quantity();
*target_quantity += quantity;
}

/// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
Expand Down Expand Up @@ -326,7 +330,7 @@ pub struct Enforcement {
pub event: CategoryLimit,
/// The rate limit for the indexed category of the event.
pub event_indexed: CategoryLimit,
/// The combined attachment item rate limit.
/// The combined attachment bytes rate limit.
pub attachments: CategoryLimit,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this name is somewhat set in stone? Because in principle I think something like attachment_bytes would be better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it, but I did not want to mess with the serialization / python code generation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's fair.

/// The combined session item rate limit.
pub sessions: CategoryLimit,
Expand Down Expand Up @@ -639,12 +643,45 @@ where
let attachment_limits = self
.check
.apply(item_scoping, summary.attachment_quantity)?;
enforcement.attachments = CategoryLimit::new(

let mut limits = CategoryLimit::new(
DataCategory::Attachment,
summary.attachment_quantity,
attachment_limits.longest(),
);

if !attachment_limits.is_limited() {
let item_scoping = scoping.item(DataCategory::AttachmentItem);
let attachment_limits = self
.check
.apply(item_scoping, summary.attachment_item_quantity)?;

limits = CategoryLimit::new(
DataCategory::AttachmentItem,
summary.attachment_item_quantity,
attachment_limits.longest(),
);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is getting messier with every addition. I will attempt to generalize it in a future PR.


enforcement.attachments = limits;

// Only record rate limits for plain attachments. For all other attachments, it's
// perfectly "legal" to send them. They will still be discarded in Sentry, but clients
// can continue to send them.
if summary.has_plain_attachments {
rate_limits.merge(attachment_limits);
}
} else if summary.attachment_item_quantity > 0 {
let item_scoping = scoping.item(DataCategory::AttachmentItem);
let attachment_limits = self
.check
.apply(item_scoping, summary.attachment_item_quantity)?;
enforcement.attachments = CategoryLimit::new(
DataCategory::AttachmentItem,
summary.attachment_item_quantity,
attachment_limits.longest(),
);

// Only record rate limits for plain attachments. For all other attachments, it's
// perfectly "legal" to send them. They will still be discarded in Sentry, but clients
// can continue to send them.
Expand Down Expand Up @@ -719,12 +756,12 @@ where
rate_limits.merge(replay_limits);
}

if summary.checkin_quantity > 0 {
if summary.monitor_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Monitor);
let checkin_limits = self.check.apply(item_scoping, summary.checkin_quantity)?;
let checkin_limits = self.check.apply(item_scoping, summary.monitor_quantity)?;
enforcement.check_ins = CategoryLimit::new(
DataCategory::Monitor,
summary.checkin_quantity,
summary.monitor_quantity,
checkin_limits.longest(),
);
rate_limits.merge(checkin_limits);
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def assert_rate_limited(
assert outcome["outcome"] == 2, outcome
assert outcome["reason"] == reason, outcome["reason"]
if key_id is not None:
assert outcome["key_id"] == key_id
assert outcome["key_id"] == key_id, (outcome["key_id"], key_id)

if quantity is not None:
count = sum(outcome["quantity"] for outcome in outcomes)
Expand Down
Loading
Loading