feat(metrics): Configurable flush time shift #2349

Merged (9 commits) on Jul 25, 2023
CHANGELOG.md (1 addition, 0 deletions)
@@ -10,6 +10,7 @@
**Internal**:

- Add capability to configure metrics aggregators per use case. ([#2341](https://github.com/getsentry/relay/pull/2341))
- Configurable flush time offsets for metrics buckets. ([#2349](https://github.com/getsentry/relay/pull/2349))

## 23.7.0

relay-metrics/src/aggregation.rs (54 additions, 13 deletions)
@@ -926,6 +926,24 @@ impl BucketKey {
}
}

/// Configuration value for [`AggregatorConfig::shift_key`].
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ShiftKey {
/// Shifts the flush time by an offset based on the [`ProjectKey`].
///
/// This allows buckets from the same project to be flushed together.
#[default]
Project,

/// Shifts the flush time by an offset based on the bucket key itself.
///
/// This spreads bucket flush times effectively at random across the interval,
/// independent of the project.
///
/// Only for use in processing Relays.
Bucket,
}

/// Parameters used by the [`AggregatorService`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
@@ -1010,6 +1028,12 @@ pub struct AggregatorConfig {
///
/// Defaults to `None`, i.e. no limit.
pub max_project_key_bucket_bytes: Option<usize>,

/// Key used to shift the flush time of a bucket.
///
/// This prevents flushing all buckets from a bucket interval at the same
/// time by computing an offset from the hash of the given key.
pub shift_key: ShiftKey,
}

impl AggregatorConfig {
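
For illustration, a minimal sketch (not part of this diff) of how the new field is expected to deserialize, relying on `#[serde(default)]` on `AggregatorConfig` and `#[serde(rename_all = "lowercase")]` on `ShiftKey`; the test name is hypothetical:

#[test]
fn shift_key_deserialization_sketch() {
    // An empty config falls back to the default, project-based shifting.
    let default_config: AggregatorConfig = serde_json::from_str("{}").unwrap();
    assert!(matches!(default_config.shift_key, ShiftKey::Project));

    // Processing Relays can opt into per-bucket shifting.
    let bucket_config: AggregatorConfig =
        serde_json::from_str(r#"{"shift_key": "bucket"}"#).unwrap();
    assert!(matches!(bucket_config.shift_key, ShiftKey::Bucket));
}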
@@ -1068,28 +1092,21 @@ impl AggregatorConfig {
///
/// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
/// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
- fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
+ fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
let now = Instant::now();
let mut flush = None;

- if let MonotonicResult::Instant(instant) = bucket_timestamp.to_instant() {
+ if let MonotonicResult::Instant(instant) = bucket_key.timestamp.to_instant() {
let instant = Instant::from_std(instant);
let bucket_end = instant + self.bucket_interval();
let initial_flush = bucket_end + self.initial_delay();
// If the initial flush is still pending, use that.
if initial_flush > now {
- // Shift deterministically within one bucket interval based on the project key. This
- // distributes buckets over time while also flushing all buckets of the same project
- // key together.
- let mut hasher = FnvHasher::default();
- hasher.write(project_key.as_str().as_bytes());
- let shift_millis = hasher.finish() % (self.bucket_interval * 1000);
-
- flush = Some(initial_flush + Duration::from_millis(shift_millis));
+ flush = Some(initial_flush + self.flush_time_shift(bucket_key));
}
}

- let delay = UnixTimestamp::now().as_secs() as i64 - bucket_timestamp.as_secs() as i64;
+ let delay = UnixTimestamp::now().as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
relay_statsd::metric!(
histogram(MetricHistograms::BucketsDelay) = delay as f64,
backdated = if flush.is_none() { "true" } else { "false" },
@@ -1102,6 +1119,23 @@ impl AggregatorConfig {
None => now + self.debounce_delay(),
}
}

// Shift deterministically within one bucket interval based on the project or bucket key.
//
// This distributes buckets over time to prevent peaks.
fn flush_time_shift(&self, bucket: &BucketKey) -> Duration {
let hash_value = match self.shift_key {
ShiftKey::Project => {
let mut hasher = FnvHasher::default();
hasher.write(bucket.project_key.as_str().as_bytes());
hasher.finish()
}
ShiftKey::Bucket => bucket.hash64(),
};
let shift_millis = hash_value % (self.bucket_interval * 1000);

Duration::from_millis(shift_millis)
}
}

impl Default for AggregatorConfig {
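
To make the shift concrete, here is a small self-contained sketch (not from the PR) that reproduces the project-based hashing for a 10-second bucket interval, assuming `FnvHasher` comes from the `fnv` crate; the `project_shift` helper and the example keys are made up:

use std::hash::Hasher;
use std::time::Duration;

use fnv::FnvHasher;

// Mirrors `flush_time_shift` for `ShiftKey::Project`: hash the project key
// and wrap the result into one bucket interval, in milliseconds.
fn project_shift(project_key: &str, bucket_interval_secs: u64) -> Duration {
    let mut hasher = FnvHasher::default();
    hasher.write(project_key.as_bytes());
    Duration::from_millis(hasher.finish() % (bucket_interval_secs * 1000))
}

fn main() {
    // Buckets of the same project always get the same offset, so they still
    // flush together, while different projects are spread across the interval
    // instead of all flushing at the same instant.
    for key in [
        "a94ae32be2584e0bbd7a4cbb95971fee",
        "fedcba98765432100123456789abcdef",
    ] {
        println!("{key} shifts by {:?}", project_shift(key, 10));
    }
}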
@@ -1119,6 +1153,7 @@ impl Default for AggregatorConfig {
max_tag_value_length: 200,
max_total_bucket_bytes: None,
max_project_key_bucket_bytes: None,
shift_key: ShiftKey::default(),
}
}
}
@@ -1671,7 +1706,6 @@ impl AggregatorService {
key: BucketKey,
value: T,
) -> Result<(), AggregateMetricsError> {
- let timestamp = key.timestamp;
let project_key = key.project_key;

let key = Self::validate_bucket_key(key, &self.config)?;
@@ -1734,7 +1768,7 @@
metric_name = metric_name_tag(&entry.key().metric_name),
);

- let flush_at = self.config.get_flush_time(timestamp, project_key);
+ let flush_at = self.config.get_flush_time(entry.key());
let bucket = value.into();
added_cost = entry.key().cost() + bucket.cost();
entry.insert(QueuedBucket::new(flush_at, bucket));
@@ -3241,4 +3275,11 @@ mod tests {
fn test_capped_iter_completeness_100() {
test_capped_iter_completeness(100, 4);
}

#[test]
fn test_parse_shift_key() {
let json = r#"{"shift_key": "bucket"}"#;
let parsed: AggregatorConfig = serde_json::from_str(json).unwrap();
assert!(matches!(parsed.shift_key, ShiftKey::Bucket));
}
}