diff --git a/relay-server/src/endpoints/batch_metrics.rs b/relay-server/src/endpoints/batch_metrics.rs index d068e59fde..96aac00bd7 100644 --- a/relay-server/src/endpoints/batch_metrics.rs +++ b/relay-server/src/endpoints/batch_metrics.rs @@ -4,8 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::extractors::{SignedBytes, StartTime}; use crate::service::ServiceState; -use crate::services::processor::ProcessBatchedMetrics; -use crate::services::projects::cache::BucketSource; +use crate::services::processor::{BucketSource, ProcessBatchedMetrics}; #[derive(Debug, Serialize, Deserialize)] struct SendMetricsResponse {} diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index e7a86e1d0d..19f0ee6125 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -13,7 +13,7 @@ use crate::service::ServiceState; use crate::services::buffer::EnvelopeBuffer; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::{MetricData, ProcessMetrics, ProcessingGroup}; -use crate::services::projects::cache::ValidateEnvelope; +use crate::services::projects::cache::legacy::ValidateEnvelope; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope}; @@ -312,7 +312,9 @@ fn queue_envelope( } None => { relay_log::trace!("Sending envelope to project cache for V1 buffer"); - state.project_cache().send(ValidateEnvelope::new(envelope)); + state + .legacy_project_cache() + .send(ValidateEnvelope::new(envelope)); } } } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 47b31a815e..a068ec1689 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -11,8 +11,10 @@ use crate::services::metrics::{Aggregator, RouterService}; use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome}; use crate::services::outcome_aggregator::OutcomeAggregator; use 
crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; -use crate::services::projects::cache::{ProjectCache, ProjectCacheService, Services}; -use crate::services::projects::cache2::ProjectCacheHandle; +use crate::services::projects::cache::{ + legacy, ProjectCache, ProjectCacheHandle, ProjectCacheService, +}; +use crate::services::projects::source::ProjectSource; use crate::services::relays::{RelayCache, RelayCacheService}; use crate::services::stats::RelayStats; #[cfg(feature = "processing")] @@ -62,6 +64,7 @@ pub struct Registry { pub test_store: Addr, pub relay_cache: Addr, pub global_config: Addr, + pub legacy_project_cache: Addr, pub project_cache: Addr, pub upstream_relay: Addr, pub envelope_buffer: Option, @@ -189,12 +192,23 @@ impl ServiceState { // service fail if the service is not running. let global_config = global_config.start(); - let (project_cache, project_cache_rx) = channel(ProjectCacheService::name()); + let (legacy_project_cache, legacy_project_cache_rx) = + channel(legacy::ProjectCacheService::name()); + + let project_source = ProjectSource::start( + Arc::clone(&config), + upstream_relay.clone(), + redis_pools + .as_ref() + .map(|pools| pools.project_configs.clone()), + ); + let (project_cache, project_cache_handle) = + ProjectCacheService::new(Arc::clone(&config), project_source).start(); let aggregator = RouterService::new( config.default_aggregator_config().clone(), config.secondary_aggregator_configs().clone(), - Some(project_cache.clone().recipient()), + Some(legacy_project_cache.clone().recipient()), ); let aggregator_handle = aggregator.handle(); let aggregator = aggregator.start(); @@ -229,11 +243,12 @@ impl ServiceState { create_processor_pool(&config)?, config.clone(), global_config_handle, + project_cache_handle.clone(), cogs, #[cfg(feature = "processing")] redis_pools.clone(), processor::Addrs { - project_cache: project_cache.clone(), + legacy_project_cache: legacy_project_cache.clone(), outcome_aggregator: 
outcome_aggregator.clone(), upstream_relay: upstream_relay.clone(), test_store: test_store.clone(), @@ -252,7 +267,7 @@ impl ServiceState { global_config_rx.clone(), buffer::Services { envelopes_tx, - project_cache: project_cache.clone(), + project_cache_handle: project_cache_handle.clone(), outcome_aggregator: outcome_aggregator.clone(), test_store: test_store.clone(), }, @@ -260,27 +275,24 @@ impl ServiceState { .map(|b| b.start_observable()); // Keep all the services in one context. - let project_cache_services = Services { + let project_cache_services = legacy::Services { envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr), aggregator: aggregator.clone(), envelope_processor: processor.clone(), outcome_aggregator: outcome_aggregator.clone(), - project_cache: project_cache.clone(), + project_cache: legacy_project_cache.clone(), test_store: test_store.clone(), - upstream_relay: upstream_relay.clone(), }; - ProjectCacheService::new( + legacy::ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), + project_cache_handle.clone(), project_cache_services, global_config_rx, envelopes_rx, - redis_pools - .as_ref() - .map(|pools| pools.project_configs.clone()), ) - .spawn_handler(project_cache_rx); + .spawn_handler(legacy_project_cache_rx); let health_check = HealthCheckService::new( config.clone(), @@ -310,7 +322,9 @@ impl ServiceState { test_store, relay_cache, global_config, + legacy_project_cache, project_cache, + project_cache_handle, upstream_relay, envelope_buffer, }; @@ -343,9 +357,9 @@ impl ServiceState { self.inner.registry.envelope_buffer.as_ref() } - /// Returns the address of the [`ProjectCache`] service. - pub fn project_cache(&self) -> &Addr { - &self.inner.registry.project_cache + /// Returns the address of the [`legacy::ProjectCache`] service. + pub fn legacy_project_cache(&self) -> &Addr { + &self.inner.registry.legacy_project_cache } /// Returns a [`ProjectCacheHandle`]. 
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 10221615b8..80c90b04c0 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -21,10 +21,7 @@ use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; use crate::services::processor::ProcessingGroup; -use crate::services::projects::cache::DequeuedEnvelope; - -use crate::services::projects::cache2::ProjectCacheHandle; -use crate::services::projects::cache2::ProjectEvent; +use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectEvent}; use crate::services::test_store::TestStore; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::ManagedEnvelope; @@ -101,7 +98,7 @@ impl ObservableEnvelopeBuffer { pub struct Services { /// Bounded channel used exclusively to handle backpressure when sending envelopes to the /// project cache. - pub envelopes_tx: mpsc::Sender, + pub envelopes_tx: mpsc::Sender, pub project_cache_handle: ProjectCacheHandle, pub outcome_aggregator: Addr, pub test_store: Addr, @@ -159,7 +156,7 @@ impl EnvelopeBufferService { &mut self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool, - ) -> Option> { + ) -> Option> { relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checking" @@ -220,7 +217,7 @@ impl EnvelopeBufferService { config: &Config, buffer: &mut PolymorphicEnvelopeBuffer, services: &Services, - envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, + envelopes_tx_permit: Permit<'a, legacy::DequeuedEnvelope>, ) -> Result { let sleep = match buffer.peek().await? { Peek::Empty => { @@ -253,7 +250,7 @@ impl EnvelopeBufferService { .pop() .await? 
.expect("Element disappeared despite exclusive excess"); - envelopes_tx_permit.send(DequeuedEnvelope(envelope)); + envelopes_tx_permit.send(legacy::DequeuedEnvelope(envelope)); Duration::ZERO // try next pop immediately } @@ -430,7 +427,7 @@ impl Service for EnvelopeBufferService { } } } - ProjectEvent::Ready(project_key) = project_events.recv() => { + Ok(ProjectEvent::Ready(project_key)) = project_events.recv() => { Self::handle_message(&mut buffer, EnvelopeBuffer::Ready(project_key)).await; sleep = Duration::ZERO; } @@ -490,8 +487,8 @@ mod tests { struct EnvelopeBufferServiceResult { service: EnvelopeBufferService, global_tx: watch::Sender, - envelopes_rx: mpsc::Receiver, - project_cache_rx: mpsc::UnboundedReceiver, + envelopes_rx: mpsc::Receiver, + project_cache_rx: mpsc::UnboundedReceiver, outcome_aggregator_rx: mpsc::UnboundedReceiver, } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 572c252817..16aad5ac51 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -71,8 +71,7 @@ use crate::services::global_config::GlobalConfigHandle; use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::event::FiltersStatus; -use crate::services::projects::cache::{BucketSource, ProjectCache}; -use crate::services::projects::cache2::ProjectCacheHandle; +use crate::services::projects::cache::{legacy, ProjectCacheHandle}; use crate::services::projects::project::{ProjectInfo, ProjectState}; use crate::services::test_store::{Capture, TestStore}; use crate::services::upstream::{ @@ -757,7 +756,7 @@ struct ProcessEnvelopeState<'a, Group> { /// Currently active cached rate limits of the project this envelope belongs to. #[cfg_attr(not(feature = "processing"), expect(dead_code))] - rate_limits: RateLimits, + rate_limits: Arc, /// The config of this Relay instance. 
config: Arc, @@ -981,6 +980,32 @@ pub struct ProcessBatchedMetrics { pub sent_at: Option>, } +/// Source information where a metric bucket originates from. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum BucketSource { + /// The metric bucket originated from an internal Relay use case. + /// + /// The metric bucket originates either from within the same Relay + /// or was accepted coming from another Relay which is registered as + /// an internal Relay via Relay's configuration. + Internal, + /// The bucket source originated from an untrusted source. + /// + /// Managed Relays sending extracted metrics are considered external, + /// it's a project use case but it comes from an untrusted source. + External, +} + +impl From<&RequestMeta> for BucketSource { + fn from(value: &RequestMeta) -> Self { + if value.is_from_internal_relay() { + Self::Internal + } else { + Self::External + } + } +} + /// Metric buckets with additional project. #[derive(Debug, Clone)] pub struct ProjectMetrics { @@ -1099,7 +1124,7 @@ pub struct EnvelopeProcessorService { /// Contains the addresses of services that the processor publishes to. 
pub struct Addrs { - pub project_cache: Addr, + pub legacy_project_cache: Addr, pub outcome_aggregator: Addr, pub upstream_relay: Addr, pub test_store: Addr, @@ -1111,7 +1136,7 @@ pub struct Addrs { impl Default for Addrs { fn default() -> Self { Addrs { - project_cache: Addr::dummy(), + legacy_project_cache: Addr::dummy(), outcome_aggregator: Addr::dummy(), upstream_relay: Addr::dummy(), test_store: Addr::dummy(), @@ -1126,7 +1151,7 @@ struct InnerProcessor { workers: WorkerGroup, config: Arc, global_config: GlobalConfigHandle, - project_cache_handle: ProjectCacheHandle, + project_cache: ProjectCacheHandle, cogs: Cogs, #[cfg(feature = "processing")] quotas_pool: Option, @@ -1145,6 +1170,7 @@ impl EnvelopeProcessorService { pool: ThreadPool, config: Arc, global_config: GlobalConfigHandle, + project_cache: ProjectCacheHandle, cogs: Cogs, #[cfg(feature = "processing")] redis: Option, addrs: Addrs, @@ -1173,6 +1199,7 @@ impl EnvelopeProcessorService { let inner = InnerProcessor { workers: WorkerGroup::new(pool), global_config, + project_cache, cogs, #[cfg(feature = "processing")] quotas_pool: quotas.clone(), @@ -1238,7 +1265,7 @@ impl EnvelopeProcessorService { mut managed_envelope: TypedEnvelope, project_id: ProjectId, project_info: Arc, - rate_limits: RateLimits, + rate_limits: Arc, sampling_project_info: Option>, reservoir_counters: Arc>>, ) -> ProcessEnvelopeState { @@ -1310,7 +1337,7 @@ impl EnvelopeProcessorService { // Update cached rate limits with the freshly computed ones. 
if !limits.is_empty() { self.inner - .project_cache_handle + .project_cache .get(state.managed_envelope.scoping().project_key) .rate_limits() .merge(limits); @@ -1838,7 +1865,7 @@ impl EnvelopeProcessorService { mut managed_envelope: ManagedEnvelope, project_id: ProjectId, project_info: Arc, - rate_limits: RateLimits, + rate_limits: Arc, sampling_project_info: Option>, reservoir_counters: Arc>>, ) -> Result { @@ -2096,13 +2123,13 @@ impl EnvelopeProcessorService { true }); - let project = self.inner.project_cache_handle.get(project_key); + let project = self.inner.project_cache.get(project_key); // Best effort check to filter and rate limit buckets, if there is no project state // available at the current time, we will check again after flushing. let buckets = match project.project_state() { ProjectState::Enabled(project_info) => { - let rate_limits = project.cached_rate_limits(); + let rate_limits = project.rate_limits().current_limits(); self.check_buckets(project_key, &project_info, &rate_limits, buckets) } _ => buckets, @@ -2198,7 +2225,7 @@ impl EnvelopeProcessorService { envelope, body, http_encoding, - project_cache: self.inner.addrs.project_cache.clone(), + project_cache: self.inner.project_cache.clone(), })); } Err(error) => { @@ -2352,7 +2379,7 @@ impl EnvelopeProcessorService { ); self.inner - .project_cache_handle + .project_cache .get(item_scoping.scoping.project_key) .rate_limits() .merge(limits); @@ -2430,7 +2457,7 @@ impl EnvelopeProcessorService { if was_enforced { // Update the rate limits in the project cache. 
self.inner - .project_cache_handle + .project_cache .get(scoping.project_key) .rate_limits() .merge(rate_limits); @@ -2879,7 +2906,7 @@ pub struct SendEnvelope { envelope: TypedEnvelope, body: Bytes, http_encoding: HttpEncoding, - project_cache_handle: ProjectCacheHandle, + project_cache: ProjectCacheHandle, } impl UpstreamRequest for SendEnvelope { @@ -2931,7 +2958,7 @@ impl UpstreamRequest for SendEnvelope { self.envelope.accept(); if let UpstreamRequestError::RateLimited(limits) = error { - self.project_cache_handle + self.project_cache .get(scoping.project_key) .rate_limits() .merge(limits.scope(&scoping)); @@ -3705,7 +3732,7 @@ mod tests { let processor = create_test_processor_with_addrs( config, Addrs { - project_cache, + legacy_project_cache, ..Default::default() }, ); @@ -3749,11 +3776,11 @@ mod tests { processor.handle_process_batched_metrics(&mut token, message); let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::ProcessMetrics(pm1) = value else { + let legacy::ProjectCache::ProcessMetrics(pm1) = value else { panic!() }; let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::ProcessMetrics(pm2) = value else { + let legacy::ProjectCache::ProcessMetrics(pm2) = value else { panic!() }; diff --git a/relay-server/src/services/processor/metrics.rs b/relay-server/src/services/processor/metrics.rs index 0d28285e77..703898e425 100644 --- a/relay-server/src/services/processor/metrics.rs +++ b/relay-server/src/services/processor/metrics.rs @@ -5,7 +5,7 @@ use relay_quotas::Scoping; use crate::metrics::MetricOutcomes; use crate::services::outcome::Outcome; -use crate::services::projects::cache::BucketSource; +use crate::services::processor::BucketSource; use crate::services::projects::project::ProjectInfo; /// Checks if the namespace of the passed bucket is valid. 
diff --git a/relay-server/src/services/projects/cache2/handle.rs b/relay-server/src/services/projects/cache/handle.rs similarity index 82% rename from relay-server/src/services/projects/cache2/handle.rs rename to relay-server/src/services/projects/cache/handle.rs index e7afb43454..39b4b23594 100644 --- a/relay-server/src/services/projects/cache2/handle.rs +++ b/relay-server/src/services/projects/cache/handle.rs @@ -1,4 +1,4 @@ -use core::fmt; +use std::fmt; use std::sync::Arc; use relay_base_schema::project::ProjectKey; @@ -7,15 +7,15 @@ use relay_system::Addr; use tokio::sync::broadcast; use super::state::Shared; -use crate::services::projects::cache2::service::ProjectEvent; -use crate::services::projects::cache2::{Project, ProjectCache}; +use crate::services::projects::cache::service::ProjectEvent; +use crate::services::projects::cache::{Project, ProjectCache}; #[derive(Clone)] pub struct ProjectCacheHandle { - shared: Arc, - config: Arc, - service: Addr, - project_events: broadcast::Sender, + pub(super) shared: Arc, + pub(super) config: Arc, + pub(super) service: Addr, + pub(super) project_events: broadcast::Sender, } impl ProjectCacheHandle { diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache/legacy.rs similarity index 95% rename from relay-server/src/services/projects/cache.rs rename to relay-server/src/services/projects/cache/legacy.rs index 2d99733db1..330ca47bc7 100644 --- a/relay-server/src/services/projects/cache.rs +++ b/relay-server/src/services/projects/cache/legacy.rs @@ -3,19 +3,17 @@ use std::error::Error; use std::sync::Arc; use std::time::Duration; -use crate::extractors::RequestMeta; use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError}; use crate::services::global_config; use crate::services::processor::{ EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; -use crate::services::projects::cache2::{CheckedEnvelope, ProjectCacheHandle, ProjectEvent}; 
+use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle, ProjectEvent}; use crate::Envelope; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_metrics::Bucket; -use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Service}; use tokio::sync::{mpsc, watch}; @@ -28,7 +26,6 @@ use crate::services::spooler::{ UnspooledEnvelope, BATCH_KEY_COUNT, }; use crate::services::test_store::TestStore; -use crate::services::upstream::UpstreamRelay; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{GarbageDisposal, ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle}; @@ -55,32 +52,6 @@ impl ValidateEnvelope { } } -/// Source information where a metric bucket originates from. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum BucketSource { - /// The metric bucket originated from an internal Relay use case. - /// - /// The metric bucket originates either from within the same Relay - /// or was accepted coming from another Relay which is registered as - /// an internal Relay via Relay's configuration. - Internal, - /// The bucket source originated from an untrusted source. - /// - /// Managed Relays sending extracted metrics are considered external, - /// it's a project use case but it comes from an untrusted source. - External, -} - -impl From<&RequestMeta> for BucketSource { - fn from(value: &RequestMeta) -> Self { - if value.is_from_internal_relay() { - Self::Internal - } else { - Self::External - } - } -} - /// Updates the buffer index for [`ProjectKey`] with the [`QueueKey`] keys. /// /// This message is sent from the project buffer in case of the error while fetching the data from @@ -106,12 +77,6 @@ pub struct RefreshIndexCache(pub HashSet); #[derive(Debug)] pub struct DequeuedEnvelope(pub Box); -/// A request to update a project, typically sent by the envelope buffer. 
-/// -/// This message is similar to [`GetProjectState`], except it has no `no_cache` option -/// and it does not send a response, but sends a signal back to the buffer instead. -pub struct UpdateProject(pub ProjectKey); - /// A cache for [`ProjectState`]s. /// /// The project maintains information about organizations, projects, and project keys along with @@ -189,7 +154,6 @@ pub struct Services { pub outcome_aggregator: Addr, pub project_cache: Addr, pub test_store: Addr, - pub upstream_relay: Addr, } /// Main broker of the [`ProjectCacheService`]. @@ -704,12 +668,11 @@ impl ProjectCacheBroker { pub struct ProjectCacheService { config: Arc, memory_checker: MemoryChecker, - project_cache: ProjectCacheHandle, + project_cache_handle: ProjectCacheHandle, services: Services, global_config_rx: watch::Receiver, /// Bounded channel used exclusively to receive envelopes from the envelope buffer. envelopes_rx: mpsc::Receiver, - redis: Option, } impl ProjectCacheService { @@ -717,20 +680,18 @@ impl ProjectCacheService { pub fn new( config: Arc, memory_checker: MemoryChecker, - project_cache: ProjectCacheHandle, + project_cache_handle: ProjectCacheHandle, services: Services, global_config_rx: watch::Receiver, envelopes_rx: mpsc::Receiver, - redis: Option, ) -> Self { Self { config, memory_checker, - project_cache, + project_cache_handle, services, global_config_rx, envelopes_rx, - redis, } } } @@ -742,13 +703,12 @@ impl Service for ProjectCacheService { let Self { config, memory_checker, - project_cache, + project_cache_handle, services, mut global_config_rx, mut envelopes_rx, - redis, } = self; - let project_events = project_cache.events(); + let project_events = project_cache_handle.events(); let project_cache = services.project_cache.clone(); let outcome_aggregator = services.outcome_aggregator.clone(); let test_store = services.test_store.clone(); @@ -833,7 +793,7 @@ impl Service for ProjectCacheService { } }) }, - project_event = project_events.recv() => { + 
Ok(project_event) = project_events.recv() => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_project_event", { broker.handle_project_event(project_event); }) @@ -907,7 +867,6 @@ mod tests { let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); - let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); Services { envelope_buffer: None, @@ -916,7 +875,6 @@ mod tests { project_cache, outcome_aggregator, test_store, - upstream_relay, } } diff --git a/relay-server/src/services/projects/cache2/mod.rs b/relay-server/src/services/projects/cache/mod.rs similarity index 92% rename from relay-server/src/services/projects/cache2/mod.rs rename to relay-server/src/services/projects/cache/mod.rs index 62d55aff70..7d2b1d1afa 100644 --- a/relay-server/src/services/projects/cache2/mod.rs +++ b/relay-server/src/services/projects/cache/mod.rs @@ -3,6 +3,8 @@ mod project; mod service; mod state; +pub mod legacy; + pub use self::handle::ProjectCacheHandle; pub use self::project::{CheckedEnvelope, Project}; pub use self::service::{ProjectCache, ProjectCacheService, ProjectEvent}; diff --git a/relay-server/src/services/projects/cache2/project.rs b/relay-server/src/services/projects/cache/project.rs similarity index 99% rename from relay-server/src/services/projects/cache2/project.rs rename to relay-server/src/services/projects/cache/project.rs index 2722360954..cdac543789 100644 --- a/relay-server/src/services/projects/cache2/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -7,7 +7,7 @@ use relay_sampling::evaluation::ReservoirCounters; use crate::envelope::ItemType; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::projects::cache2::state::SharedProject; +use crate::services::projects::cache::state::SharedProject; 
use crate::services::projects::project::ProjectState; use crate::utils::{CheckLimits, Enforcement, EnvelopeLimiter, ManagedEnvelope}; diff --git a/relay-server/src/services/projects/cache2/service.rs b/relay-server/src/services/projects/cache/service.rs similarity index 66% rename from relay-server/src/services/projects/cache2/service.rs rename to relay-server/src/services/projects/cache/service.rs index 1115a8feeb..c34879ebb0 100644 --- a/relay-server/src/services/projects/cache2/service.rs +++ b/relay-server/src/services/projects/cache/service.rs @@ -2,13 +2,25 @@ use std::sync::Arc; use relay_base_schema::project::ProjectKey; use relay_config::Config; +use relay_system::Service; use tokio::sync::{broadcast, mpsc}; -use crate::services::buffer::EnvelopeBuffer; -use crate::services::projects::cache2::state::{CompletedFetch, Fetch}; +use crate::services::projects::cache::handle::ProjectCacheHandle; +use crate::services::projects::cache::state::{CompletedFetch, Fetch, ProjectStore}; use crate::services::projects::project::{ProjectFetchState, ProjectState}; use crate::services::projects::source::ProjectSource; +/// Size of the broadcast channel for project events. +/// +/// This is set to a value which theoretically should never be reachable, +/// the number of events is approximately bounded by the amount of projects +/// receiving events. +/// +/// It is set to such a large amount because receivers of events currently +/// do not deal with lags in the channel gracefully. 
+const PROJECT_EVENTS_CHANNEL_SIZE: usize = 512_000; + +#[derive(Debug)] pub enum ProjectCache { Fetch(ProjectKey), } @@ -23,18 +35,17 @@ impl relay_system::FromMessage for ProjectCache { } } +#[derive(Debug, Copy, Clone)] pub enum ProjectEvent { Ready(ProjectKey), Evicted(ProjectKey), } pub struct ProjectCacheService { - store: super::state::ProjectStore, + store: ProjectStore, source: ProjectSource, config: Arc, - buffer: relay_system::Addr, - project_update_rx: mpsc::UnboundedReceiver, project_update_tx: mpsc::UnboundedSender, @@ -42,6 +53,35 @@ pub struct ProjectCacheService { } impl ProjectCacheService { + pub fn new(config: Arc, source: ProjectSource) -> Self { + let (project_update_tx, project_update_rx) = mpsc::unbounded_channel(); + let project_events_tx = broadcast::channel(PROJECT_EVENTS_CHANNEL_SIZE).0; + + Self { + store: ProjectStore::default(), + source, + config, + project_update_rx, + project_update_tx, + project_events_tx, + } + } + + pub fn start(self) -> (relay_system::Addr, ProjectCacheHandle) { + let (addr, addr_rx) = relay_system::channel(Self::name()); + + let handle = ProjectCacheHandle { + shared: self.store.shared(), + config: Arc::clone(&self.config), + service: addr.clone(), + project_events: self.project_events_tx.clone(), + }; + + self.spawn_handler(addr_rx); + + (addr, handle) + } + fn schedule_fetch(&self, fetch: Fetch) { + let source = self.source.clone(); + let project_updates = self.project_update_tx.clone(); @@ -70,6 +110,7 @@ impl ProjectCacheService { } } +/// All [`ProjectCacheService`] message handlers. 
impl ProjectCacheService { fn handle_fetch(&mut self, project_key: ProjectKey) { if let Some(fetch) = self.store.try_begin_fetch(project_key, &self.config) { @@ -89,20 +130,23 @@ impl ProjectCacheService { return; } - self.project_events_tx + // TODO: no-ops from revision checks should not end up here + let _ = self + .project_events_tx .send(ProjectEvent::Ready(project_key)); } fn handle_evict_stale_projects(&mut self) { let on_evict = |project_key| { - self.project_events_tx + let _ = self + .project_events_tx .send(ProjectEvent::Evicted(project_key)); }; self.store.evict_stale_projects(&self.config, on_evict); } - fn handle(&mut self, message: ProjectCache) { + fn handle_message(&mut self, message: ProjectCache) { match message { ProjectCache::Fetch(project_key) => self.handle_fetch(project_key), } @@ -124,7 +168,7 @@ impl relay_system::Service for ProjectCacheService { self.handle_project_update(update) }, Some(message) = rx.recv() => { - self.handle(message); + self.handle_message(message); }, _ = eviction_ticker.tick() => { self.handle_evict_stale_projects() diff --git a/relay-server/src/services/projects/cache2/state.rs b/relay-server/src/services/projects/cache/state.rs similarity index 98% rename from relay-server/src/services/projects/cache2/state.rs rename to relay-server/src/services/projects/cache/state.rs index 722e20f182..74fc03b72b 100644 --- a/relay-server/src/services/projects/cache2/state.rs +++ b/relay-server/src/services/projects/cache/state.rs @@ -8,7 +8,7 @@ use relay_config::Config; use relay_quotas::{CachedRateLimits, RateLimits}; use relay_sampling::evaluation::ReservoirCounters; -use crate::services::projects::cache2::ProjectCache; +use crate::services::projects::cache::ProjectCache; use crate::services::projects::project::ProjectState; use crate::utils::RetryBackoff; @@ -22,12 +22,17 @@ use crate::utils::RetryBackoff; /// a fetch to create the private state when [`Missing`] is returned. 
/// This gurnatuees that eventually the project state is populated, but for a undetermined, /// time it is possible that shared state exists without the respective private state. +#[derive(Default)] pub struct ProjectStore { shared: Arc, private: hashbrown::HashMap, } impl ProjectStore { + pub fn shared(&self) -> Arc { + Arc::clone(&self.shared) + } + pub fn get(&mut self, project_key: ProjectKey) -> Option> { let private = self.private.get_mut(&project_key)?; let shared = self.shared.projects.pin().get(&project_key).cloned(); @@ -155,6 +160,7 @@ impl ProjectRef<'_> { } } +#[derive(Default)] pub struct Shared { projects: papaya::HashMap, } diff --git a/relay-server/src/services/projects/mod.rs b/relay-server/src/services/projects/mod.rs index ef8b6fead2..bc8f253eee 100644 --- a/relay-server/src/services/projects/mod.rs +++ b/relay-server/src/services/projects/mod.rs @@ -1,4 +1,3 @@ pub mod cache; -pub mod cache2; pub mod project; pub mod source; diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 54c2530550..7fc489ebbf 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -55,7 +55,7 @@ use crate::envelope::{Envelope, EnvelopeError}; use crate::extractors::StartTime; use crate::services::outcome::TrackOutcome; use crate::services::processor::ProcessingGroup; -use crate::services::projects::cache::{ProjectCache, RefreshIndexCache, UpdateSpoolIndex}; +use crate::services::projects::cache::legacy::{ProjectCache, RefreshIndexCache, UpdateSpoolIndex}; use crate::services::test_store::TestStore; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{ManagedEnvelope, MemoryChecker}; diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 9f4c255d35..358a2a929d 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -137,7 +137,7 @@ pub fn create_test_processor(config: Config) -> 
EnvelopeProcessorService { redis_pools, processor::Addrs { outcome_aggregator, - project_cache, + legacy_project_cache: project_cache, upstream_relay, test_store, #[cfg(feature = "processing")]