feat(metrics): Limits on bucketing cost in aggregator [INGEST-1132] #1287

Merged · 18 commits · Jun 7, 2022
Changes from 3 commits
174 changes: 174 additions & 0 deletions relay-metrics/src/aggregation.rs
@@ -757,6 +757,12 @@ enum AggregateMetricsErrorKind {
/// A metric bucket had a string that was too long (metric name or a tag key/value).
#[fail(display = "found invalid string")]
InvalidStringLength,
/// A metric bucket is too large for the global bytes limit.
#[fail(display = "total metrics limit exceeded")]
TotalLimitExceeded,
/// A metric bucket is too large for the per-project bytes limit.
#[fail(display = "project metrics limit exceeded")]
ProjectLimitExceeded,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -841,6 +847,23 @@ pub struct AggregatorConfig {
///
/// Defaults to `200` bytes.
pub max_tag_value_length: usize,

/// Maximum number of bytes used for metrics aggregation.
///
/// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
/// This is only an approximation and does not take into account things such as pre-allocation
/// in hashmaps.
///
/// Defaults to `None`, i.e. no limit.
pub max_total_bucket_bytes: Option<usize>,

/// Maximum number of bytes used for metrics aggregation per project.
///
/// Similar measuring technique to `max_total_bucket_bytes`, but instead of a
/// global/process-wide limit, it is enforced per project id.
[Member] Suggested change:

-    /// global/process-wide limit, it is enforced per project id.
+    /// global/process-wide limit, it is enforced per project key.

[Member] Can we still fix this and also use a better config name?

[Member] What name would you suggest? max_bucket_bytes_total and max_bucket_bytes_per_project_key?

///
/// Defaults to `None`, i.e. no limit.
pub max_project_bucket_bytes: Option<usize>,
}

impl AggregatorConfig {
@@ -940,6 +963,8 @@ impl Default for AggregatorConfig {
max_name_length: 200,
max_tag_key_length: 200,
max_tag_value_length: 200,
max_total_bucket_bytes: None,
max_project_bucket_bytes: None,
}
}
}
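For orientation, here is a minimal sketch (not part of this diff) of enabling both limits; it assumes AggregatorConfig is exported from the relay_metrics crate, and the byte values are purely illustrative:

    use relay_metrics::AggregatorConfig;

    fn main() {
        // Illustrative limits: cap the aggregator's total bucket memory at
        // 1 GiB and each project's share at 64 MiB; every other field keeps
        // the defaults shown above.
        let config = AggregatorConfig {
            max_total_bucket_bytes: Some(1024 * 1024 * 1024),
            max_project_bucket_bytes: Some(64 * 1024 * 1024),
            ..AggregatorConfig::default()
        };
        assert_eq!(config.max_project_bucket_bytes, Some(64 * 1024 * 1024));
    }

In Relay's configuration file the same fields live under the aggregator key; the integration test added at the bottom of this diff sets {"aggregator": {"max_total_bucket_bytes": 0}} to force the limit.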
@@ -1028,6 +1053,23 @@ impl Message for FlushBuckets {
type Result = Result<(), Vec<Bucket>>;
}

/// Check whether the aggregator has not (yet) exceeded its total limits. Used for healthchecks.
pub struct AcceptsMetrics;

impl Message for AcceptsMetrics {
type Result = bool;
}

impl Handler<AcceptsMetrics> for Aggregator {
type Result = bool;

fn handle(&mut self, _msg: AcceptsMetrics, _ctx: &mut Self::Context) -> Self::Result {
!self
.cost_tracker
.totals_cost_exceeded(self.config.max_total_bucket_bytes)
}
}

enum AggregatorState {
Running,
ShuttingDown,
@@ -1042,6 +1084,46 @@ struct CostTracker {
}

impl CostTracker {
fn totals_cost_exceeded(&self, max_total_cost: Option<usize>) -> bool {
if let Some(max_total_cost) = max_total_cost {
if self.total_cost >= max_total_cost {
return true;
}
}

false
}

fn check_limits_exceeded(
&self,
project_key: ProjectKey,
max_total_cost: Option<usize>,
max_project_cost: Option<usize>,
) -> Result<(), AggregateMetricsError> {
if self.totals_cost_exceeded(max_total_cost) {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into());
}

if let Some(max_project_cost) = max_project_cost {
let project_cost = self
.cost_per_project_key
.get(&project_key)
.cloned()
.unwrap_or(0);
if project_cost >= max_project_cost {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into());
}
}

Ok(())
}

fn add_cost(&mut self, project_key: ProjectKey, cost: usize) {
self.total_cost += cost;
let project_cost = self.cost_per_project_key.entry(project_key).or_insert(0);
@@ -1259,6 +1341,38 @@ impl Aggregator {

let key = Self::validate_bucket_key(key, &self.config)?;

// XXX: This is not a great implementation of cost enforcement.
//
// * it takes two lookups of the project key in the cost tracker to merge a bucket: once in
// `check_limits_exceeded` and once in `add_cost`.
[Member] Could you try to return the error from add_cost and do the diff computation inline? The .entry() function below doesn't create the entry until you call insert on it, which just leaves the merge into the bucket, and that could also easily return whether it did insert or not.

It's excellent that you're calling out the caveats of enforcement below, though in practice an off-by-one is not an issue.
//
// * the limits are not actually enforced consistently
//
// A bucket that pushes the cost over the limit can still be merged; only the
// next bucket is rejected, because the limit is then reached. This implementation was chosen because it's
// currently not possible to determine cost accurately upfront: The bucket values have to
// be merged together to figure out how costly the merge was. Changing that would force
// us to unravel a lot of abstractions that we have already built.
//
// As a result of that, it is possible to exceed the bucket cost limit significantly
// until we have guaranteed upper bounds on the cost of a single bucket (which we
// currently don't, because a metric can have an arbitrary number of tag values).
//
// Another consequence is that a MergeValue that adds zero cost (such as an existing
// counter bucket being incremented) is currently rejected even though it doesn't have to
// be.
Comment on lines +1393 to +1395:

[jan-auer (Member), Jun 3, 2022] That is actually problematic and we should fix that. Same as in my previous comment -- if merge_into returns what happened, you should be able to enforce more consistently. See also #1284 (comment).

[Member Author] If I call merge_into, it's already too late to enforce anything, as I can't undo a merge. I would need to special-case counters or add a method to MergeValue that returns the projected added cost under both the occupied and the vacant scenario. Let's check in on Monday.

[Member] That's OK, since worst case you're off by one, so depending on the value you're inserting you're less than 48 bytes over. For a memory limit of 16 GB that is insignificant. The other option is to implement cost on MetricValue.

[untitaker (Member Author), Jun 3, 2022] Let's talk on Monday. Everything you're saying is true, but I don't think it implies that this case can be fixed (without implementing cost on MetricValue).

[Member] Not sure that implementing cost on MetricValue solves the problem. Whether, for example, a set bucket grows depends on the pre-existing set entries, not just the incoming value. So to be 100% consistent we would need a function that answers the question "would this increase my size?", which would be overkill IMO.
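To make the suggested shape concrete, a hypothetical sketch (not code from this PR; insert_measured and do_merge are invented names, and it leans on the CostTracker methods from this diff): the merge reports the bytes it actually added, zero-cost merges are never rejected, and enforcement overshoots by at most one bucket:

    // Hypothetical helper illustrating the review suggestion above.
    // `do_merge` performs the actual bucket merge and returns how many
    // bytes it added to the aggregator.
    fn insert_measured(
        tracker: &mut CostTracker,
        project_key: ProjectKey,
        max_total: Option<usize>,
        max_project: Option<usize>,
        do_merge: impl FnOnce() -> usize,
    ) -> Result<(), AggregateMetricsError> {
        let added = do_merge();
        if added == 0 {
            // E.g. an existing counter bucket was incremented in place:
            // nothing new to account for, so never reject.
            return Ok(());
        }
        // Account first, then enforce. The current merge always lands, so
        // the limit can be overshot by at most one bucket's worth of bytes;
        // an Err here means subsequent inserts will be rejected up front.
        tracker.add_cost(project_key, added);
        tracker.check_limits_exceeded(project_key, max_total, max_project)
    }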

//
// The flipside of this approach is however that there's more optimization potential: If
// the limit is already exceeded, we could implement an optimization that drops envelope
// items before they are parsed, as we can be sure that the new metric bucket will be
// rejected in the aggregator regardless of whether it is merged into existing buckets,
// whether it is just a counter, etc.
self.cost_tracker.check_limits_exceeded(
project_key,
self.config.max_total_bucket_bytes,
self.config.max_project_bucket_bytes,
)?;

let added_cost;
match self.buckets.entry(key) {
Entry::Occupied(mut entry) => {
@@ -1661,6 +1775,8 @@ mod tests {
max_name_length: 200,
max_tag_key_length: 200,
max_tag_value_length: 200,
max_project_bucket_bytes: None,
max_total_bucket_bytes: None,
}
}

@@ -2538,4 +2654,62 @@ mod tests {
.unwrap();
assert_eq!(validation.tags.len(), 0);
}

#[test]
fn test_aggregator_cost_enforcement_total() {
let config = AggregatorConfig {
max_total_bucket_bytes: Some(1),
..test_config()
};

let metric = Metric {
name: "c:foo".to_owned(),
unit: MetricUnit::None,
value: MetricValue::Counter(42.),
timestamp: UnixTimestamp::from_secs(999994711),
tags: BTreeMap::new(),
};

let receiver = TestReceiver::start_default().recipient();
let mut aggregator = Aggregator::new(config, receiver);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

aggregator.insert(project_key, metric.clone()).unwrap();
assert_eq!(
aggregator
.insert(project_key, metric.clone())
.unwrap_err()
.kind,
AggregateMetricsErrorKind::TotalLimitExceeded
);
}

#[test]
fn test_aggregator_cost_enforcement_project() {
let config = AggregatorConfig {
max_project_bucket_bytes: Some(1),
..test_config()
};

let metric = Metric {
name: "c:foo".to_owned(),
unit: MetricUnit::None,
value: MetricValue::Counter(42.),
timestamp: UnixTimestamp::from_secs(999994711),
tags: BTreeMap::new(),
};

let receiver = TestReceiver::start_default().recipient();
let mut aggregator = Aggregator::new(config, receiver);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

aggregator.insert(project_key, metric.clone()).unwrap();
assert_eq!(
aggregator
.insert(project_key, metric.clone())
.unwrap_err()
.kind,
AggregateMetricsErrorKind::ProjectLimitExceeded
);
}
}
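To make the "approximation" from the config docs concrete, here is a hypothetical sketch of a bucket-cost estimate; the real accounting in relay-metrics may differ, and the flat 48-byte overhead is borrowed from the review discussion above:

    use std::collections::BTreeMap;

    // Hypothetical estimate: heap bytes of the metric name and tags plus a
    // flat per-bucket overhead. Deliberately ignores details such as hashmap
    // pre-allocation, matching the caveat in the max_total_bucket_bytes docs.
    fn approximate_bucket_cost(name: &str, tags: &BTreeMap<String, String>) -> usize {
        let tag_bytes: usize = tags.iter().map(|(key, value)| key.len() + value.len()).sum();
        name.len() + tag_bytes + 48
    }

    fn main() {
        let mut tags = BTreeMap::new();
        tags.insert("env".to_owned(), "prod".to_owned());
        // "c:foo" (5 bytes) + "env" + "prod" (7 bytes) + 48 bytes overhead.
        assert_eq!(approximate_bucket_cost("c:foo", &tags), 60);
    }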
18 changes: 15 additions & 3 deletions relay-server/src/actors/healthcheck.rs
@@ -6,6 +6,7 @@ use futures::future;
use futures::prelude::*;

use relay_config::{Config, RelayMode};
use relay_metrics::{AcceptsMetrics, Aggregator};
use relay_statsd::metric;
use relay_system::{Controller, Shutdown};

@@ -88,12 +89,23 @@ impl Handler<IsHealthy> for Healthcheck {
             IsHealthy::Liveness => Box::new(future::ok(true)),
             IsHealthy::Readiness => {
                 if self.is_shutting_down {
-                    Box::new(future::ok(false))
-                } else if self.config.requires_auth() {
+                    return Box::new(future::ok(false));
+                }
+
+                let accepts_metrics = Aggregator::from_registry()
+                    .send(AcceptsMetrics)
+                    .map_err(|_| ());
+                let is_authenticated: Self::Result = if self.config.requires_auth() {
                     Box::new(upstream.send(IsAuthenticated).map_err(|_| ()))
                 } else {
                     Box::new(future::ok(true))
-                }
+                };
+
+                Box::new(
+                    accepts_metrics
+                        .join(is_authenticated)
+                        .map(|(accepts, authenticated)| accepts && authenticated),
+                )
}
}
}
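For readers unfamiliar with the futures 0.1 combinators used above, a minimal self-contained sketch of the join-and-AND pattern (assumed equivalent semantics, not code from this PR; .wait() stands in for the actix executor):

    use futures::future::{self, Future};

    fn main() {
        // Each readiness input resolves to Ok(bool); an Err from either side
        // fails the joined future, which the endpoint reports as not ready.
        let accepts_metrics = future::ok::<bool, ()>(true);
        let is_authenticated = future::ok::<bool, ()>(true);

        let ready = accepts_metrics
            .join(is_authenticated)
            .map(|(accepts, authenticated)| accepts && authenticated)
            .wait()
            .unwrap_or(false);

        assert!(ready);
    }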
15 changes: 15 additions & 0 deletions tests/integration/test_healthchecks.py
@@ -80,3 +80,18 @@ def test_readiness_proxy(mini_sentry, relay):
finally:
# Authentication failures would fail the test
mini_sentry.test_failures.clear()


def test_readiness_depends_on_aggregator_being_full(mini_sentry, relay):
try:
relay = relay(
mini_sentry,
{"aggregator": {"max_total_bucket_bytes": 0}},
wait_healthcheck=False,
)

response = wait_get(relay, "/api/relay/healthcheck/ready/")
assert response.status_code == 503
finally:
# Authentication failures would fail the test
mini_sentry.test_failures.clear()