Feat(outcomes): Aggregate client reports [INGEST-247] #1118
Conversation
};

relay_log::trace!("Flushing outcome for timestamp {}", timestamp);
outcome_producer.do_send(outcome).ok(); // TODO: should we handle send errors here?
Here we send messages in a loop. Could this overflow the outcome producer's mailbox? Should we send a batch of outcomes to the producer instead?
Jan told me that this bypasses the mailbox.
Right, thank you. The docs on Recipient are actually very clear on this: https://docs.rs/actix/0.7.9/actix/struct.Recipient.html#method.do_send
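For illustration, a minimal sketch of the pattern being discussed, in the style of actix 0.7 (the message type and function names are placeholders, not the actual Relay code):

```rust
use actix::prelude::*;

/// Placeholder message; the real TrackOutcome carries the outcome fields.
struct TrackOutcome;

impl Message for TrackOutcome {
    type Result = ();
}

/// Sending in a loop through a `Recipient`: `do_send` enqueues the message
/// even if the recipient's mailbox is already at capacity, so the loop
/// cannot block on or overflow a bounded mailbox. It only fails if the
/// recipient actor has stopped, which is why `.ok()` is enough here.
fn flush_outcomes(outcome_producer: &Recipient<TrackOutcome>, outcomes: Vec<TrackOutcome>) {
    for outcome in outcomes {
        outcome_producer.do_send(outcome).ok();
    }
}
```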
I think this does some unnecessary allocations on the hashmap, but otherwise it should be fine.
/// Mapping from offset to bucket key to quantity. timestamp = offset * bucket_interval
buckets: BTreeMap<u64, HashMap<BucketKey, u32>>,
Why not put the timestamp into the bucket key and avoid this nesting and extra allocation? Together with the split_off optimization, this should make the flush quite fast, as long as the timestamp is declared as the first field of BucketKey (see the documentation for derive(PartialOrd)).
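For illustration, a minimal sketch of that layout (the field names are placeholders, not the actual Relay types): with the offset as the first field, the derived Ord sorts primarily by offset, so split_off can separate all buckets that are due for flushing in one call.

```rust
use std::collections::BTreeMap;
use std::mem;

/// Hypothetical key; `offset` comes first so the derived ordering
/// compares it before the remaining fields.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct BucketKey {
    offset: u64,            // timestamp / bucket_interval
    reason: Option<String>, // placeholder for the remaining outcome fields
}

fn main() {
    let mut buckets: BTreeMap<BucketKey, u32> = BTreeMap::new();
    buckets.insert(BucketKey { offset: 100, reason: None }, 3);
    buckets.insert(BucketKey { offset: 101, reason: None }, 1);

    // Flush every bucket with offset <= 100: split at the smallest possible
    // key with offset 101. `split_off` keeps keys below the split point in
    // `buckets` and returns everything at or above it.
    let split_key = BucketKey { offset: 101, reason: None };
    let remaining = buckets.split_off(&split_key);
    let to_flush = mem::replace(&mut buckets, remaining);

    for (key, quantity) in to_flush {
        println!("flushing {:?} -> {}", key, quantity);
    }
}
```

This also shows the constraint mentioned further down: to split at a pure offset boundary, the key type must allow constructing a minimal key for that offset.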
I agree with untitaker; I would use a single bucket_key -> quantity map (with timestamp % bucket_interval embedded in the bucket_key).
Did as you suggested. Had to tweak the BucketKey type a bit in order to call split_off with a key containing only an offset.
let max_offset = (UnixTimestamp::now().as_secs() - self.flush_delay) / self.bucket_interval;
let bucket_interval = self.bucket_interval;
let outcome_producer = self.outcome_producer.clone();
self.buckets.retain(|offset, mapping| {
You could probably use split_off here.
Looks good.
relay-config/src/config.rs (outdated)
@@ -946,6 +950,8 @@ impl Default for Outcomes {
    batch_size: 1000,
    batch_interval: 500,
    source: None,
    bucket_interval: 60,
    flush_delay: 30,
Why do we flush every 30 seconds if the buckets are 60 seconds long?
flush_delay is the grace period during which outcomes may still be submitted to a bucket after its time window has passed.
let outcome_producer = self.outcome_producer.clone();
self.buckets.retain(|offset, mapping| {
    if offset <= &max_offset {
        for (bucket_key, quantity) in mapping.drain() {
I would not bother with this; I would just flush everything every flush_delay. I understand that you don't want to send buckets that are not full yet twice, but if we make flush_delay >= bucket_interval (which I think we should), that would not be a big problem; we may send buckets more than once anyway (for outcomes that come in late).
I must admit I did not even think of this option, and I like the simplicity. Should have read this comment before implementing the split_off :)
@untitaker just to get an additional opinion, what do you think about the suggestion of simply flushing all the buckets on every flush?
Makes sense!
OK, simplified to use a single hashmap that gets flushed unconditionally every two minutes.
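For illustration, a minimal std-only sketch of that simplified shape (the types and field names are placeholders, not the actual Relay implementation): one flat map, handed off wholesale on every flush tick.

```rust
use std::collections::HashMap;
use std::mem;

/// Hypothetical aggregation key; in Relay it would carry the outcome's
/// fields (truncated timestamp, scoping, outcome, reason, ...).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    offset: u64,            // timestamp / bucket_interval
    reason: Option<String>, // placeholder for the remaining fields
}

#[derive(Default)]
struct OutcomeAggregator {
    buckets: HashMap<BucketKey, u32>,
}

impl OutcomeAggregator {
    /// Add an outcome's quantity to its bucket.
    fn insert(&mut self, key: BucketKey, quantity: u32) {
        *self.buckets.entry(key).or_insert(0) += quantity;
    }

    /// Hand back everything accumulated so far and start over. In the
    /// actor, this would be driven by a periodic timer firing every
    /// flush_interval seconds.
    fn flush(&mut self) -> HashMap<BucketKey, u32> {
        mem::take(&mut self.buckets)
    }
}

fn main() {
    let mut aggregator = OutcomeAggregator::default();
    aggregator.insert(BucketKey { offset: 100, reason: None }, 2);
    aggregator.insert(BucketKey { offset: 100, reason: None }, 1);
    for (key, quantity) in aggregator.flush() {
        println!("flushing {:?} -> {}", key, quantity); // quantity == 3
    }
}
```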
/// The number of seconds between flushes of all buckets.
flush_interval: u64,
/// Mapping from bucket key to quantity.
buckets: HashMap<BucketKey, u32>,
Why do we use a hashmap here? (And also in the metrics aggregator, I guess.)
Recap from Slack: there is probably no strong reason one way or the other right now.
Add the option to emit outcomes as client reports. This is now the default behavior of non-processing relays.
- Dynamic sampling config is now propagated to untrusted relays, allowing them to apply sampling rules.
- The emit_outcomes config flag can have three states: true, false, or "client reports".
- Every created TrackOutcome is sent to the outcome aggregator instead of the outcome producer.
- If configured to emit outcomes as client reports, the outcome aggregator erases event_id and remote_addr from the outcome.
- If an outcome still has an event_id after step 3, it is forwarded to the configured producer without aggregating.
- Else, the outcome is aggregated as in Feat(outcomes): Aggregate client reports [INGEST-247] #1118.
- Finally, the outcome producer converts the aggregated outcome to a client report and sends it to the upstream as an envelope.
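As a rough illustration of a three-state emit_outcomes flag like the one described above, such a setting could be modeled as an enum that deserializes from either a boolean or a string. This is a hypothetical sketch assuming serde with the derive feature; the variant names and the exact string value ("client_reports") are assumptions, not taken from the Relay source.

```rust
use serde::de::{Deserializer, Error};
use serde::Deserialize;

/// Hypothetical three-state flag: no outcomes, full outcomes, or
/// aggregated client reports.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EmitOutcomes {
    None,
    AsOutcomes,
    AsClientReports,
}

impl<'de> Deserialize<'de> for EmitOutcomes {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        // Accept either a boolean or a string in the config file.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum Repr {
            Bool(bool),
            Text(String),
        }

        match Repr::deserialize(deserializer)? {
            Repr::Bool(true) => Ok(EmitOutcomes::AsOutcomes),
            Repr::Bool(false) => Ok(EmitOutcomes::None),
            Repr::Text(s) if s.as_str() == "client_reports" => Ok(EmitOutcomes::AsClientReports),
            Repr::Text(other) => Err(D::Error::custom(format!(
                "expected true, false, or \"client_reports\", got {:?}",
                other
            ))),
        }
    }
}
```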
Aggregate outcomes generated by client reports before sending them on.
The key for aggregation buckets consists of the outcome's fields, including event_id (which is None for client reports). The value of an aggregated bucket is the outcome quantity.
This PR prepares for INGEST-247. It does not yet implement collecting outcomes in external relays.