Skip to content

Commit

Permalink
feat(spooler): Add last_peek value in the priority (#3922)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Aug 14, 2024
1 parent 9e6f195 commit 5c4de42
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- Switch glob implementations from `regex` to `regex-lite`. ([#3926](https://github.com/getsentry/relay/pull/3926))
- Extract client sdk from transaction into profiles. ([#3915](https://github.com/getsentry/relay/pull/3915))
- Extract `user.geo.subregion` into span metrics/indexed. ([#3914](https://github.com/getsentry/relay/pull/3914))
- Add `last_peek` field to the `Priority` struct. ([#3922](https://github.com/getsentry/relay/pull/3922))

## 24.7.1

Expand Down
117 changes: 97 additions & 20 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,30 @@ where

/// Returns a reference to the next-in-line envelope, if one exists.
pub async fn peek(&mut self) -> Result<Option<&Envelope>, EnvelopeBufferError> {
let Some((
QueueItem {
key: stack_key,
value: _,
},
_,
)) = self.priority_queue.peek()
else {
return Ok(None);
};

let stack_key = *stack_key;

self.priority_queue.change_priority_by(&stack_key, |prio| {
prio.last_peek = Instant::now();
});

let Some((
QueueItem {
key: _,
value: stack,
},
_,
)) = self.priority_queue.peek_mut()
)) = self.priority_queue.get_mut(&stack_key)
else {
return Ok(None);
};
Expand Down Expand Up @@ -329,20 +346,24 @@ impl<K: PartialEq, V> Eq for QueueItem<K, V> {}
struct Priority {
readiness: Readiness,
received_at: Instant,
last_peek: Instant,
}

impl Priority {
fn new(received_at: Instant) -> Self {
Self {
readiness: Readiness::new(),
received_at,
last_peek: Instant::now(),
}
}
}

impl PartialEq for Priority {
fn eq(&self, other: &Self) -> bool {
self.readiness.ready() == other.readiness.ready() && self.received_at == other.received_at
self.readiness.ready() == other.readiness.ready()
&& self.received_at == other.received_at
&& self.last_peek == other.last_peek
}
}

Expand All @@ -357,12 +378,22 @@ impl Eq for Priority {}
impl Ord for Priority {
fn cmp(&self, other: &Self) -> Ordering {
match (self.readiness.ready(), other.readiness.ready()) {
(true, true) => self.received_at.cmp(&other.received_at),
// Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize
// stacks that were the least recently peeked. The rationale behind this is that we want
// to keep cycling through different stacks while peeking.
(true, true) => self
.received_at
.cmp(&other.received_at)
.then(self.last_peek.cmp(&other.last_peek).reverse()),
(true, false) => Ordering::Greater,
(false, true) => Ordering::Less,
// For non-ready stacks, we invert the priority, such that projects that are not
// ready and did not receive envelopes recently can be evicted.
(false, false) => self.received_at.cmp(&other.received_at).reverse(),
(false, false) => self
.received_at
.cmp(&other.received_at)
.reverse()
.then(self.last_peek.cmp(&other.last_peek).reverse()),
}
}
}
Expand All @@ -389,18 +420,22 @@ impl Readiness {
#[cfg(test)]
mod tests {
use std::str::FromStr;

use uuid::Uuid;

use relay_common::Dsn;
use relay_event_schema::protocol::EventId;
use relay_sampling::DynamicSamplingContext;

use crate::envelope::{Item, ItemType};
use crate::extractors::RequestMeta;

use super::*;

fn new_envelope(project_key: ProjectKey, sampling_key: Option<ProjectKey>) -> Box<Envelope> {
fn new_envelope(
project_key: ProjectKey,
sampling_key: Option<ProjectKey>,
event_id: Option<EventId>,
) -> Box<Envelope> {
let mut envelope = Envelope::from_request(
None,
RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()),
Expand All @@ -420,11 +455,14 @@ mod tests {
});
envelope.add_item(Item::new(ItemType::Transaction));
}
if let Some(event_id) = event_id {
envelope.set_event_id(event_id);
}
envelope
}

#[tokio::test]
async fn insert_pop() {
async fn test_insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
Expand All @@ -434,20 +472,29 @@ mod tests {
assert!(buffer.pop().await.unwrap().is_none());
assert!(buffer.peek().await.unwrap().is_none());

buffer.push(new_envelope(project_key1, None)).await.unwrap();
buffer
.push(new_envelope(project_key1, None, None))
.await
.unwrap();
assert_eq!(
buffer.peek().await.unwrap().unwrap().meta().public_key(),
project_key1
);

buffer.push(new_envelope(project_key2, None)).await.unwrap();
buffer
.push(new_envelope(project_key2, None, None))
.await
.unwrap();
// Both projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(
buffer.peek().await.unwrap().unwrap().meta().public_key(),
project_key1
);

buffer.push(new_envelope(project_key3, None)).await.unwrap();
buffer
.push(new_envelope(project_key3, None, None))
.await
.unwrap();
// All projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(
buffer.peek().await.unwrap().unwrap().meta().public_key(),
Expand Down Expand Up @@ -499,14 +546,14 @@ mod tests {
}

#[tokio::test]
async fn project_internal_order() {
async fn test_project_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let envelope1 = new_envelope(project_key, None);
let envelope1 = new_envelope(project_key, None, None);
let instant1 = envelope1.meta().start_time();
let envelope2 = new_envelope(project_key, None);
let envelope2 = new_envelope(project_key, None, None);
let instant2 = envelope2.meta().start_time();

assert!(instant2 > instant1);
Expand All @@ -526,21 +573,21 @@ mod tests {
}

#[tokio::test]
async fn sampling_projects() {
async fn test_sampling_projects() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

let envelope1 = new_envelope(project_key1, None);
let envelope1 = new_envelope(project_key1, None, None);
let instant1 = envelope1.meta().start_time();
buffer.push(envelope1).await.unwrap();

let envelope2 = new_envelope(project_key2, None);
let envelope2 = new_envelope(project_key2, None, None);
let instant2 = envelope2.meta().start_time();
buffer.push(envelope2).await.unwrap();

let envelope3 = new_envelope(project_key1, Some(project_key2));
let envelope3 = new_envelope(project_key1, Some(project_key2), None);
let instant3 = envelope3.meta().start_time();
buffer.push(envelope3).await.unwrap();

Expand Down Expand Up @@ -596,7 +643,7 @@ mod tests {
}

#[tokio::test]
async fn project_keys_distinct() {
async fn test_project_keys_distinct() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

Expand All @@ -607,13 +654,43 @@ mod tests {

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
buffer
.push(new_envelope(project_key1, Some(project_key2)))
.push(new_envelope(project_key1, Some(project_key2), None))
.await
.unwrap();
buffer
.push(new_envelope(project_key2, Some(project_key1)))
.push(new_envelope(project_key2, Some(project_key1), None))
.await
.unwrap();
assert_eq!(buffer.priority_queue.len(), 2);
}

#[tokio::test]
async fn test_last_peek_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();

let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_1 = EventId::new();
let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));

let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_2 = EventId::new();
let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
envelope2.set_start_time(envelope1.meta().start_time());

buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();

assert_eq!(
buffer.peek().await.unwrap().unwrap().event_id().unwrap(),
event_id_1
);
assert_eq!(
buffer.peek().await.unwrap().unwrap().event_id().unwrap(),
event_id_2
);
assert_eq!(
buffer.peek().await.unwrap().unwrap().event_id().unwrap(),
event_id_1
);
}
}

0 comments on commit 5c4de42

Please sign in to comment.