From 0e1bb3a389fcafce70bb543d2bb3fb5a0f1cd61e Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Wed, 13 Jan 2021 17:33:06 +0100 Subject: [PATCH 1/6] feat: Added a .no-cache option to event sending --- relay-common/src/project.rs | 7 +++ relay-server/src/actors/project.rs | 48 +++++++++++++++++++++ relay-server/src/endpoints/common.rs | 25 ++++++++++- relay-server/src/extractors/request_meta.rs | 24 ++++++++++- 4 files changed, 101 insertions(+), 3 deletions(-) diff --git a/relay-common/src/project.rs b/relay-common/src/project.rs index 0e91766787..300b691d12 100644 --- a/relay-common/src/project.rs +++ b/relay-common/src/project.rs @@ -56,6 +56,13 @@ impl ProjectKey { Ok(project_key) } + /// Parses a `ProjectKey` from a string with flags. + pub fn parse_with_flags(key: &str) -> Result<(Self, Vec<&str>), ParseProjectKeyError> { + let mut iter = key.split('.'); + let key = ProjectKey::parse(iter.next().ok_or(ParseProjectKeyError)?)?; + Ok((key, iter.collect())) + } + /// Returns the string representation of the project key. #[inline] pub fn as_str(&self) -> &str { diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index c5f1df27c3..6e0d6a2d9c 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -448,6 +448,33 @@ impl Project { Response::future(future) } + /// This refreshes the underlying state and waits for that. + /// + /// Note that this currently has the limitation that it does not promote the + /// refresh through a chain of relays. This means that it can only ask for a + /// local refresh, but if the next in line relay is outdated this won't do + /// anything. + fn refresh_state( + &mut self, + context: &mut Context, + ) -> Response, ProjectError> { + // count number of times we are looking for the project state + metric!(counter(RelayCounters::ProjectStateGet) += 1); + + relay_log::debug!( + "project {} state requested (force refresh)", + self.public_key + ); + let channel = self.fetch_state(context); + self.state_channel = Some(channel.clone()); + + let future = channel + .map(|shared| (*shared).clone()) + .map_err(|_| ProjectError::FetchFailed); + + Response::future(future) + } + fn fetch_state( &mut self, context: &mut Context, @@ -551,6 +578,27 @@ impl Handler for Project { } } +/// Returns the up to date project state. +/// +/// The project state is always fetched if it is missing or outdated. +pub struct RefreshProjectState; + +impl Message for RefreshProjectState { + type Result = Result, ProjectError>; +} + +impl Handler for Project { + type Result = Response, ProjectError>; + + fn handle( + &mut self, + _message: RefreshProjectState, + context: &mut Context, + ) -> Self::Result { + self.refresh_state(context) + } +} + /// Returns the project state. /// /// The project state is fetched if it is missing or outdated. 
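To illustrate the flag syntax handled by `parse_with_flags` above: the public key portion of the DSN may carry dot-separated flags such as `.no-cache`, which are split off before the key itself is validated. The sketch below mirrors that splitting using only the standard library; the helper name `split_key_flags` and the 32-character key are made up for this example, and the validation performed by `ProjectKey::parse` is omitted.

```rust
/// Splits a DSN public key such as "<key>.no-cache" into the key itself and any
/// trailing flags, mirroring `ProjectKey::parse_with_flags` (validation omitted).
fn split_key_flags(raw: &str) -> (&str, Vec<&str>) {
    let mut parts = raw.split('.');
    // `split` always yields at least one element, so `unwrap` cannot panic here.
    let key = parts.next().unwrap();
    (key, parts.collect())
}

fn main() {
    // A made-up 32-character public key with the new flag appended.
    let (key, flags) = split_key_flags("31a5a894b4524f74a9a8d0e27e21ba91.no-cache");
    assert_eq!(key, "31a5a894b4524f74a9a8d0e27e21ba91");
    assert_eq!(flags, vec!["no-cache"]);

    // Without a flag, the key is returned unchanged and no flags are reported.
    let (key, flags) = split_key_flags("31a5a894b4524f74a9a8d0e27e21ba91");
    assert_eq!(key, "31a5a894b4524f74a9a8d0e27e21ba91");
    assert!(flags.is_empty());
}
```

The request extractor further down in this patch then sets `RequestMeta::no_cache` when the returned flags contain `no-cache`.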
diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index c9dd6355be..ff746fbe1c 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -19,7 +19,7 @@ use relay_quotas::RateLimits; use crate::actors::events::{QueueEnvelope, QueueEnvelopeError}; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::actors::project::{CheckEnvelope, Project}; +use crate::actors::project::{CheckEnvelope, Project, RefreshProjectState}; use crate::actors::project_cache::{GetProject, ProjectError}; use crate::body::StorePayloadError; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, ItemType, Items}; @@ -404,6 +404,7 @@ where let project_manager = request.state().project_cache(); let outcome_producer = request.state().outcome_producer(); let remote_addr = meta.client_addr(); + let should_refresh = meta.no_cache(); let scoping = Rc::new(RefCell::new(meta.get_partial_scoping())); let event_id = Rc::new(RefCell::new(None)); @@ -412,7 +413,27 @@ where let future = project_manager .send(GetProject { public_key }) .map_err(BadStoreRequest::ScheduleFailed) - .and_then(clone!(event_id, scoping, |project| { + .and_then(clone!(should_refresh, |project| { + type RetVal = ResponseFuture, BadStoreRequest>; + if should_refresh { + Box::new( + project + .send(RefreshProjectState) + .map_err(BadStoreRequest::ScheduleFailed) + .and_then(|result| { + // we actually do not care about the project state here. + // we just want to know it's refreshed. The real state + // is fetched again further down. + result + .map(|_| project) + .map_err(BadStoreRequest::ProjectFailed) + }), + ) as RetVal + } else { + Box::new(Ok(project).into_future()) as RetVal + } + })) + .and_then(clone!(event_id, scoping, should_refresh, |project| { extract_envelope(&request, meta) .into_future() .and_then(clone!(project, event_id, |envelope| { diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index b02cb4d550..5f72c493d0 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -153,6 +153,14 @@ const fn default_version() -> u16 { relay_common::PROTOCOL_VERSION } +fn is_false(value: &bool) -> bool { + !*value +} + +fn make_false() -> bool { + false +} + /// Request information for sentry ingest data, such as events, envelopes or metrics. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RequestMeta { @@ -183,6 +191,10 @@ pub struct RequestMeta { #[serde(default, skip_serializing_if = "Option::is_none")] user_agent: Option, + /// A flag that indicates that project options caching should be bypassed. + #[serde(default = "make_false", skip_serializing_if = "is_false")] + no_cache: bool, + /// The time at which the request started. // // NOTE: This is internal-only and not exposed to Envelope headers. @@ -234,6 +246,11 @@ impl RequestMeta { self.user_agent.as_deref() } + /// Indicates that caches should be bypassed. + pub fn no_cache(&self) -> bool { + self.no_cache + } + /// The time at which the request started. 
pub fn start_time(&self) -> Instant { self.start_time @@ -252,6 +269,7 @@ impl RequestMeta { remote_addr: Some("192.168.0.1".parse().unwrap()), forwarded_for: String::new(), user_agent: Some("sentry/agent".to_string()), + no_cache: false, start_time: Instant::now(), } } @@ -436,9 +454,12 @@ impl FromRequest for RequestMeta { let config = request.state().config(); let upstream = config.upstream_descriptor(); + let (public_key, key_flags) = + ProjectKey::parse_with_flags(auth.public_key()).map_err(BadEventMeta::BadPublicKey)?; + let dsn = PartialDsn { scheme: upstream.scheme(), - public_key: ProjectKey::parse(auth.public_key()).map_err(BadEventMeta::BadPublicKey)?, + public_key, host: upstream.host().to_owned(), port: upstream.port(), path: String::new(), @@ -458,6 +479,7 @@ impl FromRequest for RequestMeta { .get(header::USER_AGENT) .and_then(|h| h.to_str().ok()) .map(str::to_owned), + no_cache: key_flags.contains(&"no-cache"), start_time, }) } From 782f97a25867aec81645188f217deebaee513745 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Wed, 27 Jan 2021 15:31:05 +0100 Subject: [PATCH 2/6] ref: Propagate no_cache to upstream requests --- relay-server/src/actors/events.rs | 6 +- relay-server/src/actors/project.rs | 102 ++++++++---------- relay-server/src/actors/project_cache.rs | 7 +- relay-server/src/actors/project_upstream.rs | 23 +++- relay-server/src/endpoints/common.rs | 25 +---- relay-server/src/endpoints/project_configs.rs | 7 +- relay-server/src/extractors/request_meta.rs | 3 + relay-server/src/utils/dynamic_sampling.rs | 2 +- 8 files changed, 89 insertions(+), 86 deletions(-) diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 4b73dbc291..e9e58db116 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -1489,9 +1489,11 @@ impl Handler for EventManager { } }) .and_then(clone!(project, |envelope| { - // get the state for the current project + // get the state for the current project. we can always fetch the cached version + // even if the no_cache flag was passed, as the cache was updated prior in + // `CheckEnvelope`. project - .send(GetProjectState) + .send(GetProjectState::new()) .map_err(ProcessingError::ScheduleFailed) .and_then(|result| result.map_err(ProcessingError::ProjectFailed)) .map(|state| (envelope, state)) diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 6e0d6a2d9c..0f884e4810 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -383,6 +383,7 @@ pub struct Project { state: Option>, state_channel: Option>>>, rate_limits: RateLimits, + no_cache: bool, } impl Project { @@ -394,6 +395,7 @@ impl Project { state: None, state_channel: None, rate_limits: RateLimits::new(), + no_cache: false, } } @@ -403,6 +405,7 @@ impl Project { fn get_or_fetch_state( &mut self, + no_cache: bool, context: &mut Context, ) -> Response, ProjectError> { // count number of times we are looking for the project state @@ -414,6 +417,9 @@ impl Project { .unwrap_or(Outdated::HardOutdated); let cached_state = match (state, outdated) { + // Never use the cached state if `no_cache` is set. + _ if no_cache => None, + // There is no project state that can be used, fetch a state and return it. (None, _) | (_, Outdated::HardOutdated) => None, @@ -424,14 +430,20 @@ impl Project { (Some(state), Outdated::Updated) => return Response::ok(state.clone()), }; + // Check if we are already fetching with `no_cache` enabled. 
Otherwise, replace the current + // channel with one that has the flag enabled. All envelopes that are already in-flight will + // still receive the potentially cached upstream state, but subsequent envelopes will amend + // to the new channel. + let replace_channel = no_cache && !self.no_cache; + let channel = match self.state_channel { - Some(ref channel) => { + Some(ref channel) if !replace_channel => { relay_log::debug!("project {} state request amended", self.public_key); channel.clone() } - None => { + _ => { relay_log::debug!("project {} state requested", self.public_key); - let channel = self.fetch_state(context); + let channel = self.fetch_state(no_cache, context).shared(); self.state_channel = Some(channel.clone()); channel } @@ -448,42 +460,19 @@ impl Project { Response::future(future) } - /// This refreshes the underlying state and waits for that. - /// - /// Note that this currently has the limitation that it does not promote the - /// refresh through a chain of relays. This means that it can only ask for a - /// local refresh, but if the next in line relay is outdated this won't do - /// anything. - fn refresh_state( - &mut self, - context: &mut Context, - ) -> Response, ProjectError> { - // count number of times we are looking for the project state - metric!(counter(RelayCounters::ProjectStateGet) += 1); - - relay_log::debug!( - "project {} state requested (force refresh)", - self.public_key - ); - let channel = self.fetch_state(context); - self.state_channel = Some(channel.clone()); - - let future = channel - .map(|shared| (*shared).clone()) - .map_err(|_| ProjectError::FetchFailed); - - Response::future(future) - } - fn fetch_state( &mut self, + no_cache: bool, context: &mut Context, - ) -> Shared>> { + ) -> oneshot::Receiver> { let (sender, receiver) = oneshot::channel(); let public_key = self.public_key; self.manager - .send(FetchProjectState { public_key }) + .send(FetchProjectState { + public_key, + no_cache, + }) .into_actor(self) .map(move |state_result, slf, _ctx| { slf.state_channel = None; @@ -497,7 +486,7 @@ impl Project { .drop_err() .spawn(context); - receiver.shared() + receiver } fn get_scoping(&mut self, meta: &RequestMeta) -> Scoping { @@ -559,7 +548,7 @@ impl Actor for Project { /// /// This is used for cases when we only want to perform operations that do /// not require waiting for network requests. -/// +#[derive(Debug)] pub struct GetCachedProjectState; impl Message for GetCachedProjectState { @@ -578,32 +567,27 @@ impl Handler for Project { } } -/// Returns the up to date project state. +/// Returns the project state. /// -/// The project state is always fetched if it is missing or outdated. -pub struct RefreshProjectState; - -impl Message for RefreshProjectState { - type Result = Result, ProjectError>; +/// The project state is fetched if it is missing or outdated. If `no_cache` is specified, then the +/// state is always refreshed. +#[derive(Debug)] +pub struct GetProjectState { + no_cache: bool, } -impl Handler for Project { - type Result = Response, ProjectError>; +impl GetProjectState { + /// Fetches the project state and uses the cached version if up-to-date. + pub fn new() -> Self { + Self { no_cache: false } + } - fn handle( - &mut self, - _message: RefreshProjectState, - context: &mut Context, - ) -> Self::Result { - self.refresh_state(context) + /// Fetches the project state and conditionally skips the cache. + pub fn no_cache(no_cache: bool) -> Self { + Self { no_cache } } } -/// Returns the project state. 
-/// -/// The project state is fetched if it is missing or outdated. -pub struct GetProjectState; - impl Message for GetProjectState { type Result = Result, ProjectError>; } @@ -611,8 +595,8 @@ impl Message for GetProjectState { impl Handler for Project { type Result = Response, ProjectError>; - fn handle(&mut self, _message: GetProjectState, context: &mut Context) -> Self::Result { - self.get_or_fetch_state(context) + fn handle(&mut self, message: GetProjectState, context: &mut Context) -> Self::Result { + self.get_or_fetch_state(message.no_cache, context) } } @@ -677,13 +661,17 @@ impl Handler for Project { if message.fetch { // Project state fetching is allowed, so ensure the state is fetched and up-to-date. // This will return synchronously if the state is still cached. - self.get_or_fetch_state(context) + self.get_or_fetch_state(message.envelope.meta().no_cache(), context) .into_actor() .map(self, context, move |_, slf, _ctx| { slf.check_envelope_scoped(message) }) } else { - self.get_or_fetch_state(context); + // Preload the project cache so that it arrives a little earlier in processing. However, + // do not pass `no_cache`. In case the project is rate limited, we do not want to force + // a full reload. + self.get_or_fetch_state(false, context); + // message.fetch == false: Fetching must not block the store request. The EventManager // will later fetch the project state. ActorResponse::ok(self.check_envelope_scoped(message)) diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index a9c3c6bba2..80cee42ca5 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -170,7 +170,11 @@ impl Handler for ProjectCache { /// individual requests. #[derive(Clone)] pub struct FetchProjectState { + /// The public key to fetch the project by. pub public_key: ProjectKey, + + /// If true, all caches should be skipped and a fresh state should be computed. + pub no_cache: bool, } #[derive(Debug)] @@ -201,7 +205,8 @@ impl Handler for ProjectCache { type Result = Response; fn handle(&mut self, message: FetchProjectState, _context: &mut Self::Context) -> Self::Result { - let FetchProjectState { public_key } = message; + let public_key = message.public_key; + if let Some(mut entry) = self.projects.get_mut(&public_key) { // Bump the update time of the project in our hashmap to evade eviction. Eviction is a // sequential scan over self.projects, so this needs to be as fast as possible and diff --git a/relay-server/src/actors/project_upstream.rs b/relay-server/src/actors/project_upstream.rs index 1dc09cf09b..dc677ea732 100644 --- a/relay-server/src/actors/project_upstream.rs +++ b/relay-server/src/actors/project_upstream.rs @@ -40,6 +40,8 @@ pub struct GetProjectStates { pub public_keys: Vec, #[serde(default)] pub full_config: bool, + #[serde(default)] + pub no_cache: bool, } #[derive(Debug, Deserialize, Serialize)] @@ -77,6 +79,7 @@ struct ProjectStateChannel { sender: oneshot::Sender>, receiver: Shared>>, deadline: Instant, + no_cache: bool, } impl ProjectStateChannel { @@ -87,9 +90,14 @@ impl ProjectStateChannel { sender, receiver: receiver.shared(), deadline: Instant::now() + timeout, + no_cache: false, } } + pub fn no_cache(&mut self) { + self.no_cache = true; + } + pub fn send(self, state: ProjectState) { self.sender.send(Arc::new(state)).ok(); } @@ -183,6 +191,8 @@ impl UpstreamProjectSource { // num_batches. Worst case, we're left with one project per request, but that's fine. 
let actual_batch_size = (total_count + (total_count % num_batches)) / num_batches; + // TODO(ja): This mixes requests with no_cache. Separate out channels with no_cache: true? + let requests: Vec<_> = channels .into_iter() .chunks(actual_batch_size) @@ -198,6 +208,7 @@ impl UpstreamProjectSource { let query = GetProjectStates { public_keys: channels_batch.keys().copied().collect(), full_config: self.config.processing_enabled(), + no_cache: channels_batch.values().any(|c| c.no_cache), }; // count number of http requests for project states @@ -324,6 +335,10 @@ impl Handler for UpstreamProjectSource { } let query_timeout = self.config.query_timeout(); + let FetchProjectState { + public_key, + no_cache, + } = message; // There's an edge case where a project is represented by two Project actors. This can // happen if our project eviction logic removes an actor from `project_cache.projects` @@ -336,9 +351,15 @@ impl Handler for UpstreamProjectSource { // channel for our current `message.id`. let channel = self .state_channels - .entry(message.public_key) + .entry(public_key) .or_insert_with(|| ProjectStateChannel::new(query_timeout)); + // Ensure upstream skips caches if one of the recipients requests an uncached response. This + // operation is additive across requests. + if no_cache { + channel.no_cache(); + } + Box::new( channel .receiver() diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index ff746fbe1c..c9dd6355be 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -19,7 +19,7 @@ use relay_quotas::RateLimits; use crate::actors::events::{QueueEnvelope, QueueEnvelopeError}; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; -use crate::actors::project::{CheckEnvelope, Project, RefreshProjectState}; +use crate::actors::project::{CheckEnvelope, Project}; use crate::actors::project_cache::{GetProject, ProjectError}; use crate::body::StorePayloadError; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, ItemType, Items}; @@ -404,7 +404,6 @@ where let project_manager = request.state().project_cache(); let outcome_producer = request.state().outcome_producer(); let remote_addr = meta.client_addr(); - let should_refresh = meta.no_cache(); let scoping = Rc::new(RefCell::new(meta.get_partial_scoping())); let event_id = Rc::new(RefCell::new(None)); @@ -413,27 +412,7 @@ where let future = project_manager .send(GetProject { public_key }) .map_err(BadStoreRequest::ScheduleFailed) - .and_then(clone!(should_refresh, |project| { - type RetVal = ResponseFuture, BadStoreRequest>; - if should_refresh { - Box::new( - project - .send(RefreshProjectState) - .map_err(BadStoreRequest::ScheduleFailed) - .and_then(|result| { - // we actually do not care about the project state here. - // we just want to know it's refreshed. The real state - // is fetched again further down. 
- result - .map(|_| project) - .map_err(BadStoreRequest::ProjectFailed) - }), - ) as RetVal - } else { - Box::new(Ok(project).into_future()) as RetVal - } - })) - .and_then(clone!(event_id, scoping, should_refresh, |project| { + .and_then(clone!(event_id, scoping, |project| { extract_envelope(&request, meta) .into_future() .and_then(clone!(project, event_id, |envelope| { diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs index 0ae4b9ac2e..93867728ed 100644 --- a/relay-server/src/endpoints/project_configs.rs +++ b/relay-server/src/endpoints/project_configs.rs @@ -66,6 +66,7 @@ fn get_project_configs( ) -> ResponseFuture, Error> { let relay = body.relay; let full = relay.internal && body.inner.full_config; + let no_cache = body.inner.no_cache; let futures = body.inner.public_keys.into_iter().map(move |public_key| { let relay = relay.clone(); @@ -73,7 +74,11 @@ fn get_project_configs( .project_cache() .send(GetProject { public_key }) .map_err(Error::from) - .and_then(|project| project.send(GetProjectState).map_err(Error::from)) + .and_then(move |project| { + project + .send(GetProjectState::no_cache(no_cache)) + .map_err(Error::from) + }) .map(move |project_state| { let project_state = project_state.ok()?; // If public key is known (even if rate-limited, which is Some(false)), it has diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index 5f72c493d0..3030b3bcf8 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -368,6 +368,9 @@ impl PartialMeta { if self.user_agent.is_some() { complete.user_agent = self.user_agent; } + if self.no_cache { + complete.no_cache = true; + } complete } diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index 1144ec4be7..5a42cd7c89 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -340,7 +340,7 @@ pub fn sample_transaction( }); Box::new(fut) as ResponseFuture<_, _> } else { - let fut = project.send(GetProjectState).then(|project_state| { + let fut = project.send(GetProjectState::new()).then(|project_state| { let project_state = match project_state { // error getting the project, give up and return envelope unchanged Err(_) => return Ok(envelope), From 65066aad6d123ee95ee351b293a6d4eba6343676 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Wed, 27 Jan 2021 16:08:42 +0100 Subject: [PATCH 3/6] meta: Changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a777e650a..fe5443b3c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +**Features**: + +- By adding `.no-cache` to the DSN key, Relay refreshes project configuration caches immediately. This allows to apply changed settings instantly, such as updates to data scrubbing or inbound filter rules. ([#911](https://github.com/getsentry/relay/pull/911)) + **Internal**: - Compatibility mode for pre-aggregated sessions was removed. The feature is now enabled by default in full fidelity. 
([#913](https://github.com/getsentry/relay/pull/913)) From aaf2700b4a01d01c69c93d1c34017a9769e2ecd5 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 28 Jan 2021 09:54:44 +0100 Subject: [PATCH 4/6] fix: Race condition in state fetching --- relay-server/src/actors/project.rs | 35 +++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 0f884e4810..a190c1f9af 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use actix::prelude::*; use chrono::{DateTime, Utc}; @@ -383,7 +383,8 @@ pub struct Project { state: Option>, state_channel: Option>>>, rate_limits: RateLimits, - no_cache: bool, + last_no_cache: Instant, + fetching_no_cache: bool, } impl Project { @@ -395,7 +396,8 @@ impl Project { state: None, state_channel: None, rate_limits: RateLimits::new(), - no_cache: false, + last_no_cache: Instant::now(), + fetching_no_cache: false, } } @@ -405,12 +407,21 @@ impl Project { fn get_or_fetch_state( &mut self, - no_cache: bool, + mut no_cache: bool, context: &mut Context, ) -> Response, ProjectError> { // count number of times we are looking for the project state metric!(counter(RelayCounters::ProjectStateGet) += 1); + // Allow at most 1 no_cache request per second. Gracefully degrade to cached requests. + if no_cache { + if self.last_no_cache.elapsed() < Duration::from_secs(1) { + no_cache = false; + } else { + self.last_no_cache = Instant::now(); + } + } + let state = self.state.as_ref(); let outdated = state .map(|s| s.outdated(&self.config)) @@ -434,10 +445,14 @@ impl Project { // channel with one that has the flag enabled. All envelopes that are already in-flight will // still receive the potentially cached upstream state, but subsequent envelopes will amend // to the new channel. - let replace_channel = no_cache && !self.no_cache; + let reuse_channel = self.fetching_no_cache || !no_cache; let channel = match self.state_channel { - Some(ref channel) if !replace_channel => { + // Check if we are already fetching with `no_cache` enabled. Otherwise, replace the + // current channel with one that has the flag enabled. All envelopes that are already + // in-flight will still receive the potentially cached upstream state, but subsequent + // envelopes will amend to the new channel. 
+ Some(ref channel) if reuse_channel => { relay_log::debug!("project {} state request amended", self.public_key); channel.clone() } @@ -468,6 +483,8 @@ impl Project { let (sender, receiver) = oneshot::channel(); let public_key = self.public_key; + self.fetching_no_cache = no_cache; + self.manager .send(FetchProjectState { public_key, @@ -475,8 +492,10 @@ impl Project { }) .into_actor(self) .map(move |state_result, slf, _ctx| { - slf.state_channel = None; - slf.state = state_result.map(|resp| resp.state).ok(); + if !slf.fetching_no_cache || no_cache { + slf.state_channel = None; + slf.state = state_result.map(|resp| resp.state).ok(); + } if let Some(ref state) = slf.state { relay_log::debug!("project state {} updated", public_key); From b97707fc5d8c8aa0ca0adf50bf082a84587e316a Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 28 Jan 2021 10:31:27 +0100 Subject: [PATCH 5/6] ref: Alternative no_cache debouncing --- relay-server/src/actors/project.rs | 98 +++++++++++++++++++----------- 1 file changed, 64 insertions(+), 34 deletions(-) diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index a190c1f9af..7d450154e0 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -372,6 +372,36 @@ pub struct PublicKeyConfig { pub numeric_id: Option, } +struct StateChannel { + sender: oneshot::Sender>, + receiver: Shared>>, + no_cache: bool, +} + +impl StateChannel { + pub fn new() -> Self { + let (sender, receiver) = oneshot::channel(); + Self { + sender, + receiver: receiver.shared(), + no_cache: false, + } + } + + pub fn no_cache(&mut self, no_cache: bool) -> &mut Self { + self.no_cache = no_cache; + self + } + + pub fn receiver(&self) -> Shared>> { + self.receiver.clone() + } + + pub fn send(self, state: Arc) { + self.sender.send(state).ok(); + } +} + /// Actor representing organization and project configuration for a project key. /// /// This actor no longer uniquely identifies a project. Instead, it identifies a project key. @@ -381,10 +411,9 @@ pub struct Project { config: Arc, manager: Addr, state: Option>, - state_channel: Option>>>, + state_channel: Option, rate_limits: RateLimits, last_no_cache: Instant, - fetching_no_cache: bool, } impl Project { @@ -397,7 +426,6 @@ impl Project { state_channel: None, rate_limits: RateLimits::new(), last_no_cache: Instant::now(), - fetching_no_cache: false, } } @@ -441,26 +469,26 @@ impl Project { (Some(state), Outdated::Updated) => return Response::ok(state.clone()), }; - // Check if we are already fetching with `no_cache` enabled. Otherwise, replace the current - // channel with one that has the flag enabled. All envelopes that are already in-flight will - // still receive the potentially cached upstream state, but subsequent envelopes will amend - // to the new channel. - let reuse_channel = self.fetching_no_cache || !no_cache; - - let channel = match self.state_channel { - // Check if we are already fetching with `no_cache` enabled. Otherwise, replace the - // current channel with one that has the flag enabled. All envelopes that are already - // in-flight will still receive the potentially cached upstream state, but subsequent - // envelopes will amend to the new channel. 
- Some(ref channel) if reuse_channel => { + let receiver = match self.state_channel { + Some(ref channel) if channel.no_cache || !no_cache => { relay_log::debug!("project {} state request amended", self.public_key); - channel.clone() + channel.receiver() } _ => { relay_log::debug!("project {} state requested", self.public_key); - let channel = self.fetch_state(no_cache, context).shared(); - self.state_channel = Some(channel.clone()); - channel + + let receiver = self + .state_channel + .get_or_insert_with(StateChannel::new) + .no_cache(no_cache) + .receiver(); + + // Either there is no running request, or the current request does not have + // `no_cache` set. In both cases, start a new request. All in-flight receivers will + // get the latest state. + self.fetch_state(no_cache, context); + + receiver } }; @@ -468,23 +496,17 @@ impl Project { return Response::ok(rv); } - let future = channel + let future = receiver .map(|shared| (*shared).clone()) .map_err(|_| ProjectError::FetchFailed); Response::future(future) } - fn fetch_state( - &mut self, - no_cache: bool, - context: &mut Context, - ) -> oneshot::Receiver> { - let (sender, receiver) = oneshot::channel(); + fn fetch_state(&mut self, no_cache: bool, context: &mut Context) { + debug_assert!(self.state_channel.is_some()); let public_key = self.public_key; - self.fetching_no_cache = no_cache; - self.manager .send(FetchProjectState { public_key, @@ -492,20 +514,28 @@ impl Project { }) .into_actor(self) .map(move |state_result, slf, _ctx| { - if !slf.fetching_no_cache || no_cache { - slf.state_channel = None; - slf.state = state_result.map(|resp| resp.state).ok(); + let channel = match slf.state_channel.take() { + Some(channel) => channel, + None => return, + }; + + // If the channel has `no_cache` set but we are not a `no_cache` request, we have + // been superseeded. Put it back and let the other request take precedence. + if channel.no_cache && !no_cache { + slf.state_channel = Some(channel); + return; } + slf.state_channel = None; + slf.state = state_result.map(|resp| resp.state).ok(); + if let Some(ref state) = slf.state { relay_log::debug!("project state {} updated", public_key); - sender.send(state.clone()).ok(); + channel.send(state.clone()); } }) .drop_err() .spawn(context); - - receiver } fn get_scoping(&mut self, meta: &RequestMeta) -> Scoping { From 1e38208062b7d715949c147b1f7af795b1ab3383 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 28 Jan 2021 12:18:52 +0100 Subject: [PATCH 6/6] feat: Add a metric --- relay-server/src/actors/project.rs | 1 + relay-server/src/metrics.rs | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 7d450154e0..b0294c0d47 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -446,6 +446,7 @@ impl Project { if self.last_no_cache.elapsed() < Duration::from_secs(1) { no_cache = false; } else { + metric!(counter(RelayCounters::ProjectStateNoCache) += 1); self.last_no_cache = Instant::now(); } } diff --git a/relay-server/src/metrics.rs b/relay-server/src/metrics.rs index cd1cd2f430..017fecb377 100644 --- a/relay-server/src/metrics.rs +++ b/relay-server/src/metrics.rs @@ -335,6 +335,15 @@ pub enum RelayCounters { /// Note that after an update loop has completed, there may be more projects pending updates. /// This is indicated by `project_state.pending`. ProjectStateRequest, + /// Number of times a project config was requested with `.no-cache`. 
+ /// + /// This effectively counts the number of envelopes or events that have been sent with a + /// corresponding DSN. Actual queries to the upstream may still be deduplicated for these + /// project state requests. + /// + /// A maximum of one such request per second is allowed per project key. This metric counts only + /// permitted requests. + ProjectStateNoCache, /// Number of times a project is looked up from the cache. /// /// The cache may contain and outdated or expired project state. In that case, the project state /// is requested from the upstream. @@ -433,6 +442,7 @@ impl CounterMetric for RelayCounters { RelayCounters::Outcomes => "events.outcomes", RelayCounters::ProjectStateGet => "project_state.get", RelayCounters::ProjectStateRequest => "project_state.request", + RelayCounters::ProjectStateNoCache => "project_state.no_cache", RelayCounters::ProjectCacheHit => "project_cache.hit", RelayCounters::ProjectCacheMiss => "project_cache.miss", RelayCounters::ServerStarting => "server.starting",
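To make the rate limit described in the `ProjectStateNoCache` documentation concrete, here is a minimal standalone sketch of the debounce introduced in the earlier "Race condition in state fetching" commit: at most one uncached fetch per second and per project key, with additional `.no-cache` requests gracefully degrading to cached lookups. The type and method names below are invented for the example; the real logic lives inline in the `Project` actor's `get_or_fetch_state`.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Standalone sketch of the per-project `no_cache` debounce (names invented here):
/// at most one uncached state fetch per second; excess requests degrade to cached ones.
struct NoCacheDebounce {
    last_no_cache: Instant,
}

impl NoCacheDebounce {
    fn new() -> Self {
        // The Project actor seeds this with its creation time, so a `.no-cache` event
        // arriving within the first second still uses the regular cached lookup.
        Self {
            last_no_cache: Instant::now(),
        }
    }

    /// Returns the effective `no_cache` flag for one state fetch. Only fetches that
    /// return `true` here would bump the `project_state.no_cache` counter.
    fn effective_no_cache(&mut self, requested: bool) -> bool {
        if !requested {
            return false;
        }
        if self.last_no_cache.elapsed() < Duration::from_secs(1) {
            false // gracefully degrade to a cached request
        } else {
            self.last_no_cache = Instant::now();
            true
        }
    }
}

fn main() {
    let mut debounce = NoCacheDebounce::new();
    assert!(!debounce.effective_no_cache(true)); // debounced right after creation
    sleep(Duration::from_millis(1100));
    assert!(debounce.effective_no_cache(true)); // first permitted uncached fetch
    assert!(!debounce.effective_no_cache(true)); // immediate retry falls back to the cache
    assert!(!debounce.effective_no_cache(false)); // plain requests never bypass the cache
}
```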