diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e4888d080..8dd0ed810c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Remove the `spool` command from Relay. ([#4423](https://github.com/getsentry/relay/pull/4423)) - Bump `sentry-native` to `0.7.17` and remove cross compilation in CI. ([#4427](https://github.com/getsentry/relay/pull/4427)) - Remove `form_data` envelope items from standalone envelopes. ([#4428](https://github.com/getsentry/relay/pull/4428)) +- Remove use of legacy project cache. ([#4419](https://github.com/getsentry/relay/pull/4419)) ## 24.12.1 diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index cfcc9a759d..4f0606d887 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -13,7 +13,7 @@ use crate::services::metrics::RouterService; use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome}; use crate::services::outcome_aggregator::OutcomeAggregator; use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; -use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectCacheService}; +use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService}; use crate::services::projects::source::ProjectSource; use crate::services::relays::{RelayCache, RelayCacheService}; use crate::services::stats::RelayStats; @@ -39,7 +39,6 @@ use relay_redis::AsyncRedisConnection; #[cfg(feature = "processing")] use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts}; use relay_system::{channel, Addr, Service, ServiceRunner}; -use tokio::sync::mpsc; /// Indicates the type of failure of the server. 
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)] @@ -68,7 +67,6 @@ pub struct Registry { pub test_store: Addr, pub relay_cache: Addr, pub global_config: Addr, - pub legacy_project_cache: Addr, pub upstream_relay: Addr, pub envelope_buffer: PartitionedEnvelopeBuffer, @@ -197,9 +195,6 @@ impl ServiceState { // service fail if the service is not running. let global_config = runner.start(global_config); - let (legacy_project_cache, legacy_project_cache_rx) = - channel(legacy::ProjectCacheService::name()); - let project_source = ProjectSource::start_in( &mut runner, Arc::clone(&config), @@ -268,37 +263,18 @@ impl ServiceState { processor_rx, ); - let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes()); - let envelope_buffer = PartitionedEnvelopeBuffer::create( config.spool_partitions(), config.clone(), memory_stat.clone(), global_config_rx.clone(), - envelopes_tx.clone(), project_cache_handle.clone(), + processor.clone(), outcome_aggregator.clone(), test_store.clone(), &mut runner, ); - // Keep all the services in one context. - let project_cache_services = legacy::Services { - envelope_buffer: envelope_buffer.clone(), - envelope_processor: processor.clone(), - outcome_aggregator: outcome_aggregator.clone(), - test_store: test_store.clone(), - }; - - runner.start_with( - legacy::ProjectCacheService::new( - project_cache_handle.clone(), - project_cache_services, - envelopes_rx, - ), - legacy_project_cache_rx, - ); - let health_check = runner.start(HealthCheckService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), @@ -328,7 +304,6 @@ impl ServiceState { test_store, relay_cache, global_config, - legacy_project_cache, project_cache_handle, upstream_relay, envelope_buffer, @@ -365,11 +340,6 @@ impl ServiceState { self.inner.registry.envelope_buffer.buffer(project_key_pair) } - /// Returns the address of the [`legacy::ProjectCache`] service. 
- pub fn legacy_project_cache(&self) -> &Addr { - &self.inner.registry.legacy_project_cache - } - /// Returns a [`ProjectCacheHandle`]. pub fn project_cache_handle(&self) -> &ProjectCacheHandle { &self.inner.registry.project_cache_handle diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index 924f8aa1c8..b213fa40f9 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -17,9 +17,14 @@ impl ProjectKeyPair { } } + pub fn has_distinct_sampling_key(&self) -> bool { + self.own_key != self.sampling_key + } + pub fn from_envelope(envelope: &Envelope) -> Self { let own_key = envelope.meta().public_key(); let sampling_key = envelope.sampling_key().unwrap_or(own_key); + Self::new(own_key, sampling_key) } @@ -28,6 +33,78 @@ impl ProjectKeyPair { own_key, sampling_key, } = self; + std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + + #[test] + fn test_project_key_pair_new() { + let own = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair = ProjectKeyPair::new(own, sampling); + assert_eq!(pair.own_key, own); + assert_eq!(pair.sampling_key, sampling); + } + + #[test] + fn test_project_key_pair_equality() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair1 = ProjectKeyPair::new(key1, key2); + let pair2 = ProjectKeyPair::new(key1, key2); + let pair3 = ProjectKeyPair::new(key2, key1); + + assert_eq!(pair1, pair2); + assert_ne!(pair1, pair3); + } + + #[test] + fn test_project_key_pair_ordering() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = 
ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair1 = ProjectKeyPair::new(key1, key2); + let pair2 = ProjectKeyPair::new(key2, key1); + + assert!(pair1 < pair2); + } + + #[test] + fn test_project_key_pair_hash() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair1 = ProjectKeyPair::new(key1, key2); + let pair2 = ProjectKeyPair::new(key1, key2); + let pair3 = ProjectKeyPair::new(key2, key1); + + let mut set = HashSet::new(); + set.insert(pair1); + assert!(set.contains(&pair2)); + assert!(!set.contains(&pair3)); + } + + #[test] + fn test_project_key_pair_iter() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + // Test with different sampling key + let pair = ProjectKeyPair::new(key1, key2); + let keys: Vec<_> = pair.iter().collect(); + assert_eq!(keys, vec![key1, key2]); + + // Test with same key (should only yield one key) + let pair = ProjectKeyPair::new(key1, key1); + let keys: Vec<_> = pair.iter().collect(); + assert_eq!(keys, vec![key1]); + } +} diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index e11cafbd60..b7591f743b 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -345,7 +345,10 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, - (Some(last_received_at), true) => Peek::Ready { last_received_at }, + (Some(last_received_at), true) => Peek::Ready { + project_key_pair: *project_key_pair, + last_received_at, + }, (Some(last_received_at), false) => Peek::NotReady { project_key_pair: *project_key_pair, next_project_fetch: *next_project_fetch, @@ -561,6 +564,7 @@ where pub enum Peek { Empty, Ready { + project_key_pair: 
ProjectKeyPair, last_received_at: DateTime, }, NotReady { @@ -574,7 +578,9 @@ impl Peek { pub fn last_received_at(&self) -> Option> { match self { Self::Empty => None, - Self::Ready { last_received_at } + Self::Ready { + last_received_at, .. + } | Self::NotReady { last_received_at, .. } => Some(*last_received_at), diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 8a4dd53e54..96ea84fb70 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -10,13 +10,11 @@ use std::time::Duration; use ahash::RandomState; use chrono::DateTime; use chrono::Utc; -use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use relay_system::{Controller, Shutdown}; use relay_system::{Receiver, ServiceRunner}; -use tokio::sync::mpsc::Permit; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; @@ -25,11 +23,11 @@ use crate::services::global_config; use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; -use crate::services::processor::ProcessingGroup; -use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectChange}; +use crate::services::processor::{EnvelopeProcessor, ProcessEnvelope, ProcessingGroup}; +use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle, ProjectChange}; use crate::services::test_store::TestStore; +use crate::statsd::RelayCounters; use crate::statsd::RelayTimers; -use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::ManagedEnvelope; use crate::MemoryChecker; use crate::MemoryStat; @@ -45,6 +43,7 @@ pub use envelope_stack::EnvelopeStack; // pub for benchmarks pub use envelope_store::sqlite::SqliteEnvelopeStore; +use crate::services::projects::project::ProjectState; pub use 
common::ProjectKeyPair; mod common; @@ -62,18 +61,12 @@ const PARTITIONING_HASHING_SEED: usize = 0; pub enum EnvelopeBuffer { /// A fresh envelope that gets pushed into the buffer by the request handler. Push(Box), - /// Informs the service that a project has no valid project state and must be marked as not ready. - /// - /// This happens when an envelope was sent to the project cache, but one of the necessary project - /// state has expired. The envelope is pushed back into the envelope buffer. - NotReady(ProjectKey, Box), } impl EnvelopeBuffer { fn name(&self) -> &'static str { match &self { EnvelopeBuffer::Push(_) => "push", - EnvelopeBuffer::NotReady(..) => "project_not_ready", } } } @@ -104,8 +97,8 @@ impl PartitionedEnvelopeBuffer { config: Arc, memory_stat: MemoryStat, global_config_rx: watch::Receiver, - envelopes_tx: mpsc::Sender, project_cache_handle: ProjectCacheHandle, + envelope_processor: Addr, outcome_aggregator: Addr, test_store: Addr, runner: &mut ServiceRunner, @@ -118,8 +111,8 @@ impl PartitionedEnvelopeBuffer { memory_stat.clone(), global_config_rx.clone(), Services { - envelopes_tx: envelopes_tx.clone(), project_cache_handle: project_cache_handle.clone(), + envelope_processor: envelope_processor.clone(), outcome_aggregator: outcome_aggregator.clone(), test_store: test_store.clone(), }, @@ -189,8 +182,8 @@ impl ObservableEnvelopeBuffer { pub struct Services { /// Bounded channel used exclusively to handle backpressure when sending envelopes to the /// project cache. - pub envelopes_tx: mpsc::Sender, pub project_cache_handle: ProjectCacheHandle, + pub envelope_processor: Addr, pub outcome_aggregator: Addr, pub test_store: Addr, } @@ -242,18 +235,12 @@ impl EnvelopeBufferService { } /// Wait for the configured amount of time and make sure the project cache is ready to receive. 
- async fn ready_to_pop( - &mut self, - buffer: &PolymorphicEnvelopeBuffer, - dequeue: bool, - ) -> Option> { + async fn ready_to_pop(&mut self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool) { self.system_ready(buffer, dequeue).await; if self.sleep > Duration::ZERO { tokio::time::sleep(self.sleep).await; } - - self.services.envelopes_tx.reserve().await.ok() } /// Waits until preconditions for unspooling are met. @@ -286,7 +273,6 @@ impl EnvelopeBufferService { config: &Config, buffer: &mut PolymorphicEnvelopeBuffer, services: &Services, - envelopes_tx_permit: Permit<'a, legacy::DequeuedEnvelope>, ) -> Result { let sleep = match buffer.peek().await? { Peek::Empty => { @@ -298,7 +284,9 @@ impl EnvelopeBufferService { DEFAULT_SLEEP // wait for reset by `handle_message`. } - Peek::Ready { last_received_at } + Peek::Ready { + last_received_at, .. + } | Peek::NotReady { last_received_at, .. } if is_expired(last_received_at, config) => { @@ -316,18 +304,17 @@ impl EnvelopeBufferService { Duration::ZERO // try next pop immediately } - Peek::Ready { .. } => { - relay_log::trace!("EnvelopeBufferService: popping envelope"); + Peek::Ready { + project_key_pair, .. + } => { + relay_log::trace!("EnvelopeBufferService: project(s) of envelope ready"); relay_statsd::metric!( counter(RelayCounters::BufferTryPop) += 1, peek_result = "ready", partition_id = partition_tag ); - let envelope = buffer - .pop() - .await? 
- .expect("Element disappeared despite exclusive excess"); - envelopes_tx_permit.send(legacy::DequeuedEnvelope(envelope)); + + Self::pop_and_forward(partition_tag, services, buffer, project_key_pair).await?; Duration::ZERO // try next pop immediately } @@ -349,10 +336,8 @@ impl EnvelopeBufferService { if Instant::now() >= next_project_fetch { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); - let ProjectKeyPair { - own_key, - sampling_key, - } = project_key_pair; + let own_key = project_key_pair.own_key; + let sampling_key = project_key_pair.sampling_key; services.project_cache_handle.fetch(own_key); if sampling_key != own_key { @@ -381,12 +366,7 @@ impl EnvelopeBufferService { managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); } - async fn handle_message( - partition_tag: &str, - buffer: &mut PolymorphicEnvelopeBuffer, - services: &Services, - message: EnvelopeBuffer, - ) { + async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) { match message { EnvelopeBuffer::Push(envelope) => { // NOTE: This function assumes that a project state update for the relevant @@ -396,19 +376,6 @@ impl EnvelopeBufferService { relay_log::trace!("EnvelopeBufferService: received push message"); Self::push(buffer, envelope).await; } - EnvelopeBuffer::NotReady(project_key, envelope) => { - relay_log::trace!( - "EnvelopeBufferService: received project not ready message for project key {}", - &project_key - ); - relay_statsd::metric!( - counter(RelayCounters::BufferEnvelopesReturned) += 1, - partition_id = partition_tag - ); - Self::push(buffer, envelope).await; - let project = services.project_cache_handle.get(project_key); - buffer.mark_ready(&project_key, !project.state().is_pending()); - } }; } @@ -443,6 +410,106 @@ impl EnvelopeBufferService { } } + async fn pop_and_forward<'a>( + partition_tag: &str, + services: &Services, + buffer: &mut PolymorphicEnvelopeBuffer, + project_key_pair: ProjectKeyPair, + ) -> 
Result<(), EnvelopeBufferError> { + let own_key = project_key_pair.own_key; + let own_project = services.project_cache_handle.get(own_key); + // We try to load the own project state and bail in case it's pending. + let own_project_info = match own_project.state() { + ProjectState::Enabled(info) => Some(info.clone()), + ProjectState::Disabled => None, + ProjectState::Pending => { + buffer.mark_ready(&own_key, false); + relay_statsd::metric!( + counter(RelayCounters::BufferProjectPending) += 1, + partition_id = &partition_tag + ); + + return Ok(()); + } + }; + + let sampling_key = project_key_pair.sampling_key; + // If the projects are different, we load the project key of the sampling project. On the + // other hand, if they are the same, we just reuse the own project. + let sampling_project_info = if project_key_pair.has_distinct_sampling_key() { + // We try to load the sampling project state and bail in case it's pending. + match services.project_cache_handle.get(sampling_key).state() { + ProjectState::Enabled(info) => Some(info.clone()), + ProjectState::Disabled => None, + ProjectState::Pending => { + buffer.mark_ready(&sampling_key, false); + relay_statsd::metric!( + counter(RelayCounters::BufferProjectPending) += 1, + partition_id = &partition_tag + ); + + return Ok(()); + } + } + } else { + own_project_info.clone() + }; + + relay_log::trace!("EnvelopeBufferService: popping envelope"); + + // If we arrived here, we know that both projects are available, so we pop the envelope. + let envelope = buffer + .pop() + .await? + .expect("Element disappeared despite exclusive access"); + + // If the own project state is disabled, we want to drop the envelope and early return since + // we can't do much about it.
+ let Some(own_project_info) = own_project_info else { + let mut managed_envelope = ManagedEnvelope::new( + envelope, + services.outcome_aggregator.clone(), + services.test_store.clone(), + ProcessingGroup::Ungrouped, + ); + managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); + + return Ok(()); + }; + + // We only extract the sampling project info if both projects belong to the same org. + let sampling_project_info = sampling_project_info + .filter(|info| info.organization_id == own_project_info.organization_id); + + for (group, envelope) in ProcessingGroup::split_envelope(*envelope) { + let managed_envelope = ManagedEnvelope::new( + envelope, + services.outcome_aggregator.clone(), + services.test_store.clone(), + group, + ); + + let Ok(CheckedEnvelope { + envelope: Some(managed_envelope), + .. + }) = own_project.check_envelope(managed_envelope) + else { + continue; // Outcomes are emitted by `check_envelope`. + }; + + let reservoir_counters = own_project.reservoir_counters().clone(); + services.envelope_processor.send(ProcessEnvelope { + envelope: managed_envelope, + project_info: own_project_info.clone(), + rate_limits: own_project.rate_limits().current_limits(), + sampling_project_info: sampling_project_info.clone(), + reservoir_counters, + }); + } + + Ok(()) + } + fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) { self.has_capacity .store(buffer.has_capacity(), Ordering::Relaxed); @@ -495,13 +562,6 @@ impl Service for EnvelopeBufferService { relay_log::info!("EnvelopeBufferService {}: starting", self.partition_id); loop { - let used_capacity = - self.services.envelopes_tx.max_capacity() - self.services.envelopes_tx.capacity(); - relay_statsd::metric!( - histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = used_capacity as u64, - partition_id = &partition_tag, - ); - let mut sleep = DEFAULT_SLEEP; let start = Instant::now(); tokio::select! 
{ @@ -509,10 +569,10 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeuing over enqueuing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => { + _ = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => { relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "pop", partition_id = &partition_tag); relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "pop", partition_id = &partition_tag, { - match Self::try_pop(&partition_tag, &config, &mut buffer, &services, permit).await { + match Self::try_pop(&partition_tag, &config, &mut buffer, &services).await { Ok(new_sleep) => { sleep = new_sleep; } @@ -527,9 +587,16 @@ impl Service for EnvelopeBufferService { change = project_changes.recv() => { relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "project_change", partition_id = &partition_tag); relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "project_change", partition_id = &partition_tag, { - if let Ok(ProjectChange::Ready(project_key)) = change { - buffer.mark_ready(&project_key, true); - } + match change { + Ok(ProjectChange::Ready(project_key)) => { + buffer.mark_ready(&project_key, true); + }, + Ok(ProjectChange::Evicted(project_key)) => { + buffer.mark_ready(&project_key, false); + }, + _ => {} + }; + relay_statsd::metric!(counter(RelayCounters::BufferProjectChangedEvent) += 1, partition_id = &partition_tag); sleep = Duration::ZERO; }); } @@ -537,7 +604,7 @@ impl Service for EnvelopeBufferService { relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message", partition_id = &partition_tag); let message_name = message.name(); relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, partition_id = &partition_tag, { 
- Self::handle_message(&partition_tag, &mut buffer, &services, message).await; + Self::handle_message(&mut buffer, message).await; sleep = Duration::ZERO; }); } @@ -569,23 +636,23 @@ impl Service for EnvelopeBufferService { #[cfg(test)] mod tests { + use crate::services::projects::project::{ProjectInfo, ProjectState}; + use crate::testutils::new_envelope; + use crate::MemoryStat; use chrono::Utc; + use relay_base_schema::project::ProjectKey; use relay_dynamic_config::GlobalConfig; use relay_quotas::DataCategory; use std::time::Duration; use tokio::sync::mpsc; use uuid::Uuid; - use crate::services::projects::project::ProjectState; - use crate::testutils::new_envelope; - use crate::MemoryStat; - use super::*; struct EnvelopeBufferServiceResult { service: EnvelopeBufferService, global_tx: watch::Sender, - envelopes_rx: mpsc::Receiver, + envelope_processor_rx: mpsc::UnboundedReceiver, project_cache_handle: ProjectCacheHandle, outcome_aggregator_rx: mpsc::UnboundedReceiver, } @@ -602,9 +669,9 @@ mod tests { let memory_stat = MemoryStat::default(); let (global_tx, global_rx) = watch::channel(global_config_status); - let (envelopes_tx, envelopes_rx) = mpsc::channel(5); let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); let project_cache_handle = ProjectCacheHandle::for_test(); + let (envelope_processor, envelope_processor_rx) = Addr::custom(); let envelope_buffer_service = EnvelopeBufferService::new( 0, @@ -612,8 +679,8 @@ mod tests { memory_stat, global_rx, Services { - envelopes_tx, project_cache_handle: project_cache_handle.clone(), + envelope_processor, outcome_aggregator, test_store: Addr::dummy(), }, @@ -622,7 +689,7 @@ mod tests { EnvelopeBufferServiceResult { service: envelope_buffer_service, global_tx, - envelopes_rx, + envelope_processor_rx, project_cache_handle, outcome_aggregator_rx, } @@ -633,7 +700,7 @@ mod tests { let EnvelopeBufferServiceResult { service, global_tx: _global_tx, - envelopes_rx: _envelopes_rx, + envelope_processor_rx: 
_envelope_processor_rx, project_cache_handle, outcome_aggregator_rx: _outcome_aggregator_rx, } = envelope_buffer_service(None, global_config::Status::Pending); @@ -655,11 +722,11 @@ mod tests { } #[tokio::test(start_paused = true)] - async fn pop_requires_global_config() { + async fn pop_with_global_config_changes() { let EnvelopeBufferServiceResult { service, global_tx, - envelopes_rx, + envelope_processor_rx, project_cache_handle, outcome_aggregator_rx: _outcome_aggregator_rx, .. @@ -669,12 +736,14 @@ mod tests { let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); + let project_info = Arc::new(ProjectInfo::default()); + project_cache_handle + .test_set_project_state(project_key, ProjectState::Enabled(project_info)); addr.send(EnvelopeBuffer::Push(envelope.clone())); - project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled); tokio::time::sleep(Duration::from_millis(1000)).await; - assert_eq!(envelopes_rx.len(), 0); + assert_eq!(envelope_processor_rx.len(), 0); global_tx.send_replace(global_config::Status::Ready(Arc::new( GlobalConfig::default(), @@ -682,14 +751,58 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; - assert_eq!(envelopes_rx.len(), 1); + assert_eq!(envelope_processor_rx.len(), 1); + } + + #[tokio::test(start_paused = true)] + async fn pop_with_project_state_changes() { + let EnvelopeBufferServiceResult { + service, + global_tx: _global_tx, + mut envelope_processor_rx, + project_cache_handle, + outcome_aggregator_rx: _outcome_aggregator_rx, + .. 
+ } = envelope_buffer_service( + None, + global_config::Status::Ready(Arc::new(GlobalConfig::default())), + ); + + let addr = service.start_detached(); + + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + project_cache_handle.test_set_project_state(project_key, ProjectState::Pending); + addr.send(EnvelopeBuffer::Push(envelope.clone())); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + assert_eq!(envelope_processor_rx.len(), 0); + + let project_info = Arc::new(ProjectInfo::default()); + project_cache_handle + .test_set_project_state(project_key, ProjectState::Enabled(project_info)); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + assert_eq!(envelope_processor_rx.len(), 1); + assert!(envelope_processor_rx.recv().await.is_some()); + + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled); + addr.send(EnvelopeBuffer::Push(envelope.clone())); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + assert_eq!(envelope_processor_rx.len(), 0); } #[tokio::test(start_paused = true)] async fn pop_requires_memory_capacity() { let EnvelopeBufferServiceResult { service, - envelopes_rx, + envelope_processor_rx, project_cache_handle, outcome_aggregator_rx: _outcome_aggregator_rx, global_tx: _global_tx, @@ -717,14 +830,14 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; - assert_eq!(envelopes_rx.len(), 0); + assert_eq!(envelope_processor_rx.len(), 0); } #[tokio::test(start_paused = true)] async fn old_envelope_is_dropped() { let EnvelopeBufferServiceResult { service, - envelopes_rx, + envelope_processor_rx, project_cache_handle: _project_cache_handle, mut outcome_aggregator_rx, global_tx: _global_tx, @@ -753,110 +866,26 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; - assert_eq!(envelopes_rx.len(), 0); + 
assert_eq!(envelope_processor_rx.len(), 0); let outcome = outcome_aggregator_rx.try_recv().unwrap(); assert_eq!(outcome.category, DataCategory::TransactionIndexed); assert_eq!(outcome.quantity, 1); } - #[tokio::test(start_paused = true)] - async fn test_update_project() { - let EnvelopeBufferServiceResult { - service, - mut envelopes_rx, - project_cache_handle, - global_tx: _global_tx, - outcome_aggregator_rx: _outcome_aggregator_rx, - } = envelope_buffer_service( - None, - global_config::Status::Ready(Arc::new(GlobalConfig::default())), - ); - - let addr = service.start_detached(); - - let envelope = new_envelope(false, "foo"); - let project_key = envelope.meta().public_key(); - - tokio::time::sleep(Duration::from_secs(1)).await; - - addr.send(EnvelopeBuffer::Push(envelope.clone())); - tokio::time::sleep(Duration::from_secs(3)).await; - - let legacy::DequeuedEnvelope(envelope) = envelopes_rx.recv().await.unwrap(); - - addr.send(EnvelopeBuffer::NotReady(project_key, envelope)); - - tokio::time::sleep(Duration::from_millis(200)).await; - assert_eq!(project_cache_handle.test_num_fetches(), 2); - - tokio::time::sleep(Duration::from_millis(1300)).await; - assert_eq!(project_cache_handle.test_num_fetches(), 3); - } - - #[tokio::test(start_paused = true)] - async fn output_is_throttled() { - let EnvelopeBufferServiceResult { - service, - mut envelopes_rx, - project_cache_handle, - global_tx: _global_tx, - outcome_aggregator_rx: _outcome_aggregator_rx, - .. 
- } = envelope_buffer_service( - None, - global_config::Status::Ready(Arc::new(GlobalConfig::default())), - ); - - let addr = service.start_detached(); - - let envelope = new_envelope(false, "foo"); - let project_key = envelope.meta().public_key(); - for _ in 0..10 { - addr.send(EnvelopeBuffer::Push(envelope.clone())); - } - project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled); - - tokio::time::sleep(Duration::from_millis(100)).await; - - let mut messages = vec![]; - envelopes_rx.recv_many(&mut messages, 100).await; - - assert_eq!( - messages - .iter() - .filter(|message| matches!(message, legacy::DequeuedEnvelope(..))) - .count(), - 5 - ); - - tokio::time::sleep(Duration::from_millis(100)).await; - - let mut messages = vec![]; - envelopes_rx.recv_many(&mut messages, 100).await; - - assert_eq!( - messages - .iter() - .filter(|message| matches!(message, legacy::DequeuedEnvelope(..))) - .count(), - 5 - ); - } - #[tokio::test(start_paused = true)] async fn test_partitioned_buffer() { let mut runner = ServiceRunner::new(); let (_global_tx, global_rx) = watch::channel(global_config::Status::Ready(Arc::new( GlobalConfig::default(), ))); - let (envelopes_tx, mut envelopes_rx) = mpsc::channel(10); let (outcome_aggregator, _outcome_rx) = Addr::custom(); let project_cache_handle = ProjectCacheHandle::for_test(); + let (envelope_processor, mut envelope_processor_rx) = Addr::custom(); // Create common services for both buffers let services = Services { - envelopes_tx, + envelope_processor, project_cache_handle: project_cache_handle.clone(), outcome_aggregator, test_store: Addr::dummy(), @@ -891,7 +920,16 @@ mod tests { // Create two envelopes with different project keys let envelope1 = new_envelope(false, "foo"); + let project_key = envelope1.meta().public_key(); + let project_info = Arc::new(ProjectInfo::default()); + project_cache_handle + .test_set_project_state(project_key, ProjectState::Enabled(project_info)); + let envelope2 = 
new_envelope(false, "bar"); + let project_key = envelope2.meta().public_key(); + let project_info = Arc::new(ProjectInfo::default()); + project_cache_handle + .test_set_project_state(project_key, ProjectState::Enabled(project_info)); // Send envelopes to their respective buffers let buffer1 = &partitioned.buffers[0]; @@ -901,8 +939,8 @@ mod tests { buffer2.addr().send(EnvelopeBuffer::Push(envelope2)); // Verify both envelopes were received - assert!(envelopes_rx.recv().await.is_some()); - assert!(envelopes_rx.recv().await.is_some()); - assert!(envelopes_rx.is_empty()); + assert!(envelope_processor_rx.recv().await.is_some()); + assert!(envelope_processor_rx.recv().await.is_some()); + assert!(envelope_processor_rx.is_empty()); } } diff --git a/relay-server/src/services/projects/cache/legacy.rs b/relay-server/src/services/projects/cache/legacy.rs deleted file mode 100644 index 3cf3f47f3a..0000000000 --- a/relay-server/src/services/projects/cache/legacy.rs +++ /dev/null @@ -1,209 +0,0 @@ -use crate::services::buffer::{ - EnvelopeBuffer, EnvelopeBufferError, PartitionedEnvelopeBuffer, ProjectKeyPair, -}; -use crate::services::processor::{EnvelopeProcessor, ProcessEnvelope, ProcessingGroup}; -use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle}; -use crate::Envelope; -use relay_statsd::metric; -use relay_system::{Addr, Interface, Service}; -use tokio::sync::mpsc; - -use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::services::projects::project::ProjectState; -use crate::services::test_store::TestStore; - -use crate::statsd::RelayTimers; -use crate::utils::ManagedEnvelope; - -/// Handle an envelope that was popped from the envelope buffer. -#[derive(Debug)] -pub struct DequeuedEnvelope(pub Box); - -/// The legacy project cache. -/// -/// It currently just does some project routing from the spool to the processor, -/// which eventually will be moved into the spool and processor, making this service -/// obsolete. 
-#[derive(Debug)] -pub enum ProjectCache {} - -impl ProjectCache { - pub fn variant(&self) -> &'static str { - match *self {} - } -} - -impl Interface for ProjectCache {} - -/// Holds the addresses of all services required for [`ProjectCache`]. -#[derive(Debug, Clone)] -pub struct Services { - pub envelope_buffer: PartitionedEnvelopeBuffer, - pub envelope_processor: Addr, - pub outcome_aggregator: Addr, - pub test_store: Addr, -} - -/// Main broker of the [`ProjectCacheService`]. -/// -/// This handles incoming public messages, merges resolved project states, and maintains the actual -/// cache of project states. -#[derive(Debug)] -struct ProjectCacheBroker { - services: Services, - projects: ProjectCacheHandle, -} - -impl ProjectCacheBroker { - fn handle_dequeued_envelope( - &mut self, - envelope: Box, - envelope_buffer: Addr, - ) -> Result<(), EnvelopeBufferError> { - let sampling_key = envelope.sampling_key(); - let services = self.services.clone(); - - let own_key = envelope.meta().public_key(); - let project = self.projects.get(own_key); - - // Check if project config is enabled. - let project_info = match project.state() { - ProjectState::Enabled(info) => info, - ProjectState::Disabled => { - let mut managed_envelope = ManagedEnvelope::new( - envelope, - self.services.outcome_aggregator.clone(), - self.services.test_store.clone(), - ProcessingGroup::Ungrouped, - ); - managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); - return Ok(()); - } - ProjectState::Pending => { - envelope_buffer.send(EnvelopeBuffer::NotReady(own_key, envelope)); - return Ok(()); - } - }; - - // Check if sampling config is enabled. - let sampling_project_info = match sampling_key.map(|sampling_key| { - ( - sampling_key, - self.projects.get(sampling_key).state().clone(), - ) - }) { - Some((_, ProjectState::Enabled(info))) => { - // Only set if it matches the organization ID. Otherwise treat as if there is - // no sampling project. 
- (info.organization_id == project_info.organization_id).then_some(info) - } - Some((_, ProjectState::Disabled)) => { - // Accept envelope even if its sampling state is disabled: - None - } - Some((sampling_key, ProjectState::Pending)) => { - envelope_buffer.send(EnvelopeBuffer::NotReady(sampling_key, envelope)); - return Ok(()); - } - None => None, - }; - - // Reassign processing groups and proceed to processing. - for (group, envelope) in ProcessingGroup::split_envelope(*envelope) { - let managed_envelope = ManagedEnvelope::new( - envelope, - services.outcome_aggregator.clone(), - services.test_store.clone(), - group, - ); - - let Ok(CheckedEnvelope { - envelope: Some(managed_envelope), - .. - }) = project.check_envelope(managed_envelope) - else { - continue; // Outcomes are emitted by check_envelope - }; - - let reservoir_counters = project.reservoir_counters().clone(); - services.envelope_processor.send(ProcessEnvelope { - envelope: managed_envelope, - project_info: project_info.clone(), - rate_limits: project.rate_limits().current_limits(), - sampling_project_info: sampling_project_info.clone(), - reservoir_counters, - }); - } - - Ok(()) - } - - fn handle_envelope(&mut self, dequeued_envelope: DequeuedEnvelope) { - let project_key_pair = ProjectKeyPair::from_envelope(&dequeued_envelope.0); - let envelope_buffer = self - .services - .envelope_buffer - .clone() - .buffer(project_key_pair) - .addr(); - - if let Err(e) = self.handle_dequeued_envelope(dequeued_envelope.0, envelope_buffer) { - relay_log::error!( - error = &e as &dyn std::error::Error, - "Failed to handle popped envelope" - ); - } - } -} - -/// Service implementing the [`ProjectCache`] interface. -#[derive(Debug)] -pub struct ProjectCacheService { - project_cache_handle: ProjectCacheHandle, - services: Services, - /// Bounded channel used exclusively to receive envelopes from the envelope buffer. 
- envelopes_rx: mpsc::Receiver, -} - -impl ProjectCacheService { - /// Creates a new `ProjectCacheService`. - pub fn new( - project_cache_handle: ProjectCacheHandle, - services: Services, - envelopes_rx: mpsc::Receiver, - ) -> Self { - Self { - project_cache_handle, - services, - envelopes_rx, - } - } -} - -impl Service for ProjectCacheService { - type Interface = ProjectCache; - - async fn run(self, _rx: relay_system::Receiver) { - let Self { - project_cache_handle, - services, - mut envelopes_rx, - } = self; - relay_log::info!("legacy project cache started"); - - let mut broker = ProjectCacheBroker { - projects: project_cache_handle, - services, - }; - - while let Some(message) = envelopes_rx.recv().await { - metric!( - timer(RelayTimers::LegacyProjectCacheTaskDuration), - task = "handle_envelope", - { broker.handle_envelope(message) } - ) - } - - relay_log::info!("legacy project cache stopped"); - } -} diff --git a/relay-server/src/services/projects/cache/mod.rs b/relay-server/src/services/projects/cache/mod.rs index 35ee749ecc..c3f94e298a 100644 --- a/relay-server/src/services/projects/cache/mod.rs +++ b/relay-server/src/services/projects/cache/mod.rs @@ -3,8 +3,6 @@ mod project; mod service; mod state; -pub mod legacy; - pub use self::handle::ProjectCacheHandle; pub use self::project::{CheckedEnvelope, Project}; pub use self::service::{ProjectCache, ProjectCacheService, ProjectChange}; diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 4d622edc44..d578adef89 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -182,9 +182,6 @@ pub enum RelayHistograms { /// This metric is tagged with: /// - `storage_type`: The type of storage used in the envelope buffer. BufferEnvelopesCount, - /// Number of envelopes in the backpressure buffer between the envelope buffer - /// and the project cache. - BufferBackpressureEnvelopesCount, /// The amount of bytes in the item payloads of an envelope pushed to the envelope buffer. 
/// /// This is not quite the same as the actual size of a serialized envelope, because it ignores @@ -297,9 +294,6 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::BatchesPerPartition => "metrics.buckets.batches_per_partition", RelayHistograms::BucketsPerBatch => "metrics.buckets.per_batch", RelayHistograms::BufferEnvelopesCount => "buffer.envelopes_count", - RelayHistograms::BufferBackpressureEnvelopesCount => { - "buffer.backpressure_envelopes_count" - } RelayHistograms::BufferEnvelopeBodySize => "buffer.envelope_body_size", RelayHistograms::BufferEnvelopeSize => "buffer.envelope_size", RelayHistograms::BufferEnvelopeSizeCompressed => "buffer.envelope_size.compressed", @@ -469,14 +463,6 @@ pub enum RelayTimers { /// This metric is tagged with: /// - `task`: The type of the task the project cache does. ProjectCacheTaskDuration, - /// Timing in milliseconds for processing a task in the legacy project cache service. - /// - /// A task is a unit of work the service does. Each branch of the - /// `tokio::select` is a different task type. - /// - /// This metric is tagged with: - /// - `task`: The type of the task the project cache does. - LegacyProjectCacheTaskDuration, /// Timing in milliseconds for handling and responding to a health check request. /// /// This metric is tagged with: @@ -577,7 +563,6 @@ impl TimerMetric for RelayTimers { RelayTimers::GlobalConfigRequestDuration => "global_config.requests.duration", RelayTimers::ProcessMessageDuration => "processor.message.duration", RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration", - RelayTimers::LegacyProjectCacheTaskDuration => "legacy_project_cache.task.duration", RelayTimers::HealthCheckDuration => "health.message.duration", #[cfg(feature = "processing")] RelayTimers::RateLimitBucketsDuration => "processor.rate_limit_buckets", @@ -637,17 +622,17 @@ pub enum RelayCounters { EnvelopeItemBytes, /// Number of transactions with attachments seen in the request handler. 
TransactionsWithAttachments, - /// Number of envelopes that were returned to the envelope buffer by the project cache. - /// - /// This happens when the envelope buffer falsely assumes that the envelope's projects are loaded - /// in the cache and sends the envelope onward, even though the project cache cannot handle it. - BufferEnvelopesReturned, /// Number of times an envelope from the buffer is trying to be popped. BufferTryPop, /// Number of envelopes spool to disk. BufferSpooledEnvelopes, /// Number of envelopes unspooled from disk. BufferUnspooledEnvelopes, + /// Number of project changed updates received by the buffer. + BufferProjectChangedEvent, + /// Number of times one or more projects of an envelope were pending when trying to pop + /// their envelope. + BufferProjectPending, /// /// Number of outcomes and reasons for rejected Envelopes. /// @@ -851,10 +836,11 @@ impl CounterMetric for RelayCounters { RelayCounters::EnvelopeItems => "event.items", RelayCounters::TransactionsWithAttachments => "transactions_with_attachments", RelayCounters::EnvelopeItemBytes => "event.item_bytes", - RelayCounters::BufferEnvelopesReturned => "buffer.envelopes_returned", RelayCounters::BufferTryPop => "buffer.try_pop", RelayCounters::BufferSpooledEnvelopes => "buffer.spooled_envelopes", RelayCounters::BufferUnspooledEnvelopes => "buffer.unspooled_envelopes", + RelayCounters::BufferProjectChangedEvent => "buffer.project_changed_event", + RelayCounters::BufferProjectPending => "buffer.project_pending", RelayCounters::Outcomes => "events.outcomes", RelayCounters::ProjectStateRequest => "project_state.request", #[cfg(feature = "processing")]