Skip to content

Commit

Permalink
ref(server): Apply rate limits per item in fast-path
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-auer committed Jun 9, 2020
1 parent 9c6bdbb commit f3ac407
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 7 deletions.
21 changes: 15 additions & 6 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use relay_common::{metric, ProjectId};
use relay_config::{Config, RelayMode};
use relay_filter::{matches_any_origin, FiltersConfig};
use relay_general::pii::{DataScrubbingConfig, PiiConfig};
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_quotas::{Quota, RateLimits, Scoping};

use crate::actors::outcome::DiscardReason;
use crate::actors::project_cache::{FetchProjectState, ProjectCache, ProjectError};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;
use crate::metrics::RelayCounters;
use crate::utils::{ActorResponse, Response};
use crate::utils::{ActorResponse, EnvelopeLimiter, Response};

/// The current status of a project state. Return value of `ProjectState::outdated`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -295,6 +295,11 @@ impl ProjectState {
scoping
}

/// Returns quotas declared in this project state.
pub fn get_quotas(&self) -> &[Quota] {
self.config.quotas.as_slice()
}

/// Determines whether the given request should be accepted or discarded.
///
/// Returns `Ok(())` if the request should be accepted. Returns `Err(DiscardReason)` if the
Expand Down Expand Up @@ -510,7 +515,7 @@ impl Project {

fn check_envelope(
&mut self,
envelope: Envelope,
mut envelope: Envelope,
scoping: &Scoping,
) -> Result<CheckedEnvelope, DiscardReason> {
if let Some(state) = self.state() {
Expand All @@ -519,9 +524,13 @@ impl Project {

self.rate_limits.clean_expired();

// TODO: Apply rate limits no non-event items once implemented.
let rate_limits = self.rate_limits.check(scoping.item(DataCategory::Error));
let envelope = if rate_limits.is_limited() {
let quotas = self.state().map(|s| s.get_quotas()).unwrap_or(&[]);
let envelope_limiter = EnvelopeLimiter::new(|item_scoping, _| {
Ok(self.rate_limits.check_with_quotas(quotas, item_scoping))
});

let rate_limits = envelope_limiter.enforce(&mut envelope, scoping)?;
let envelope = if envelope.is_empty() {
None
} else {
Some(envelope)
Expand Down
11 changes: 11 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,17 @@ impl Envelope {
})
}

/// Retains only the items specified by the predicate.
///
/// In other words, remove all elements where `f(&item)` returns `false`. This method operates
/// in place and preserves the order of the retained items.
pub fn retain_items<F>(&mut self, f: F)
where
F: FnMut(&mut Item) -> bool,
{
self.items.retain(f)
}

/// Serializes this envelope into the given writer.
pub fn serialize<W>(&self, mut writer: W) -> Result<(), EnvelopeError>
where
Expand Down
140 changes: 139 additions & 1 deletion relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::fmt::Write;

use relay_quotas::{
DataCategories, DataCategory, QuotaScope, RateLimit, RateLimitScope, RateLimits, Scoping,
DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
Scoping,
};

use crate::envelope::{Envelope, Item, ItemType};

/// Name of the rate limits header.
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";

Expand Down Expand Up @@ -70,6 +73,141 @@ pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
rate_limits
}

/// Infer the data category from an item.
///
/// Categories depend mostly on the item type, with a few special cases:
/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
/// to be set on the event item.
/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
/// to be `Error`.
fn infer_event_category(item: &Item) -> Option<DataCategory> {
match item.ty() {
ItemType::Event => Some(DataCategory::Error),
ItemType::Transaction => Some(DataCategory::Transaction),
ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
ItemType::UnrealReport => Some(DataCategory::Error),
ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
ItemType::Attachment => None,
ItemType::Session => None,
ItemType::FormData => None,
ItemType::UserReport => None,
}
}

/// Enforces rate limits with the given `check` function on items in the envelope.
///
/// The `check` function is called with the following rules:
/// - Once for a single event, if present in the envelope.
/// - Once for all comprised attachments, unless the event was rate limited.
/// - Once for all comprised sessions.
///
/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
/// - If the event is removed, all items depending on the event are removed (e.g. attachments).
/// - Attachments are not removed if they create events (e.g. minidumps).
/// - Sessions are handled separate to all of the above.
pub struct EnvelopeLimiter<F> {
check: F,
event_category: Option<DataCategory>,
attachment_quantity: usize,
session_quantity: usize,
remove_event: bool,
remove_attachments: bool,
remove_sessions: bool,
}

impl<E, F> EnvelopeLimiter<F>
where
F: FnMut(ItemScoping<'_>, usize) -> Result<RateLimits, E>,
{
/// Create a new `EnvelopeLimiter` with the given `check` function.
pub fn new(check: F) -> Self {
Self {
check,
event_category: None,
attachment_quantity: 0,
session_quantity: 0,
remove_event: false,
remove_attachments: false,
remove_sessions: false,
}
}

/// Process rate limits for the envelope, removing offending items and returning applied limits.
pub fn enforce(mut self, envelope: &mut Envelope, scoping: &Scoping) -> Result<RateLimits, E> {
self.aggregate(envelope);
let rate_limits = self.execute(scoping)?;
envelope.retain_items(|item| !self.should_remove(item));
Ok(rate_limits)
}

fn aggregate(&mut self, envelope: &Envelope) {
for item in envelope.items() {
if item.creates_event() {
self.infer_category(item);
}

match item.ty() {
ItemType::Attachment => self.attachment_quantity += item.len().max(1),
ItemType::Session => self.session_quantity += 1,
_ => (),
}
}
}

fn infer_category(&mut self, item: &Item) {
if matches!(self.event_category, None | Some(DataCategory::Default)) {
if let Some(category) = infer_event_category(item) {
self.event_category = Some(category);
}
}
}

fn execute(&mut self, scoping: &Scoping) -> Result<RateLimits, E> {
let mut rate_limits = RateLimits::new();

if let Some(category) = self.event_category {
let event_limits = (&mut self.check)(scoping.item(category), 1)?;
self.remove_event = event_limits.is_limited();
rate_limits.merge(event_limits);
}

if !self.remove_event && self.attachment_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Attachment);
let attachment_limits = (&mut self.check)(item_scoping, self.attachment_quantity)?;
self.remove_attachments = attachment_limits.is_limited();
rate_limits.merge(attachment_limits);
}

if self.session_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Session);
let session_limits = (&mut self.check)(item_scoping, self.session_quantity)?;
self.remove_sessions = session_limits.is_limited();
rate_limits.merge(session_limits);
}

Ok(rate_limits)
}

fn should_remove(&self, item: &Item) -> bool {
// Remove event items and all items that depend on this event
if self.remove_event && item.requires_event() {
return true;
}

// Remove attachments, except those required for processing
if self.remove_attachments && item.ty() == ItemType::Attachment {
return !item.creates_event();
}

// Remove sessions independently of events
if self.remove_sessions && item.ty() == ItemType::Session {
return true;
}

false
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit f3ac407

Please sign in to comment.