Skip to content

Commit

Permalink
Merge branch 'master' into feat/metrics-enforce-quotas
Browse files Browse the repository at this point in the history
* master:
  fix(quotas): Make redis rate limiter work with quantity 0 (#1519)
  ref: Remove unused rate_limits from ProcessEnvelopeState (#1516)
  fix(quotas): Use correct string spelling for TransactionProcessed (#1514)
  • Loading branch information
jan-auer committed Oct 7, 2022
2 parents b6070aa + 926827a commit b7bd8e0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
- Remove long-running futures from metrics flush. ([#1492](https://github.com/getsentry/relay/pull/1492))
- Migrate to 2021 Rust edition. ([#1510](https://github.com/getsentry/relay/pull/1510))
- Make the profiling frame object compatible with the stacktrace frame object from event. ([#1512](https://github.com/getsentry/relay/pull/1512))
- Fix quota DataCategory::TransactionProcessed serialisation to match that of the CAPI. ([#1514](https://github.com/getsentry/relay/pull/1514))
- Support checking quotas in the Redis rate limiter without incrementing them. ([#1519](https://github.com/getsentry/relay/pull/1519))

## 22.9.0

Expand Down
4 changes: 3 additions & 1 deletion relay-common/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl fmt::Display for EventType {

/// Classifies the type of data that is being ingested.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[serde(rename_all = "snake_case")]
#[repr(i8)]
pub enum DataCategory {
/// Reserved and unused.
Expand Down Expand Up @@ -128,6 +128,7 @@ pub enum DataCategory {
impl DataCategory {
/// Returns the data category corresponding to the given name.
pub fn from_name(string: &str) -> Self {
// TODO: This should probably use serde.
match string {
"default" => Self::Default,
"error" => Self::Error,
Expand All @@ -144,6 +145,7 @@ impl DataCategory {

/// Returns the canonical name of this data category.
pub fn name(self) -> &'static str {
// TODO: This should probably use serde.
match self {
Self::Default => "default",
Self::Error => "error",
Expand Down
16 changes: 12 additions & 4 deletions relay-quotas/src/is_rate_limited.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
-- ``ARGV`` (3 per quota):
-- * [number] Quota limit. Can be ``-1`` for unlimited quotas.
-- * [number] Absolute expiration time as a Unix timestamp (seconds since 1970-01-01) for the key.
-- * [number] Quantity to increment the quota by.
-- * [number] Quantity to increment the quota by, or ``0`` to check without incrementing.
--
-- For example, to check the following two quotas each with a timeout of 10 minutes from now:
-- * Key ``foo``, refund key ``foo_refund``, limit ``10``; quantity ``5``
Expand Down Expand Up @@ -43,7 +43,13 @@ for i=0, num_quotas - 1 do
local rejected = false
-- limit=-1 means "no limit"
if limit >= 0 then
rejected = (redis.call('GET', KEYS[k]) or 0) - (redis.call('GET', KEYS[k + 1]) or 0) + quantity > limit
local consumed = (redis.call('GET', KEYS[k]) or 0) - (redis.call('GET', KEYS[k + 1]) or 0)
-- We never increment past the limit. If the quantity is 0, instead check whether the limit has been reached.
if quantity == 0 then
rejected = consumed >= limit
else
rejected = consumed + quantity > limit
end
end

if rejected then
Expand All @@ -57,8 +63,10 @@ if not failed then
local k = i * 2 + 1
local v = i * 3 + 1

redis.call('INCRBY', KEYS[k], ARGV[v + 2])
redis.call('EXPIREAT', KEYS[k], ARGV[v + 1])
if tonumber(ARGV[v + 2]) > 0 then
redis.call('INCRBY', KEYS[k], ARGV[v + 2])
redis.call('EXPIREAT', KEYS[k], ARGV[v + 1])
end
end
end

Expand Down
53 changes: 53 additions & 0 deletions relay-quotas/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ impl RedisRateLimiter {
///
/// If no key is specified, then only organization-wide and project-wide quotas are checked. If
/// a key is specified, then key-quotas are also checked.
///
/// The passed `quantity` may be `0`. In this case, the rate limiter will check if the quota
/// limit has been reached or exceeded without incrementing it in the success case. This can be
/// useful to check for required quotas in a different data category.
pub fn is_rate_limited(
&self,
quotas: &[Quota],
Expand Down Expand Up @@ -364,6 +368,55 @@ mod tests {
}
}

// Verifies quantity-0 semantics of the Redis rate limiter: a quantity of 0
// only checks whether the quota limit has been reached, without incrementing
// the stored counter (see the `quantity == 0` branch in is_rate_limited.lua).
#[test]
fn test_quantity_0() {
// Unique quota id per run so repeated test executions don't share Redis keys.
let quotas = &[Quota {
id: Some(format!("test_quantity_0_{:?}", SystemTime::now())),
categories: DataCategories::new(),
scope: QuotaScope::Organization,
scope_id: None,
limit: Some(1),
window: Some(60),
reason_code: Some(ReasonCode::new("get_lost")),
}];

let scoping = ItemScoping {
category: DataCategory::Error,
scoping: &Scoping {
organization_id: 42,
project_id: ProjectId::new(43),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(44),
},
};

let rate_limiter = build_rate_limiter();

// limit is 1, so first call not rate limited
assert!(!rate_limiter
.is_rate_limited(quotas, scoping, 1)
.unwrap()
.is_limited());

// quota is now exhausted
assert!(rate_limiter
.is_rate_limited(quotas, scoping, 1)
.unwrap()
.is_limited());

// quantity 0 performs a check only: the quota is already at its limit,
// so the check reports limited without incrementing the counter
assert!(rate_limiter
.is_rate_limited(quotas, scoping, 0)
.unwrap()
.is_limited());

// a subsequent real increment (quantity 1) is still rejected
assert!(rate_limiter
.is_rate_limited(quotas, scoping, 1)
.unwrap()
.is_limited());
}

#[test]
fn test_bails_immediately_without_any_quota() {
let scoping = ItemScoping {
Expand Down
18 changes: 2 additions & 16 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use relay_general::store::{ClockDriftProcessor, LightNormalizationConfig};
use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value};
use relay_log::LogError;
use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric};
use relay_quotas::{DataCategory, Quota, RateLimits, ReasonCode, Scoping};
use relay_quotas::{DataCategory, Quota, ReasonCode, Scoping};
use relay_redis::RedisPool;
use relay_sampling::{DynamicSamplingContext, RuleId};
use relay_statsd::metric;
Expand Down Expand Up @@ -230,14 +230,6 @@ struct ProcessEnvelopeState {
/// resulting item.
sample_rates: Option<Value>,

/// Rate limits returned in processing mode.
///
/// The rate limiter is invoked in processing mode, after which the resulting limits are stored
/// in this field. Note that there can be rate limits even if the envelope still carries items.
///
/// These are always empty in non-processing mode, since the rate limiter is not invoked.
rate_limits: RateLimits,

/// Metrics extracted from items in the envelope.
///
/// Relay can extract metrics for sessions and transactions, which is controlled by
Expand Down Expand Up @@ -465,9 +457,6 @@ pub struct ProcessEnvelopeResponse {
/// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
/// to be dropped, this is `None`.
pub envelope: Option<(Envelope, EnvelopeContext)>,

/// All rate limits that have been applied on the envelope.
pub rate_limits: RateLimits,
}

/// Applies processing to all contents of the given envelope.
Expand Down Expand Up @@ -1184,7 +1173,6 @@ impl EnvelopeProcessorService {
transaction_metrics_extracted: false,
metrics: Metrics::default(),
sample_rates: None,
rate_limits: RateLimits::new(),
extracted_metrics: Vec::new(),
project_state,
sampling_project_state,
Expand Down Expand Up @@ -1786,10 +1774,9 @@ impl EnvelopeProcessorService {

if limits.is_limited() {
ProjectCache::from_registry()
.do_send(UpdateRateLimits::new(scoping.project_key, limits.clone()));
.do_send(UpdateRateLimits::new(scoping.project_key, limits));
}

state.rate_limits = limits;
enforcement.track_outcomes(&state.envelope, &state.envelope_context.scoping());

if remove_event {
Expand Down Expand Up @@ -2100,7 +2087,6 @@ impl EnvelopeProcessorService {

Ok(ProcessEnvelopeResponse {
envelope: envelope_response,
rate_limits: state.rate_limits,
})
}
Err(err) => {
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ impl EnvelopeSummary {
summary
}

/// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
///
/// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
/// a category set.
fn infer_category(&mut self, item: &Item) {
if matches!(self.event_category, None | Some(DataCategory::Default)) {
if let Some(category) = infer_event_category(item) {
Expand All @@ -186,6 +190,8 @@ struct CategoryLimit {
/// The limited data category.
category: DataCategory,
/// The total rate limited quantity across all items.
///
/// This will be `0` if nothing was rate limited.
quantity: usize,
/// The reason code of the applied rate limit.
///
Expand Down

0 comments on commit b7bd8e0

Please sign in to comment.