ref(spooler): Remove use of legacy project cache #4419

Merged · 19 commits · Jan 8, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
- Remove the `spool` command from Relay. ([#4423](https://github.com/getsentry/relay/pull/4423))
- Bump `sentry-native` to `0.7.17` and remove cross compilation in CI. ([#4427](https://github.com/getsentry/relay/pull/4427))
- Remove `form_data` envelope items from standalone envelopes. ([#4428](https://github.com/getsentry/relay/pull/4428))
- Remove use of legacy project cache. ([#4419](https://github.com/getsentry/relay/pull/4419))

## 24.12.1

34 changes: 2 additions & 32 deletions relay-server/src/service.rs
@@ -13,7 +13,7 @@ use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
@@ -39,7 +39,6 @@ use relay_redis::AsyncRedisConnection;
#[cfg(feature = "processing")]
use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts};
use relay_system::{channel, Addr, Service, ServiceRunner};
use tokio::sync::mpsc;

/// Indicates the type of failure of the server.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
@@ -68,7 +67,6 @@ pub struct Registry {
pub test_store: Addr<TestStore>,
pub relay_cache: Addr<RelayCache>,
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: PartitionedEnvelopeBuffer,

@@ -197,9 +195,6 @@ impl ServiceState {
// service fail if the service is not running.
let global_config = runner.start(global_config);

let (legacy_project_cache, legacy_project_cache_rx) =
channel(legacy::ProjectCacheService::name());

let project_source = ProjectSource::start_in(
&mut runner,
Arc::clone(&config),
@@ -268,37 +263,18 @@ impl ServiceState {
processor_rx,
);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());

let envelope_buffer = PartitionedEnvelopeBuffer::create(
config.spool_partitions(),
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
envelopes_tx.clone(),
project_cache_handle.clone(),
processor.clone(),
outcome_aggregator.clone(),
test_store.clone(),
&mut runner,
);

// Keep all the services in one context.
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
};

runner.start_with(
legacy::ProjectCacheService::new(
project_cache_handle.clone(),
project_cache_services,
envelopes_rx,
),
legacy_project_cache_rx,
);

let health_check = runner.start(HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
@@ -328,7 +304,6 @@ impl ServiceState {
test_store,
relay_cache,
global_config,
legacy_project_cache,
project_cache_handle,
upstream_relay,
envelope_buffer,
@@ -365,11 +340,6 @@ impl ServiceState {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

/// Returns the address of the [`legacy::ProjectCache`] service.
pub fn legacy_project_cache(&self) -> &Addr<legacy::ProjectCache> {
&self.inner.registry.legacy_project_cache
}

/// Returns a [`ProjectCacheHandle`].
pub fn project_cache_handle(&self) -> &ProjectCacheHandle {
&self.inner.registry.project_cache_handle
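With the legacy service gone, callers that previously messaged `Addr<legacy::ProjectCache>` resolve project state through the shared `ProjectCacheHandle` kept in the registry. A minimal sketch of that lookup path, assuming the handle exposes a `get` accessor and the usual `ProjectState` variants (both assumed here, not shown in this diff):

```rust
use relay_base_schema::project::ProjectKey;

use crate::service::ServiceState;
use crate::services::projects::project::ProjectState;

/// Sketch: resolve a project's state through the handle kept in the registry.
fn is_project_enabled(state: &ServiceState, key: ProjectKey) -> bool {
    // The handle replaces the removed `legacy_project_cache()` accessor.
    let project = state.project_cache_handle().get(key);
    // `state()` and `ProjectState::Enabled` are assumed accessors for illustration only.
    matches!(project.state(), ProjectState::Enabled(_))
}
```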
77 changes: 77 additions & 0 deletions relay-server/src/services/buffer/common.rs
@@ -17,9 +17,14 @@ impl ProjectKeyPair {
}
}

pub fn has_distinct_sampling_key(&self) -> bool {
self.own_key != self.sampling_key
}

pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);

Self::new(own_key, sampling_key)
}

@@ -28,6 +33,78 @@ impl ProjectKeyPair {
own_key,
sampling_key,
} = self;

std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key))
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;

#[test]
fn test_project_key_pair_new() {
let own = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair = ProjectKeyPair::new(own, sampling);
assert_eq!(pair.own_key, own);
assert_eq!(pair.sampling_key, sampling);
}

#[test]
fn test_project_key_pair_equality() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key1, key2);
let pair3 = ProjectKeyPair::new(key2, key1);

assert_eq!(pair1, pair2);
assert_ne!(pair1, pair3);
}

#[test]
fn test_project_key_pair_ordering() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key2, key1);

assert!(pair1 < pair2);
}

#[test]
fn test_project_key_pair_hash() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key1, key2);
let pair3 = ProjectKeyPair::new(key2, key1);

let mut set = HashSet::new();
set.insert(pair1);
assert!(set.contains(&pair2));
assert!(!set.contains(&pair3));
}

#[test]
fn test_project_key_pair_iter() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

// Test with different sampling key
let pair = ProjectKeyPair::new(key1, key2);
let keys: Vec<_> = pair.iter().collect();
assert_eq!(keys, vec![key1, key2]);

// Test with same key (should only yield one key)
let pair = ProjectKeyPair::new(key1, key1);
let keys: Vec<_> = pair.iter().collect();
assert_eq!(keys, vec![key1]);
}
}
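The new `has_distinct_sampling_key` helper mirrors the dedup in `iter()`: when an envelope carries no separate sampling key, `from_envelope` falls back to the own key and the pair collapses to a single entry. A small usage sketch (the import path is assumed):

```rust
use relay_base_schema::project::ProjectKey;

use crate::services::buffer::common::ProjectKeyPair;

fn sampling_key_fallback() {
    let own = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();

    // No distinct sampling key: the pair degenerates to the own key,
    // so `iter()` yields exactly one project key.
    let pair = ProjectKeyPair::new(own, own);
    assert!(!pair.has_distinct_sampling_key());
    assert_eq!(pair.iter().collect::<Vec<_>>(), vec![own]);
}
```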
10 changes: 8 additions & 2 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -345,7 +345,10 @@

Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(last_received_at), true) => Peek::Ready { last_received_at },
(Some(last_received_at), true) => Peek::Ready {
project_key_pair: *project_key_pair,
last_received_at,
},
(Some(last_received_at), false) => Peek::NotReady {
project_key_pair: *project_key_pair,
next_project_fetch: *next_project_fetch,
@@ -561,6 +564,7 @@
pub enum Peek {
Empty,
Ready {
project_key_pair: ProjectKeyPair,
last_received_at: DateTime<Utc>,
},
NotReady {
@@ -574,7 +578,9 @@ impl Peek {
pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
match self {
Self::Empty => None,
Self::Ready { last_received_at }
Self::Ready {
last_received_at, ..
}
| Self::NotReady {
last_received_at, ..
} => Some(*last_received_at),
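`Peek::Ready` now carries the `ProjectKeyPair` alongside `last_received_at`, so a caller can tell which stack is poppable without a second lookup. A sketch of how a consumer might branch on the extended enum; only the variant shapes come from this diff, the import paths and surrounding buffer plumbing are assumed:

```rust
use chrono::{DateTime, Utc};

use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_buffer::Peek;

/// Sketch: branch on the extended `Peek` returned by the envelope buffer.
fn poppable_pair(peek: &Peek) -> Option<(ProjectKeyPair, DateTime<Utc>)> {
    match peek {
        // Nothing buffered for any project pair.
        Peek::Empty => None,
        // Ready now also identifies which project pair can be popped.
        Peek::Ready {
            project_key_pair,
            last_received_at,
        } => Some((*project_key_pair, *last_received_at)),
        // Project state for this pair is still pending; nothing to pop yet.
        Peek::NotReady { .. } => None,
    }
}
```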