ref(metrics): Improve distribution value performance #2483

Merged · 5 commits · Sep 6, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
- Support ingestion of custom metrics when the `organizations:custom-metrics` feature flag is enabled. ([#2443](https://github.com/getsentry/relay/pull/2443))
- Merge span metrics and standalone spans extraction options. ([#2447](https://github.com/getsentry/relay/pull/2447))
- Support parsing aggregated metric buckets directly from statsd payloads. ([#2468](https://github.com/getsentry/relay/pull/2468), [#2472](https://github.com/getsentry/relay/pull/2472))
+ - Improve performance when ingesting distribution metrics with a large number of data points. ([#2483](https://github.com/getsentry/relay/pull/2483))
- Rename the envelope item type for StatsD payloads to "statsd". ([#2470](https://github.com/getsentry/relay/pull/2470))
- Add a nanojoule unit for profile measurements. ([#2478](https://github.com/getsentry/relay/pull/2478))

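The entry for #2483 above is the change this PR ships: distribution values are no longer aggregated into an ordered value-to-count map but appended to a flat vector of `f64`s, as the swap of the `float-ord` dependency for `smallvec` further down suggests. The following is a minimal sketch of that storage layout, assuming a `SmallVec`-backed container; the names and signatures mirror what the diff uses but are illustrative only, not the actual Relay implementation:

```rust
use smallvec::SmallVec;

/// Minimal sketch of a flat distribution container (illustrative only).
/// Data points are stored as plain `f64`s, so inserting a value is an append
/// rather than a lookup-and-increment in an ordered value-to-count map.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DistributionValue {
    // Inline capacity of 3 chosen arbitrarily for the sketch.
    values: SmallVec<[f64; 3]>,
}

impl DistributionValue {
    /// Appends a single data point.
    pub fn push(&mut self, value: f64) {
        self.values.push(value);
    }

    /// Appends all data points from a slice of raw values.
    pub fn extend_from_slice(&mut self, other: &[f64]) {
        self.values.extend_from_slice(other);
    }

    /// Builds a distribution from an existing slice of data points.
    pub fn from_slice(slice: &[f64]) -> Self {
        Self {
            values: SmallVec::from_slice(slice),
        }
    }

    /// Keeps only the first `len` data points, dropping the tail.
    pub fn truncate(&mut self, len: usize) {
        self.values.truncate(len);
    }

    /// Number of stored data points, duplicates included.
    pub fn len(&self) -> usize {
        self.values.len()
    }
}

impl FromIterator<f64> for DistributionValue {
    fn from_iter<T: IntoIterator<Item = f64>>(iter: T) -> Self {
        Self {
            values: iter.into_iter().collect(),
        }
    }
}
```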
9 changes: 2 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -25,6 +25,7 @@ futures = { version = "0.3", default-features = false, features = ["std"] }
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
itertools = "0.10.5"
once_cell = "1.13.1"
+ rand = "0.8.5"
regex = "1.9.1"
serde = { version = "1.0.159", features = ["derive"] }
serde_json = "1.0.93"
2 changes: 1 addition & 1 deletion relay-auth/Cargo.toml
@@ -15,7 +15,7 @@ chrono = { workspace = true }
data-encoding = "2.3.3"
ed25519-dalek = { version = "2.0.0", features = ["rand_core"] }
hmac = "0.12.1"
- rand = "0.8.5"
+ rand = { workspace = true }
relay-common = { path = "../relay-common" }
serde = { workspace = true }
serde_json = { workspace = true }
5 changes: 3 additions & 2 deletions relay-metrics/Cargo.toml
@@ -11,7 +11,6 @@ publish = false

[dependencies]
bytecount = "0.6.0"
- float-ord = "0.3.1"
fnv = "1.0.7"
hash32 = "0.3.1"
itertools = { workspace = true }
@@ -22,17 +21,19 @@ relay-statsd = { path = "../relay-statsd" }
relay-system = { path = "../relay-system" }
serde = { workspace = true }
serde_json = { workspace = true }
+ smallvec = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["time"] }

[dev-dependencies]
criterion = { workspace = true }
insta = { workspace = true }
+ rand = { workspace = true }
relay-statsd = { path = "../relay-statsd", features = ["test"] }
relay-test = { path = "../relay-test" }
similar-asserts = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }

[[bench]]
- name = "aggregator"
+ name = "benches"
harness = false
@@ -1,10 +1,10 @@
use std::collections::BTreeMap;
use std::fmt;

- use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
+ use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
- use relay_metrics::{AggregatorConfig, AggregatorService, Metric, MetricValue};
+ use relay_metrics::{AggregatorConfig, AggregatorService, DistributionValue, Metric, MetricValue};

/// Struct representing a testcase for which insert + flush are timed.
struct MetricInput {
@@ -159,5 +159,22 @@ fn bench_insert_and_flush(c: &mut Criterion) {
}
}

- criterion_group!(benches, bench_insert_and_flush);
+ fn bench_distribution(c: &mut Criterion) {
+     let mut group = c.benchmark_group("DistributionValue");
+
+     for size in [1, 10, 100, 1000, 10_000, 100_000, 1_000_000] {
+         let values = std::iter::from_fn(|| Some(rand::random()))
+             .take(size as usize)
+             .collect::<Vec<f64>>();
+
+         group.throughput(criterion::Throughput::Elements(size));
+         group.bench_with_input(BenchmarkId::from_parameter(size), &values, |b, values| {
+             b.iter(|| DistributionValue::from_iter(black_box(values.iter().copied())))
+         });
+     }
+
+     group.finish();
+ }
+
+ criterion_group!(benches, bench_insert_and_flush, bench_distribution);
criterion_main!(benches);
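The new `bench_distribution` group above builds a `DistributionValue` from 1 up to 1,000,000 random `f64` values via `from_iter` and reports throughput in elements per second, which is what backs the performance claim in the changelog. It should be runnable with the usual Criterion invocation, presumably something like `cargo bench -p relay-metrics --bench benches` given the renamed `[[bench]]` target, though the exact command depends on the workspace layout.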
42 changes: 19 additions & 23 deletions relay-metrics/src/aggregation.rs
@@ -22,7 +22,8 @@ use crate::statsd::{
MetricTimers,
};
use crate::{
- protocol, Bucket, BucketValue, Metric, MetricNamespace, MetricResourceIdentifier, MetricValue,
+ protocol, Bucket, BucketValue, DistributionValue, Metric, MetricNamespace,
+ MetricResourceIdentifier, MetricValue,
};

/// Interval for the flush cycle of the [`AggregatorService`].
@@ -47,12 +48,12 @@ trait MergeValue: Into<BucketValue> {
}

impl MergeValue for BucketValue {
- fn merge_into(self, bucket_value: &mut BucketValue) -> Result<(), AggregateMetricsError> {
+ fn merge_into(self, bucket_value: &mut Self) -> Result<(), AggregateMetricsError> {
match (bucket_value, self) {
- (BucketValue::Counter(lhs), BucketValue::Counter(rhs)) => *lhs += rhs,
- (BucketValue::Distribution(lhs), BucketValue::Distribution(rhs)) => lhs.extend(&rhs),
- (BucketValue::Set(lhs), BucketValue::Set(rhs)) => lhs.extend(rhs),
- (BucketValue::Gauge(lhs), BucketValue::Gauge(rhs)) => lhs.merge(rhs),
+ (Self::Counter(lhs), Self::Counter(rhs)) => *lhs += rhs,
+ (Self::Distribution(lhs), Self::Distribution(rhs)) => lhs.extend_from_slice(&rhs),
+ (Self::Set(lhs), Self::Set(rhs)) => lhs.extend(rhs),
+ (Self::Gauge(lhs), Self::Gauge(rhs)) => lhs.merge(rhs),
_ => return Err(AggregateMetricsErrorKind::InvalidTypes.into()),
}
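With a flat backing store, merging two distributions is a plain append rather than per-value count bookkeeping, which is why the old `extend` on the map-like value becomes `extend_from_slice` above, and why the merge test further down now expects insertion order (`[1., 2., 3., 2., 4.]`) instead of a sorted, deduplicated sequence. A small usage sketch against the simplified `DistributionValue` from the earlier example (assumed API, not the real type):

```rust
fn main() {
    // Merging appends the right-hand values in order; a duplicate value is
    // stored again instead of incrementing a count.
    let mut lhs = DistributionValue::from_slice(&[1.0, 2.0, 3.0]);
    lhs.extend_from_slice(&[2.0, 4.0]);
    assert_eq!(lhs.len(), 5); // stored as 1, 2, 3, 2, 4
}
```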

Expand All @@ -67,7 +68,7 @@ impl MergeValue for MetricValue {
*counter += value;
}
(BucketValue::Distribution(distribution), MetricValue::Distribution(value)) => {
- distribution.insert(value);
+ distribution.push(value);
}
(BucketValue::Set(set), MetricValue::Set(value)) => {
set.insert(value);
@@ -125,12 +126,14 @@ fn split_at(mut bucket: Bucket, size: usize) -> (Option<Bucket>, Option<Bucket>)
match bucket.value {
BucketValue::Counter(_) => (None, Some(bucket)),
BucketValue::Distribution(ref mut distribution) => {
- let org = std::mem::take(distribution);
+ let mut org = std::mem::take(distribution);

let mut new_bucket = bucket.clone();
+ new_bucket.value =
+ BucketValue::Distribution(DistributionValue::from_slice(&org[split_at..]));

- let mut iter = org.iter_values();
- bucket.value = BucketValue::Distribution((&mut iter).take(split_at).collect());
- new_bucket.value = BucketValue::Distribution(iter.collect());
+ org.truncate(split_at);
+ bucket.value = BucketValue::Distribution(org);

(Some(bucket), Some(new_bucket))
}
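The reworked branch above also avoids draining the distribution through an iterator twice when a bucket is split: the tail is copied into the new bucket via `from_slice`, and the original is truncated in place to keep the head. A sketch of the same idea on a plain `Vec<f64>` (hypothetical helper, not the Relay function):

```rust
/// Splits a flat list of data points at `split_at`: the head stays in the
/// original allocation, the tail is copied into a new one.
fn split_distribution(mut values: Vec<f64>, split_at: usize) -> (Vec<f64>, Vec<f64>) {
    let tail = values[split_at..].to_vec();
    values.truncate(split_at);
    (values, tail)
}

fn main() {
    let (head, tail) = split_distribution(vec![1.0, 2.0, 3.0, 4.0], 2);
    assert_eq!(head, vec![1.0, 2.0]);
    assert_eq!(tail, vec![3.0, 4.0]);
}
```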
@@ -1509,7 +1512,7 @@ mod tests {
BucketValue::Distribution(dist![2., 4.])
.merge_into(&mut value)
.unwrap();
- assert_eq!(value, BucketValue::Distribution(dist![1., 2., 2., 3., 4.]));
+ assert_eq!(value, BucketValue::Distribution(dist![1., 2., 3., 2., 4.]));
}

#[test]
@@ -1596,10 +1599,7 @@ mod tests {
expected_bucket_value_size + 5 * expected_set_entry_size
);
let distribution = BucketValue::Distribution(dist![1., 2., 3.]);
- assert_eq!(
- distribution.cost(),
- expected_bucket_value_size + 3 * (8 + 4)
- );
+ assert_eq!(distribution.cost(), expected_bucket_value_size + 3 * 8);
let gauge = BucketValue::Gauge(GaugeValue {
last: 43.,
min: 42.,
@@ -1852,14 +1852,10 @@ mod tests {
(
"d:transactions/foo@none",
MetricValue::Distribution(1.0),
- fixed_cost + 12,
+ fixed_cost + 8,
), // New bucket + 1 element
- ("d:transactions/foo@none", MetricValue::Distribution(1.0), 0), // no new element
- (
- "d:transactions/foo@none",
- MetricValue::Distribution(2.0),
- 12,
- ), // 1 new element
+ ("d:transactions/foo@none", MetricValue::Distribution(1.0), 8), // duplicate element
+ ("d:transactions/foo@none", MetricValue::Distribution(2.0), 8), // 1 new element
(
"g:transactions/foo@none",
MetricValue::Gauge(0.3),
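The cost-tracking tests above match the new layout: each distribution element is accounted at 8 bytes (one `f64`) instead of 12, which presumably corresponds to dropping a 4-byte count that the old value-to-count representation kept per distinct value, and a duplicate data point now adds 8 bytes rather than 0 because it is stored again instead of being deduplicated.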