diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2713991045..cf44b2d2b9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,10 +2,6 @@
 
 ## Unreleased
 
-**Bug Fixes**:
-
-- Do not drop envelopes for unparsable project configs. ([#3770](https://github.com/getsentry/relay/pull/3770))
-
 **Features**
 
 - "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))
diff --git a/relay-dynamic-config/src/project.rs b/relay-dynamic-config/src/project.rs
index 55063feea7..b7340de94a 100644
--- a/relay-dynamic-config/src/project.rs
+++ b/relay-dynamic-config/src/project.rs
@@ -90,7 +90,7 @@ pub struct ProjectConfig {
 
 impl ProjectConfig {
     /// Validates fields in this project config and removes values that are partially invalid.
-    pub fn sanitized(&mut self) {
+    pub fn sanitize(&mut self) {
         self.quotas.retain(Quota::is_valid);
 
         metrics::convert_conditional_tagging(self);
diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs
index 2f354c54b9..27384dc61c 100644
--- a/relay-server/src/endpoints/common.rs
+++ b/relay-server/src/endpoints/common.rs
@@ -297,10 +297,9 @@ fn queue_envelope(
     }
 
     // Split off the envelopes by item type.
-    let scoping = managed_envelope.scoping();
     let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope());
     for (group, envelope) in envelopes {
-        let mut envelope = buffer_guard
+        let envelope = buffer_guard
            .enter(
                envelope,
                state.outcome_aggregator().clone(),
@@ -308,7 +307,6 @@ fn queue_envelope(
                group,
            )
            .map_err(BadStoreRequest::QueueFailed)?;
-        envelope.scope(scoping);
        state.project_cache().send(ValidateEnvelope::new(envelope));
    }
 
    // The entire envelope is taken for a split above, and it's empty at this point, we can just
diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs
index 0a84430583..086c1a4830 100644
--- a/relay-server/src/endpoints/project_configs.rs
+++ b/relay-server/src/endpoints/project_configs.rs
@@ -16,7 +16,7 @@ use crate::endpoints::forward;
 use crate::extractors::SignedJson;
 use crate::service::ServiceState;
 use crate::services::global_config::{self, StatusResponse};
-use crate::services::project::{LimitedParsedProjectState, ParsedProjectState, ProjectState};
+use crate::services::project::{LimitedProjectState, ProjectState};
 use crate::services::project_cache::{GetCachedProjectState, GetProjectState};
 
 /// V2 version of this endpoint.
@@ -49,13 +49,13 @@ struct VersionQuery {
 #[derive(Debug, Clone, Serialize)]
 #[serde(untagged)]
 enum ProjectStateWrapper {
-    Full(ParsedProjectState),
-    Limited(#[serde(with = "LimitedParsedProjectState")] ParsedProjectState),
+    Full(ProjectState),
+    Limited(#[serde(with = "LimitedProjectState")] ProjectState),
 }
 
 impl ProjectStateWrapper {
     /// Create a wrapper which forces serialization into external or internal format
-    pub fn new(state: ParsedProjectState, full: bool) -> Self {
+    pub fn new(state: ProjectState, full: bool) -> Self {
         if full {
             Self::Full(state)
         } else {
@@ -76,7 +76,7 @@ impl ProjectStateWrapper {
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 struct GetProjectStatesResponseWrapper {
-    configs: HashMap<ProjectKey, ProjectStateWrapper>,
+    configs: HashMap<ProjectKey, Option<ProjectStateWrapper>>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pending: Vec<ProjectKey>,
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -123,6 +123,7 @@ async fn inner(
         project_cache
             .send(GetProjectState::new(project_key).no_cache(no_cache))
             .await
+            .map(Some)
     };
 
     (project_key, state_result)
@@ -145,36 +146,23 @@ async fn inner(
     let mut pending = Vec::with_capacity(keys_len);
     let mut configs = HashMap::with_capacity(keys_len);
     for (project_key, state_result) in future::join_all(futures).await {
-        let project_info = match state_result? {
-            ProjectState::Enabled(info) => info,
-            ProjectState::Disabled => {
-                // Don't insert project config. Downstream Relay will consider it disabled.
-                continue;
-            }
-            ProjectState::Pending => {
-                pending.push(project_key);
-                continue;
-            }
+        let Some(project_state) = state_result? else {
+            pending.push(project_key);
+            continue;
         };
 
         // If public key is known (even if rate-limited, which is Some(false)), it has
         // access to the project config
         let has_access = relay.internal
-            || project_info
+            || project_state
                 .config
                 .trusted_relays
                 .contains(&relay.public_key);
 
         if has_access {
             let full = relay.internal && inner.full_config;
-            let wrapper = ProjectStateWrapper::new(
-                ParsedProjectState {
-                    disabled: false,
-                    info: project_info.as_ref().clone(),
-                },
-                full,
-            );
-            configs.insert(project_key, wrapper);
+            let wrapper = ProjectStateWrapper::new((*project_state).clone(), full);
+            configs.insert(project_key, Some(wrapper));
         } else {
             relay_log::debug!(
                 relay = %relay.public_key,
diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs
index 27be96ea9b..5fc0c6a6d6 100644
--- a/relay-server/src/envelope.rs
+++ b/relay-server/src/envelope.rs
@@ -30,7 +30,6 @@
 //!
 //! ```
 
-use relay_base_schema::project::ProjectKey;
 use std::borrow::Borrow;
 use std::collections::BTreeMap;
 use std::fmt;
@@ -1219,24 +1218,6 @@ impl Envelope {
         self.headers.sent_at
     }
 
-    /// Returns the project key defined in the `trace` header of the envelope.
-    ///
-    /// This function returns `None` if:
-    ///  - there is no [`DynamicSamplingContext`] in the envelope headers.
-    ///  - there are no transactions or events in the envelope, since in this case sampling by trace is redundant.
-    pub fn sampling_key(&self) -> Option<ProjectKey> {
-        // If the envelope item is not of type transaction or event, we will not return a sampling key
-        // because it doesn't make sense to load the root project state if we don't perform trace
-        // sampling.
-        self.get_item_by(|item| {
-            matches!(
-                item.ty(),
-                ItemType::Transaction | ItemType::Event | ItemType::Span
-            )
-        })?;
-
-        self.dsc().map(|dsc| dsc.public_key)
-    }
-
     /// Sets the event id on the envelope.
     pub fn set_event_id(&mut self, event_id: EventId) {
         self.headers.event_id = Some(event_id);
diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs
index a3f80e810e..3f2887d137 100644
--- a/relay-server/src/metrics_extraction/event.rs
+++ b/relay-server/src/metrics_extraction/event.rs
@@ -131,7 +131,7 @@ mod tests {
             metric_extraction: metric_extraction.map(ErrorBoundary::Ok).unwrap_or_default(),
             ..ProjectConfig::default()
         };
-        project.sanitized(); // enables metrics extraction rules
+        project.sanitize(); // enables metrics extraction rules
         let project = project.metric_extraction.ok().unwrap();
 
         OwnedConfig { global, project }
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index 8ddd6b9621..064a894d04 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -74,7 +74,7 @@ use crate::service::ServiceError;
 use crate::services::global_config::GlobalConfigHandle;
 use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::services::processor::event::FiltersStatus;
-use crate::services::project::ProjectInfo;
+use crate::services::project::ProjectState;
 use crate::services::project_cache::{
     AddMetricBuckets, AddMetricMeta, BucketSource, ProjectCache, UpdateRateLimits,
 };
@@ -169,7 +169,7 @@ pub trait EventProcessing {}
 /// A trait for processing groups that can be dynamically sampled.
 pub trait Sampling {
     /// Whether dynamic sampling should run under the given project's conditions.
-    fn supports_sampling(project_state: &ProjectInfo) -> bool;
+    fn supports_sampling(project_state: &ProjectState) -> bool;
 
     /// Whether reservoir sampling applies to this processing group (a.k.a. data type).
     fn supports_reservoir_sampling() -> bool;
@@ -179,7 +179,7 @@ processing_group!(TransactionGroup, Transaction);
 impl EventProcessing for TransactionGroup {}
 
 impl Sampling for TransactionGroup {
-    fn supports_sampling(project_state: &ProjectInfo) -> bool {
+    fn supports_sampling(project_state: &ProjectState) -> bool {
         // For transactions, we require transaction metrics to be enabled before sampling.
         matches!(&project_state.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
     }
@@ -200,7 +200,7 @@ processing_group!(CheckInGroup, CheckIn);
 processing_group!(SpanGroup, Span);
 
 impl Sampling for SpanGroup {
-    fn supports_sampling(project_state: &ProjectInfo) -> bool {
+    fn supports_sampling(project_state: &ProjectState) -> bool {
         // If no metrics could be extracted, do not sample anything.
         matches!(&project_state.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
     }
@@ -589,14 +589,14 @@ type ExtractedEvent = (Annotated<Event>, usize);
 #[derive(Debug)]
 pub struct ProcessingExtractedMetrics {
     metrics: ExtractedMetrics,
-    project: Arc<ProjectInfo>,
+    project: Arc<ProjectState>,
     global: Arc<GlobalConfig>,
     extrapolation_factor: Option<f64>,
 }
 
 impl ProcessingExtractedMetrics {
     pub fn new(
-        project: Arc<ProjectInfo>,
+        project: Arc<ProjectState>,
         global: Arc<GlobalConfig>,
         dsc: Option<&DynamicSamplingContext>,
     ) -> Self {
@@ -747,7 +747,7 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, project_cache: A
     // project_without_tracing         -> metrics goes to self
     // dependent_project_with_tracing  -> metrics goes to root
     // root_project_with_tracing       -> metrics goes to root == self
-    let sampling_project_key = envelope.sampling_key().unwrap_or(project_key);
+    let sampling_project_key = utils::get_sampling_key(envelope).unwrap_or(project_key);
     project_cache.send(AddMetricBuckets::internal(
         sampling_project_key,
         sampling_metrics,
@@ -792,11 +792,11 @@ struct ProcessEnvelopeState<'a, Group> {
     extracted_metrics: ProcessingExtractedMetrics,
 
     /// The state of the project that this envelope belongs to.
-    project_state: Arc<ProjectInfo>,
+    project_state: Arc<ProjectState>,
 
     /// The state of the project that initiated the current trace.
     /// This is the config used for trace-based dynamic sampling.
-    sampling_project_state: Option<Arc<ProjectInfo>>,
+    sampling_project_state: Option<Arc<ProjectState>>,
 
     /// The id of the project that this envelope is ingested into.
     ///
@@ -891,8 +891,8 @@ pub struct ProcessEnvelopeResponse {
 
 #[derive(Debug)]
 pub struct ProcessEnvelope {
     pub envelope: ManagedEnvelope,
-    pub project_info: Arc<ProjectInfo>,
-    pub sampling_project_info: Option<Arc<ProjectInfo>>,
+    pub project_state: Arc<ProjectState>,
+    pub sampling_project_state: Option<Arc<ProjectState>>,
     pub reservoir_counters: ReservoirCounters,
 }
@@ -948,8 +948,8 @@ pub struct ProcessMetricMeta {
 pub struct ProjectMetrics {
     /// The metric buckets to encode.
     pub buckets: Vec<Bucket>,
-    /// Project info for extracting quotas.
-    pub project_info: Arc<ProjectInfo>,
+    /// Project state for extracting quotas.
+    pub project_state: Arc<ProjectState>,
 }
 
 /// Encodes metrics into an envelope ready to be sent upstream.
@@ -1220,8 +1220,8 @@ impl EnvelopeProcessorService {
         global_config: Arc<GlobalConfig>,
         mut managed_envelope: TypedEnvelope<G>,
         project_id: ProjectId,
-        project_state: Arc<ProjectInfo>,
-        sampling_project_state: Option<Arc<ProjectInfo>>,
+        project_state: Arc<ProjectState>,
+        sampling_project_state: Option<Arc<ProjectState>>,
         reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
     ) -> ProcessEnvelopeState<G> {
         let envelope = managed_envelope.envelope_mut();
@@ -1840,8 +1840,8 @@ impl EnvelopeProcessorService {
         &self,
         mut managed_envelope: ManagedEnvelope,
         project_id: ProjectId,
-        project_state: Arc<ProjectInfo>,
-        sampling_project_state: Option<Arc<ProjectInfo>>,
+        project_state: Arc<ProjectState>,
+        sampling_project_state: Option<Arc<ProjectState>>,
         reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
     ) -> Result<ProcessingStateResult, ProcessingError> {
         // Get the group from the managed envelope context, and if it's not set, try to guess it
@@ -1940,8 +1940,8 @@ impl EnvelopeProcessorService {
     ) -> Result<ProcessEnvelopeResponse, ProcessingError> {
         let ProcessEnvelope {
             envelope: mut managed_envelope,
-            project_info,
-            sampling_project_info,
+            project_state,
+            sampling_project_state,
             reservoir_counters,
         } = message;
 
@@ -1951,7 +1951,7 @@ impl EnvelopeProcessorService {
         //
         // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
         // since we cannot process an envelope without project ID, so drop it.
-        let project_id = match project_info
+        let project_id = match project_state
             .project_id
             .or_else(|| managed_envelope.envelope().meta().project_id())
         {
@@ -1988,8 +1988,8 @@ impl EnvelopeProcessorService {
         match self.process_envelope(
             managed_envelope,
             project_id,
-            project_info,
-            sampling_project_info,
+            project_state,
+            sampling_project_state,
             reservoir_counters,
         ) {
             Ok(mut state) => {
@@ -2310,7 +2310,7 @@ impl EnvelopeProcessorService {
     fn rate_limit_buckets(
         &self,
         scoping: Scoping,
-        project_state: &ProjectInfo,
+        project_state: &ProjectState,
         mut buckets: Vec<Bucket>,
     ) -> Vec<Bucket> {
         let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else {
@@ -2525,19 +2525,19 @@ impl EnvelopeProcessorService {
         for (scoping, message) in message.scopes {
             let ProjectMetrics {
                 buckets,
-                project_info,
+                project_state,
             } = message;
 
-            let buckets = self.rate_limit_buckets(scoping, &project_info, buckets);
+            let buckets = self.rate_limit_buckets(scoping, &project_state, buckets);
 
-            let limits = project_info.get_cardinality_limits();
+            let limits = project_state.get_cardinality_limits();
             let buckets = self.cardinality_limit_buckets(scoping, limits, buckets);
 
             if buckets.is_empty() {
                 continue;
             }
 
-            let retention = project_info
+            let retention = project_state
                 .config
                 .event_retention
                 .unwrap_or(DEFAULT_EVENT_RETENTION);
@@ -3255,7 +3255,7 @@ mod tests {
         let not_ratelimited_org = 2;
 
         let message = {
-            let project_info = {
+            let project_state = {
                 let quota = Quota {
                     id: Some("testing".into()),
                     categories: vec![DataCategory::MetricBucket].into(),
@@ -3270,10 +3270,9 @@ mod tests {
 
                 let mut config = ProjectConfig::default();
                 config.quotas.push(quota);
-                Arc::new(ProjectInfo {
-                    config,
-                    ..Default::default()
-                })
+                let mut project_state = ProjectState::allowed();
+                project_state.config = config;
+                Arc::new(project_state)
             };
 
             let project_metrics = ProjectMetrics {
@@ -3285,7 +3284,7 @@ mod tests {
                     width: 10,
                     metadata: BucketMetadata::default(),
                 }],
-                project_info,
+                project_state,
             };
 
             let scoping_by_org_id = |org_id: u64| Scoping {
@@ -3392,10 +3391,8 @@ mod tests {
             ..Default::default()
         };
 
-        let project_info = ProjectInfo {
-            config,
-            ..Default::default()
-        };
+        let mut project_state = ProjectState::allowed();
+        project_state.config = config;
 
         let mut envelopes = ProcessingGroup::split_envelope(*envelope);
         assert_eq!(envelopes.len(), 1);
@@ -3405,8 +3402,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(project_info),
-            sampling_project_info: None,
+            project_state: Arc::new(project_state),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -3467,8 +3464,8 @@ mod tests {
 
         let process_message = ProcessEnvelope {
             envelope: managed_envelope,
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: None,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -3826,8 +3823,8 @@ mod tests {
 
     #[test]
     fn test_extrapolate() {
-        let mut project_info = ProjectInfo::default();
-        project_info.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig {
+        let mut project_state = ProjectState::allowed();
+        project_state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig {
             extrapolate: ExtrapolationConfig {
                 include: vec![LazyGlob::new("*")],
                 exclude: vec![],
@@ -3846,7 +3843,7 @@ mod tests {
         global_config.options.extrapolation_duplication_limit = 3;
 
         let mut extracted_metrics = ProcessingExtractedMetrics::new(
-            Arc::new(project_info),
+            Arc::new(project_state),
             Arc::new(global_config),
             Some(&dsc),
         );
diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs
index bc7b20ee2c..19549308ef 100644
--- a/relay-server/src/services/processor/dynamic_sampling.rs
+++ b/relay-server/src/services/processor/dynamic_sampling.rs
@@ -279,7 +279,7 @@ mod tests {
     use crate::services::processor::{
         ProcessEnvelope, ProcessingExtractedMetrics, ProcessingGroup, SpanGroup,
     };
-    use crate::services::project::ProjectInfo;
+    use crate::services::project::ProjectState;
     use crate::testutils::{
         self, create_test_processor, new_envelope, state_with_rule_and_condition,
     };
@@ -313,7 +313,7 @@ mod tests {
     /// Always sets the processing item type to event.
     fn process_envelope_with_root_project_state(
         envelope: Box<Envelope>,
-        sampling_project_state: Option<Arc<ProjectInfo>>,
+        sampling_project_state: Option<Arc<ProjectState>>,
     ) -> Envelope {
         let processor = create_test_processor(Default::default());
         let (outcome_aggregator, test_store) = testutils::processor_services();
@@ -324,8 +324,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store, group),
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: sampling_project_state,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -501,7 +501,7 @@ mod tests {
         assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
     }
 
-    fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo {
+    fn project_state_with_single_rule(sample_rate: f64) -> ProjectState {
         let sampling_config = SamplingConfig {
             rules: vec![SamplingRule {
                 condition: RuleCondition::all(),
@@ -514,7 +514,7 @@ mod tests {
             ..SamplingConfig::new()
         };
 
-        let mut sampling_project_state = ProjectInfo::default();
+        let mut sampling_project_state = ProjectState::allowed();
         sampling_project_state.config.sampling = Some(ErrorBoundary::Ok(sampling_config));
         sampling_project_state
     }
@@ -718,13 +718,13 @@ mod tests {
     where
         G: Sampling + TryFrom<ProcessingGroup>,
     {
-        let project_info = {
-            let mut info = ProjectInfo::default();
-            info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
+        let project_state = {
+            let mut state = ProjectState::allowed();
+            state.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig {
                 version: 1,
                 ..Default::default()
             }));
-            Arc::new(info)
+            Arc::new(state)
         };
 
         let bytes = Bytes::from(
@@ -738,13 +738,13 @@ mod tests {
             metrics: Default::default(),
             sample_rates: Default::default(),
             extracted_metrics: ProcessingExtractedMetrics::new(
-                project_info.clone(),
+                project_state.clone(),
                 Arc::new(GlobalConfig::default()),
                 envelope.dsc(),
             ),
-            project_state: project_info,
+            project_state,
             sampling_project_state: {
-                let mut state = ProjectInfo::default();
+                let mut state = ProjectState::allowed();
                 state.config.metric_extraction =
                     ErrorBoundary::Ok(MetricExtractionConfig::default());
                 state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs
index d5f00af972..be8e3fb800 100644
--- a/relay-server/src/services/processor/profile.rs
+++ b/relay-server/src/services/processor/profile.rs
@@ -157,7 +157,7 @@ mod tests {
     use crate::envelope::Envelope;
     use crate::extractors::RequestMeta;
     use crate::services::processor::{ProcessEnvelope, ProcessingGroup};
-    use crate::services::project::ProjectInfo;
+    use crate::services::project::ProjectState;
     use crate::testutils::create_test_processor;
     use crate::utils::ManagedEnvelope;
@@ -252,7 +252,7 @@ mod tests {
             item
         });
 
-        let mut project_state = ProjectInfo::default();
+        let mut project_state = ProjectState::allowed();
         project_state.config.features.0.insert(Feature::Profiling);
 
         let mut envelopes = ProcessingGroup::split_envelope(*envelope);
@@ -263,8 +263,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(project_state),
-            sampling_project_info: None,
+            project_state: Arc::new(project_state),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -382,8 +382,8 @@ mod tests {
             item
         });
 
-        let mut project_info = ProjectInfo::default();
-        project_info.config.features.0.insert(Feature::Profiling);
+        let mut project_state = ProjectState::allowed();
+        project_state.config.features.0.insert(Feature::Profiling);
 
         let mut envelopes = ProcessingGroup::split_envelope(*envelope);
         assert_eq!(envelopes.len(), 1);
@@ -393,8 +393,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(project_info),
-            sampling_project_info: None,
+            project_state: Arc::new(project_state),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -451,7 +451,7 @@ mod tests {
             item
         });
 
-        let mut project_state = ProjectInfo::default();
+        let mut project_state = ProjectState::allowed();
         project_state.config.features.0.insert(Feature::Profiling);
 
         let mut envelopes = ProcessingGroup::split_envelope(*envelope);
@@ -463,8 +463,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(project_state),
-            sampling_project_info: None,
+            project_state: Arc::new(project_state),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -523,7 +523,7 @@ mod tests {
             item
         });
 
-        let mut project_state = ProjectInfo::default();
+        let mut project_state = ProjectState::allowed();
         project_state.config.features.0.insert(Feature::Profiling);
 
         let mut envelopes = ProcessingGroup::split_envelope(*envelope);
@@ -534,8 +534,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(project_state),
-            sampling_project_info: None,
+            project_state: Arc::new(project_state),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs
index f49ae184dd..2f229d28b0 100644
--- a/relay-server/src/services/processor/report.rs
+++ b/relay-server/src/services/processor/report.rs
@@ -268,7 +268,7 @@ mod tests {
     use crate::extractors::RequestMeta;
     use crate::services::outcome::RuleCategory;
     use crate::services::processor::{ProcessEnvelope, ProcessingGroup};
-    use crate::services::project::ProjectInfo;
+    use crate::services::project::ProjectState;
     use crate::testutils::{self, create_test_processor};
     use crate::utils::ManagedEnvelope;
@@ -318,8 +318,8 @@ mod tests {
         let envelope = ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store, group);
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: None,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -372,8 +372,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: None,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -434,8 +434,8 @@ mod tests {
         let envelope = ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store, group);
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: None,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -474,8 +474,8 @@ mod tests {
         let envelope = ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store, group);
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: None,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
@@ -522,8 +522,8 @@ mod tests {
 
         let message = ProcessEnvelope {
             envelope,
-            project_info: Arc::new(ProjectInfo::default()),
-            sampling_project_info: None,
+            project_state: Arc::new(ProjectState::allowed()),
+            sampling_project_state: None,
             reservoir_counters: ReservoirCounters::default(),
         };
diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs
index 3d8ef72dab..aa9407e164 100644
--- a/relay-server/src/services/processor/span/processing.rs
+++ b/relay-server/src/services/processor/span/processing.rs
@@ -634,7 +634,7 @@ mod tests {
 
     use crate::envelope::Envelope;
     use crate::services::processor::{ProcessingExtractedMetrics, ProcessingGroup};
-    use crate::services::project::ProjectInfo;
+    use crate::services::project::ProjectState;
     use crate::utils::ManagedEnvelope;
 
     use super::*;
@@ -647,7 +647,7 @@ mod tests {
         );
         let dummy_envelope = Envelope::parse_bytes(bytes).unwrap();
 
-        let mut project_state = ProjectInfo::default();
+        let mut project_state = ProjectState::allowed();
         project_state
             .config
             .features
diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs
index 6fe84264f0..3401a012c7 100644
--- a/relay-server/src/services/project.rs
+++ b/relay-server/src/services/project.rs
@@ -2,39 +2,381 @@ use std::collections::BTreeSet;
 use std::sync::Arc;
 use std::time::Duration;
 
-use relay_base_schema::project::ProjectKey;
+use chrono::{DateTime, Utc};
+use relay_base_schema::project::{ProjectId, ProjectKey};
+#[cfg(feature = "processing")]
+use relay_cardinality::CardinalityLimit;
 use relay_config::Config;
-use relay_dynamic_config::{ErrorBoundary, Feature};
+use relay_dynamic_config::{ErrorBoundary, Feature, LimitedProjectConfig, ProjectConfig};
+use relay_filter::matches_any_origin;
 use relay_metrics::{Bucket, MetaAggregator, MetricMeta, MetricNamespace};
-use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits, Scoping};
+use relay_quotas::{
+    CachedRateLimits, DataCategory, MetricNamespaceScoping, Quota, RateLimits, Scoping,
+};
 use relay_sampling::evaluation::ReservoirCounters;
 use relay_statsd::metric;
 use relay_system::{Addr, BroadcastChannel};
 use serde::{Deserialize, Serialize};
+use smallvec::SmallVec;
 use tokio::time::Instant;
+use url::Url;
 
-use crate::envelope::ItemType;
+use crate::envelope::{Envelope, ItemType};
 use crate::metrics::{MetricOutcomes, MetricsLimiter};
 use crate::services::metrics::{Aggregator, MergeBuckets};
 use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor};
-use crate::services::project::metrics::{apply_project_info, filter_namespaces};
-use crate::services::project::state::ExpiryState;
+use crate::services::project::metrics::{apply_project_state, filter_namespaces};
 use crate::services::project_cache::{BucketSource, CheckedEnvelope, ProjectCache, RequestUpdate};
 use crate::utils::{Enforcement, SeqCount};
 
+use crate::extractors::RequestMeta;
+
 use crate::statsd::RelayCounters;
 use crate::utils::{self, EnvelopeLimiter, ManagedEnvelope, RetryBackoff};
 
 mod metrics;
-pub mod state;
 
-pub use state::{
-    LimitedParsedProjectState, ParsedProjectState, ProjectFetchState, ProjectInfo, ProjectState,
-};
+/// The expiry status of a project state. Return value of [`ProjectState::check_expiry`].
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
+enum Expiry {
+    /// The project state is perfectly up to date.
+    Updated,
+    /// The project state is outdated but events depending on this project state can still be
+    /// processed. The state should be refreshed in the background though.
+    Stale,
+    /// The project state is completely outdated and events need to be buffered up until the new
+    /// state has been fetched.
+    Expired,
+}
+
+/// The expiry status of a project state, together with the state itself if it has not expired.
+/// Return value of [`Project::expiry_state`].
+pub enum ExpiryState {
+    /// An up-to-date project state. See [`Expiry::Updated`].
+    Updated(Arc<ProjectState>),
+    /// A stale project state that can still be used. See [`Expiry::Stale`].
+    Stale(Arc<ProjectState>),
+    /// An expired project state that should not be used. See [`Expiry::Expired`].
+    Expired,
+}
 
 /// Sender type for messages that respond with project states.
-pub type ProjectSender = relay_system::BroadcastSender<ProjectState>;
+pub type ProjectSender = relay_system::BroadcastSender<Arc<ProjectState>>;
+
+/// The project state is a cached server state of a project.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ProjectState {
+    /// Unique identifier of this project.
+    pub project_id: Option<ProjectId>,
+    /// The timestamp of when the state was last changed.
+    ///
+    /// This might be `None` in some rare cases like where states
+    /// are faked locally.
+    #[serde(default)]
+    pub last_change: Option<DateTime<Utc>>,
+    /// Indicates that the project is disabled.
+    #[serde(default)]
+    pub disabled: bool,
+    /// A container of known public keys in the project.
+    ///
+    /// Since version 2, each project state corresponds to a single public key. For this reason,
+    /// only a single key can occur in this list.
+    #[serde(default)]
+    pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
+    /// The project's slug if available.
+    #[serde(default)]
+    pub slug: Option<String>,
+    /// The project's current config.
+    #[serde(default)]
+    pub config: ProjectConfig,
+    /// The organization id.
+    #[serde(default)]
+    pub organization_id: Option<u64>,
+
+    /// The time at which this project state was last updated.
+    #[serde(skip, default = "Instant::now")]
+    pub last_fetch: Instant,
+
+    /// True if this project state failed fetching or was incompatible with this Relay.
+    #[serde(skip, default)]
+    pub invalid: bool,
+}
+
+/// Controls how we serialize a ProjectState for an external Relay
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase", remote = "ProjectState")]
+pub struct LimitedProjectState {
+    pub project_id: Option<ProjectId>,
+    pub last_change: Option<DateTime<Utc>>,
+    pub disabled: bool,
+    pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
+    pub slug: Option<String>,
+    #[serde(with = "LimitedProjectConfig")]
+    pub config: ProjectConfig,
+    pub organization_id: Option<u64>,
+}
+
+impl ProjectState {
+    /// Project state for a missing project.
+    pub fn missing() -> Self {
+        ProjectState {
+            project_id: None,
+            last_change: None,
+            disabled: true,
+            public_keys: SmallVec::new(),
+            slug: None,
+            config: ProjectConfig::default(),
+            organization_id: None,
+            last_fetch: Instant::now(),
+            invalid: false,
+        }
+    }
+
+    /// Project state for an unknown but allowed project.
+    ///
+    /// This state is used for forwarding in Proxy mode.
+    pub fn allowed() -> Self {
+        let mut state = ProjectState::missing();
+        state.disabled = false;
+        state
+    }
+
+    /// Project state for a deserialization error.
+    pub fn err() -> Self {
+        let mut state = ProjectState::missing();
+        state.invalid = true;
+        state
+    }
+
+    /// Returns configuration options for the public key.
+    pub fn get_public_key_config(&self) -> Option<&PublicKeyConfig> {
+        self.public_keys.first()
+    }
+
+    /// Returns `true` if the entire project should be considered
+    /// disabled (blackholed, deleted etc.).
+    pub fn disabled(&self) -> bool {
+        self.disabled
+    }
+
+    /// Returns `true` if the project state obtained from upstream could not be parsed.
+    ///
+    /// This results in events being dropped similar to disabled states, but can provide separate
+    /// metrics.
+    pub fn invalid(&self) -> bool {
+        self.invalid
+    }
+
+    /// Returns whether this state is outdated and needs to be refetched.
+    fn check_expiry(&self, config: &Config) -> Expiry {
+        let expiry = match self.project_id {
+            None => config.cache_miss_expiry(),
+            Some(_) => config.project_cache_expiry(),
+        };
+
+        let elapsed = self.last_fetch.elapsed();
+        if elapsed >= expiry + config.project_grace_period() {
+            Expiry::Expired
+        } else if elapsed >= expiry {
+            Expiry::Stale
+        } else {
+            Expiry::Updated
+        }
+    }
+
+    /// Returns the project config.
+    pub fn config(&self) -> &ProjectConfig {
+        &self.config
+    }
+
+    /// Returns `true` if the given project ID matches this project.
+    ///
+    /// If the project state has not been loaded, this check is skipped because the project
+    /// identifier is not yet known. Likewise, this check is skipped for the legacy store endpoint
+    /// which comes without a project ID. The id is later overwritten in `check_envelope`.
+    pub fn is_valid_project_id(&self, stated_id: Option<ProjectId>, config: &Config) -> bool {
+        match (self.project_id, stated_id, config.override_project_ids()) {
+            (Some(actual_id), Some(stated_id), false) => actual_id == stated_id,
+            _ => true,
+        }
+    }
+
+    /// Checks if this origin is allowed for this project.
+    fn is_valid_origin(&self, origin: Option<&Url>) -> bool {
+        // Generally accept any event without an origin.
+        let origin = match origin {
+            Some(origin) => origin,
+            None => return true,
+        };
+
+        // Match against list of allowed origins. If the list is empty we always reject.
+        let allowed = &self.config().allowed_domains;
+        if allowed.is_empty() {
+            return false;
+        }
+
+        let allowed: Vec<_> = allowed
+            .iter()
+            .map(|origin| origin.as_str().into())
+            .collect();
+
+        matches_any_origin(Some(origin.as_str()), &allowed)
+    }
+
+    /// Returns `true` if the given public key matches this state.
+    ///
+    /// This is a sanity check since project states are keyed by the DSN public key. Unless the
+    /// state is invalid or unloaded, it must always match the public key.
+    pub fn is_matching_key(&self, project_key: ProjectKey) -> bool {
+        if let Some(key_config) = self.get_public_key_config() {
+            // Always validate if we have a key config.
+            key_config.public_key == project_key
+        } else {
+            // Loaded states must have a key config, but ignore missing and invalid states.
+            self.project_id.is_none()
+        }
+    }
+
+    /// Amends request `Scoping` with information from this project state.
+    ///
+    /// This scoping amends `RequestMeta::get_partial_scoping` by adding organization and key info.
+    /// The processor must fetch the full scoping before attempting to rate limit with partial
+    /// scoping.
+    ///
+    /// To get the own scoping of this ProjectKey without amending request information, use
+    /// [`Project::scoping`] instead.
+    pub fn scope_request(&self, meta: &RequestMeta) -> Scoping {
+        let mut scoping = meta.get_partial_scoping();
+
+        // The key configuration may be missing if the event has been queued for extended times and
+        // project was refetched in between. In such a case, access to key quotas is not available,
+        // but we can gracefully execute all other rate limiting.
+        scoping.key_id = self
+            .get_public_key_config()
+            .and_then(|config| config.numeric_id);
+
+        // The original project identifier is part of the DSN. If the DSN was moved to another
+        // project, the actual project identifier is different and can be obtained from project
+        // states. This is only possible when the project state has been loaded.
+        if let Some(project_id) = self.project_id {
+            scoping.project_id = project_id;
+        }
+
+        // This is a hack covering three cases:
+        //  1. Relay has not fetched the project state. In this case we have no way of knowing
+        //     which organization this project belongs to and we need to ignore any
+        //     organization-wide rate limits stored globally. This project state cannot hold
+        //     organization rate limits yet.
+        //  2. The state has been loaded, but the organization_id is not available. This is only
+        //     the case for legacy Sentry servers that do not reply with organization rate
+        //     limits. Thus, the organization_id doesn't matter.
+        //  3. An organization id is available and can be matched against rate limits. In this
+        //     project, all organizations will match automatically, unless the organization id
+        //     has changed since the last fetch.
+        scoping.organization_id = self.organization_id.unwrap_or(0);
+
+        scoping
+    }
+
+    /// Returns quotas declared in this project state.
+    pub fn get_quotas(&self) -> &[Quota] {
+        self.config.quotas.as_slice()
+    }
+
+    /// Returns cardinality limits declared in this project state.
+    #[cfg(feature = "processing")]
+    pub fn get_cardinality_limits(&self) -> &[CardinalityLimit] {
+        match self.config.metrics {
+            ErrorBoundary::Ok(ref m) => m.cardinality_limits.as_slice(),
+            _ => &[],
+        }
+    }
+
+    /// Returns `Err` if the project is known to be invalid or disabled.
+    ///
+    /// If this project state is hard outdated, this returns `Ok(())`, instead, to avoid prematurely
+    /// dropping data.
+    pub fn check_disabled(&self, config: &Config) -> Result<(), DiscardReason> {
+        // if the state is out of date, we proceed as if it was still up to date. The
+        // upstream relay (or sentry) will still filter events.
+        if self.check_expiry(config) == Expiry::Expired {
+            return Ok(());
+        }
+
+        // if we recorded an invalid project state response from the upstream (i.e. parsing
+        // failed), discard the event with a state reason.
+        if self.invalid() {
+            return Err(DiscardReason::ProjectState);
+        }
+
+        // only drop events if we know for sure the project or key are disabled.
+        if self.disabled() {
+            return Err(DiscardReason::ProjectId);
+        }
+
+        Ok(())
+    }
+
+    /// Determines whether the given envelope should be accepted or discarded.
+    ///
+    /// Returns `Ok(())` if the envelope should be accepted. Returns `Err(DiscardReason)` if the
+    /// envelope should be discarded, by indicating the reason. The checks performed for this are:
+    ///
+    ///  - Allowed origin headers
+    ///  - Disabled or unknown projects
+    ///  - Disabled project keys (DSN)
+    ///  - Feature flags
+    pub fn check_envelope(
+        &self,
+        envelope: &Envelope,
+        config: &Config,
+    ) -> Result<(), DiscardReason> {
+        // Verify that the stated project id in the DSN matches the public key used to retrieve this
+        // project state.
+        let meta = envelope.meta();
+        if !self.is_valid_project_id(meta.project_id(), config) {
+            return Err(DiscardReason::ProjectId);
+        }
+
+        // Try to verify the request origin with the project config.
+        if !self.is_valid_origin(meta.origin()) {
+            return Err(DiscardReason::Cors);
+        }
+
+        // sanity-check that the state has a matching public key loaded.
+        if !self.is_matching_key(meta.public_key()) {
+            relay_log::error!("public key mismatch on state {}", meta.public_key());
+            return Err(DiscardReason::ProjectId);
+        }
+
+        // Check for invalid or disabled projects.
+        self.check_disabled(config)?;
+
+        // Check feature.
+        if let Some(disabled_feature) = envelope
+            .required_features()
+            .iter()
+            .find(|f| !self.has_feature(**f))
+        {
+            return Err(DiscardReason::FeatureDisabled(*disabled_feature));
+        }
+
+        Ok(())
+    }
+
+    /// Validates data in this project state and removes values that are partially invalid.
+    pub fn sanitize(mut self) -> Self {
+        self.config.sanitize();
+        self
+    }
+
+    /// Returns `true` if the given feature is enabled for this project.
+    pub fn has_feature(&self, feature: Feature) -> bool {
+        self.config.features.has(feature)
+    }
+}
 
 /// Represents a public key received from the projectconfig endpoint.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -48,10 +390,9 @@ pub struct PublicKeyConfig {
     pub numeric_id: Option<u64>,
 }
 
-/// Channel used to respond to state requests (e.g. by project config endpoint).
 #[derive(Debug)]
 struct StateChannel {
-    inner: BroadcastChannel<ProjectState>,
+    inner: BroadcastChannel<Arc<ProjectState>>,
     no_cache: bool,
 }
@@ -69,12 +410,36 @@ impl StateChannel {
     }
 }
 
-#[derive(Debug)]
 enum GetOrFetch<'a> {
-    Cached(ProjectState),
+    Cached(Arc<ProjectState>),
     Scheduled(&'a mut StateChannel),
 }
 
+/// Represents either the project state or an aggregation of metrics.
+///
+/// We have to delay rate limiting on metrics until we have a valid project state,
+/// So when we don't have one yet, we hold them in this aggregator until the project state arrives.
+///
+/// TODO: spool queued metrics to disk when the in-memory aggregator becomes too full.
+#[derive(Debug)]
+enum State {
+    Cached(Arc<ProjectState>),
+    Pending,
+}
+
+impl State {
+    fn state_value(&self) -> Option<Arc<ProjectState>> {
+        match self {
+            State::Cached(state) => Some(Arc::clone(state)),
+            State::Pending => None,
+        }
+    }
+
+    fn new() -> Self {
+        Self::Pending
+    }
+}
+
 /// Structure representing organization and project configuration for a project key.
 ///
 /// This structure no longer uniquely identifies a project. Instead, it identifies a project key.
@@ -86,7 +451,7 @@ pub struct Project {
     last_updated_at: Instant,
     project_key: ProjectKey,
     config: Arc<Config>,
-    state: ProjectFetchState,
+    state: State,
     state_channel: Option<StateChannel>,
     rate_limits: CachedRateLimits,
     last_no_cache: Instant,
@@ -103,7 +468,7 @@ impl Project {
             next_fetch_attempt: None,
             last_updated_at: Instant::now(),
             project_key: key,
-            state: ProjectFetchState::expired(),
+            state: State::new(),
             state_channel: None,
             rate_limits: CachedRateLimits::new(),
             last_no_cache: Instant::now(),
@@ -119,13 +484,13 @@ impl Project {
         self.reservoir_counters.clone()
     }
 
-    fn current_state(&self) -> ProjectState {
-        self.state.current_state(&self.config)
+    fn state_value(&self) -> Option<Arc<ProjectState>> {
+        self.state.state_value()
     }
 
     /// If a reservoir rule is no longer in the sampling config, we will remove those counters.
     fn remove_expired_reservoir_rules(&self) {
-        let Some(state) = self.current_state().enabled() else {
+        let Some(state) = self.state_value() else {
             return;
         };
 
@@ -143,6 +508,30 @@ impl Project {
         self.rate_limits.merge(rate_limits);
     }
 
+    /// Returns the current [`ExpiryState`] for this project.
+    /// If the project state's [`Expiry`] is `Expired`, do not return it.
+    pub fn expiry_state(&self) -> ExpiryState {
+        match self.state_value() {
+            Some(ref state) => match state.check_expiry(self.config.as_ref()) {
+                Expiry::Updated => ExpiryState::Updated(state.clone()),
+                Expiry::Stale => ExpiryState::Stale(state.clone()),
+                Expiry::Expired => ExpiryState::Expired,
+            },
+            None => ExpiryState::Expired,
+        }
+    }
+
+    /// Returns the project state if it is not expired.
+    ///
+    /// Convenience wrapper around [`expiry_state`](Self::expiry_state).
+    pub fn valid_state(&self) -> Option<Arc<ProjectState>> {
+        match self.expiry_state() {
+            ExpiryState::Updated(state) => Some(state),
+            ExpiryState::Stale(state) => Some(state),
+            ExpiryState::Expired => None,
+        }
+    }
+
     /// Returns the next attempt `Instant` if backoff is initiated, or None otherwise.
     pub fn next_fetch_attempt(&self) -> Option<Instant> {
         self.next_fetch_attempt
@@ -204,16 +593,15 @@ impl Project {
         meta: MetricMeta,
         envelope_processor: Addr<EnvelopeProcessor>,
     ) {
+        let state = self.state_value();
+
         // Only track metadata if custom metrics are enabled, or we don't know yet whether they are
         // enabled.
-        let is_enabled = match self.current_state() {
-            ProjectState::Enabled(info) => info.has_feature(Feature::CustomMetrics),
-            ProjectState::Disabled => false,
-            ProjectState::Pending => true,
-        };
-
-        if !is_enabled {
-            relay_log::trace!("metric meta not enabled for project {}", self.project_key);
+        if !state.map_or(true, |state| state.has_feature(Feature::CustomMetrics)) {
+            relay_log::trace!(
+                "metric meta feature flag not enabled for project {}",
+                self.project_key
+            );
             return;
         }
@@ -238,15 +626,9 @@ impl Project {
         if !self.has_pending_metric_meta {
             return;
         }
-        let is_enabled = match self.current_state() {
-            ProjectState::Enabled(project_info) => project_info.has_feature(Feature::CustomMetrics),
-            ProjectState::Disabled => false,
-            ProjectState::Pending => {
-                // Cannot flush, wait for project state to be loaded.
-                return;
-            }
+        let Some(state) = self.state_value() else {
+            return;
         };
-
         let Some(scoping) = self.scoping() else {
             return;
         };
@@ -254,7 +636,7 @@ impl Project {
         // All relevant info has been gathered, consider us flushed.
         self.has_pending_metric_meta = false;
 
-        if !is_enabled {
+        if !state.has_feature(Feature::CustomMetrics) {
             relay_log::debug!(
                 "clearing metric meta aggregator, because project {} does not have feature flag enabled",
                 self.project_key,
@@ -339,15 +721,15 @@ impl Project {
             }
         }
 
-        let cached_state = match self.state.expiry_state(&self.config) {
+        let cached_state = match self.expiry_state() {
             // Never use the cached state if `no_cache` is set.
             _ if no_cache => None,
             // There is no project state that can be used, fetch a state and return it.
             ExpiryState::Expired => None,
             // The project is semi-outdated, fetch new state but return old one.
-            ExpiryState::Stale(state) => Some(state.clone()),
+            ExpiryState::Stale(state) => Some(state),
             // The project is not outdated, return early here to jump over fetching logic below.
-            ExpiryState::Updated(state) => return GetOrFetch::Cached(state.clone()),
+            ExpiryState::Updated(state) => return GetOrFetch::Cached(state),
         };
 
         let channel = self.fetch_state(project_cache, no_cache);
@@ -377,10 +759,10 @@ impl Project {
         &mut self,
         project_cache: Addr<ProjectCache>,
         no_cache: bool,
-    ) -> ProjectState {
+    ) -> Option<Arc<ProjectState>> {
         match self.get_or_fetch_state(project_cache, no_cache) {
-            GetOrFetch::Cached(state) => state,
-            GetOrFetch::Scheduled(_) => ProjectState::Pending,
+            GetOrFetch::Cached(state) => Some(state),
+            GetOrFetch::Scheduled(_) => None,
         }
     }
@@ -432,15 +814,16 @@ impl Project {
     /// take precedence.
     ///
     /// [`ValidateEnvelope`]: crate::services::project_cache::ValidateEnvelope
+    #[allow(clippy::too_many_arguments)]
     pub fn update_state(
         &mut self,
         project_cache: &Addr<ProjectCache>,
-        state: ProjectFetchState,
+        mut state: Arc<ProjectState>,
         envelope_processor: &Addr<EnvelopeProcessor>,
         no_cache: bool,
     ) {
         // Initiate the backoff if the incoming state is invalid. Reset it otherwise.
-        if state.is_pending() {
+        if state.invalid() {
             self.next_fetch_attempt = Instant::now().checked_add(self.backoff.next_backoff());
         } else {
             self.next_fetch_attempt = None;
@@ -459,14 +842,15 @@ impl Project {
             return;
         }
 
-        // If the state is pending, return back the taken channel and schedule state update.
-        if state.is_pending() {
-            // Only overwrite if the old state is expired:
-            let is_expired = matches!(self.state.expiry_state(&self.config), ExpiryState::Expired);
-            if is_expired {
-                self.state = state;
-            }
+        match self.expiry_state() {
+            // If the new state is invalid but the old one still usable, keep the old one.
+            ExpiryState::Updated(old) | ExpiryState::Stale(old) if state.invalid() => state = old,
+            // If the new state is valid or the old one is expired, always use the new one.
+            _ => self.state = State::Cached(Arc::clone(&state)),
+        }
 
+        // If the state is still invalid, return back the taken channel and schedule state update.
+        if state.invalid() {
             self.state_channel = Some(channel);
             let attempts = self.backoff.attempt() + 1;
             relay_log::debug!(
@@ -478,11 +862,9 @@ impl Project {
             return;
         }
 
-        self.state = state;
-
         // Flush all waiting recipients.
         relay_log::debug!("project state {} updated", self.project_key);
-        channel.inner.send(self.state.current_state(&self.config));
+        channel.inner.send(state);
 
         self.after_state_updated(envelope_processor);
     }
@@ -500,13 +882,15 @@ impl Project {
     ///
     /// Returns `Some` if the project state has been fetched and contains a project identifier,
     /// otherwise `None`.
+    ///
+    /// NOTE: This function does not check the expiry of the project state.
     pub fn scoping(&self) -> Option<Scoping> {
-        let info = self.current_state().enabled()?;
+        let state = self.state_value()?;
 
         Some(Scoping {
-            organization_id: info.organization_id.unwrap_or(0),
-            project_id: info.project_id?,
+            organization_id: state.organization_id.unwrap_or(0),
+            project_id: state.project_id?,
             project_key: self.project_key,
-            key_id: info
+            key_id: state
                 .get_public_key_config()
                 .and_then(|config| config.numeric_id),
         })
@@ -527,16 +911,7 @@ impl Project {
         &mut self,
         mut envelope: ManagedEnvelope,
     ) -> Result<CheckedEnvelope, DiscardReason> {
-        let state = match self.current_state() {
-            ProjectState::Enabled(state) => Some(state.clone()),
-            ProjectState::Disabled => {
-                // TODO(jjbayer): We should refactor this function to either return a Result or
-                // handle envelope rejections internally, but not both.
-                envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
-                return Err(DiscardReason::ProjectId);
-            }
-            ProjectState::Pending => None,
-        };
+        let state = self.valid_state().filter(|state| !state.invalid());
         let mut scoping = envelope.scoping();
 
         if let Some(ref state) = state {
@@ -605,15 +980,15 @@ impl Project {
         outcome_aggregator: &Addr<TrackOutcome>,
         buckets: Vec<Bucket>,
     ) -> CheckedBuckets {
-        let project_info = match self.current_state() {
-            ProjectState::Enabled(info) => info.clone(),
-            ProjectState::Disabled => {
-                relay_log::debug!("dropping {} buckets for disabled project", buckets.len());
-                return CheckedBuckets::Dropped;
-            }
-            ProjectState::Pending => return CheckedBuckets::NoProject(buckets),
+        let Some(project_state) = self.valid_state() else {
+            return CheckedBuckets::NoProject(buckets);
         };
 
+        if project_state.invalid() || project_state.disabled() {
+            relay_log::debug!("dropping {} buckets for disabled project", buckets.len());
+            return CheckedBuckets::Dropped;
+        }
+
         let Some(scoping) = self.scoping() else {
             relay_log::error!(
                 tags.project_key = self.project_key.as_str(),
@@ -623,7 +998,7 @@ impl Project {
             return CheckedBuckets::Dropped;
         };
 
-        let mut buckets = apply_project_info(buckets, metric_outcomes, &project_info, scoping);
+        let mut buckets = apply_project_state(buckets, metric_outcomes, &project_state, scoping);
 
         let namespaces: BTreeSet<MetricNamespace> = buckets
             .iter()
@@ -633,7 +1008,7 @@ impl Project {
         let current_limits = self.rate_limits.current_limits();
         for namespace in namespaces {
             let limits = current_limits.check_with_quotas(
-                project_info.get_quotas(),
+                project_state.get_quotas(),
                 scoping.item(DataCategory::MetricBucket),
             );
 
@@ -648,7 +1023,7 @@ impl Project {
             }
         }
 
-        let quotas = project_info.config.quotas.clone();
+        let quotas = project_state.config.quotas.clone();
         let buckets = match MetricsLimiter::create(buckets, quotas, scoping) {
             Ok(mut bucket_limiter) => {
                 bucket_limiter.enforce_limits(current_limits, metric_outcomes, outcome_aggregator);
@@ -663,7 +1038,7 @@ impl Project {
 
         CheckedBuckets::Checked {
             scoping,
-            project_info,
+            project_state,
             buckets,
         }
     }
@@ -695,7 +1070,7 @@ fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enfor
     }
 }
 
-/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
+/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`].
 fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
     #[derive(Debug, Deserialize)]
     struct PartialEvent {
@@ -723,8 +1098,8 @@ pub enum CheckedBuckets {
     Checked {
         /// Project scoping.
         scoping: Scoping,
-        /// Project info.
-        project_info: Arc<ProjectInfo>,
+        /// Project state.
+        project_state: Arc<ProjectState>,
         /// List of buckets.
         buckets: Vec<Bucket>,
     },
@@ -738,17 +1113,14 @@ pub enum CheckedBuckets {
 
 #[cfg(test)]
 mod tests {
-    use crate::envelope::{ContentType, Envelope, Item};
-    use crate::extractors::RequestMeta;
+    use crate::envelope::{ContentType, Item};
     use crate::metrics::MetricStats;
     use crate::services::processor::ProcessingGroup;
-    use relay_base_schema::project::ProjectId;
     use relay_common::time::UnixTimestamp;
     use relay_event_schema::protocol::EventId;
     use relay_metrics::BucketValue;
     use relay_test::mock_service;
     use serde_json::json;
-    use smallvec::SmallVec;
 
     use super::*;
@@ -770,19 +1142,20 @@ mod tests {
         // Initialize project with a state
         let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
-        let project_info = ProjectInfo {
-            project_id: Some(ProjectId::new(123)),
-            ..Default::default()
-        };
+        let mut project_state = ProjectState::allowed();
+        project_state.project_id = Some(ProjectId::new(123));
         let mut project = Project::new(project_key, config.clone());
-        project.state = ProjectFetchState::enabled(project_info);
+        project.state = State::Cached(Arc::new(project_state));
+
+        // Direct access should always yield a state:
+        assert!(project.state_value().is_some());
 
         if expiry > 0 {
             // With long expiry, should get a state
-            assert!(matches!(project.current_state(), ProjectState::Enabled(_)));
+            assert!(project.valid_state().is_some());
         } else {
             // With 0 expiry, project should expire immediately. No state can be set.
-            assert!(matches!(project.current_state(), ProjectState::Pending));
+            assert!(project.valid_state().is_none());
         }
     }
@@ -809,21 +1182,25 @@ mod tests {
         // Initialize project with a state.
         let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
+        let mut project_state = ProjectState::allowed();
+        project_state.project_id = Some(ProjectId::new(123));
         let mut project = Project::new(project_key, config);
         project.state_channel = Some(channel);
-        project.state = ProjectFetchState::allowed();
+        project.state = State::Cached(Arc::new(project_state));
 
+        // The project ID must be set.
+        assert!(!project.state_value().unwrap().invalid());
         assert!(project.next_fetch_attempt.is_none());
+
         // Try to update project with errored project state.
         project.update_state(
             &addr,
-            ProjectFetchState::pending(),
+            Arc::new(ProjectState::err()),
             &envelope_processor,
             false,
         );
         // Since we got invalid project state we still keep the old one meaning there
         // still must be the project id set.
-        assert!(matches!(project.current_state(), ProjectState::Enabled(_)));
+        assert!(!project.state_value().unwrap().invalid());
         assert!(project.next_fetch_attempt.is_some());
 
         // This tests that we actually initiate the backoff and the backoff mechanism works:
@@ -838,7 +1215,7 @@ mod tests {
         project.state_channel = Some(channel);
         project.update_state(
             &addr,
-            ProjectFetchState::pending(),
+            Arc::new(ProjectState::err()),
             &envelope_processor,
             false,
         );
@@ -848,20 +1225,18 @@ mod tests {
     fn create_project(config: Option<serde_json::Value>) -> Project {
         let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap();
         let mut project = Project::new(project_key, Arc::new(Config::default()));
-        let mut project_info = ProjectInfo {
-            project_id: Some(ProjectId::new(42)),
-            ..Default::default()
-        };
+        let mut project_state = ProjectState::allowed();
+        project_state.project_id = Some(ProjectId::new(42));
         let mut public_keys = SmallVec::new();
         public_keys.push(PublicKeyConfig {
             public_key: project_key,
             numeric_id: None,
         });
-        project_info.public_keys = public_keys;
+        project_state.public_keys = public_keys;
         if let Some(config) = config {
-            project_info.config = serde_json::from_value(config).unwrap();
+            project_state.config = serde_json::from_value(config).unwrap();
         }
-        project.state = ProjectFetchState::enabled(project_info);
+        project.state = State::Cached(Arc::new(project_state));
         project
     }
@@ -883,7 +1258,7 @@ mod tests {
         let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone());
 
         let mut project = create_project(None);
-        project.state = ProjectFetchState::pending();
+        project.state = State::Pending;
 
         let buckets = vec![create_metric("d:transactions/foo")];
         let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone());
@@ -911,7 +1286,7 @@ mod tests {
         match cb {
             CheckedBuckets::Checked {
                 scoping,
-                project_info: _,
+                project_state: _,
                 buckets: b,
             } => {
                 assert_eq!(scoping, project.scoping().unwrap());
@@ -978,7 +1353,7 @@ mod tests {
         match cb {
             CheckedBuckets::Checked {
                 scoping,
-                project_info: _,
+                project_state: _,
                 buckets,
             } => {
                 assert_eq!(scoping, project.scoping().unwrap());
diff --git a/relay-server/src/services/project/metrics.rs b/relay-server/src/services/project/metrics.rs
index 4fa31ab4a6..d8bbec239f 100644
--- a/relay-server/src/services/project/metrics.rs
+++ b/relay-server/src/services/project/metrics.rs
@@ -5,7 +5,7 @@ use relay_quotas::Scoping;
 
 use crate::metrics::MetricOutcomes;
 use crate::services::outcome::Outcome;
-use crate::services::project::ProjectInfo;
+use crate::services::project::ProjectState;
 use crate::services::project_cache::BucketSource;
 
 pub fn filter_namespaces(mut buckets: Vec<Bucket>, source: BucketSource) -> Vec<Bucket> {
@@ -22,10 +22,10 @@ pub fn filter_namespaces(mut buckets: Vec<Bucket>, source: BucketSource) -> Vec<
     buckets
 }
 
-pub fn apply_project_info(
+pub fn apply_project_state(
     mut buckets: Vec<Bucket>,
     metric_outcomes: &MetricOutcomes,
-    project_info: &ProjectInfo,
+    project_state: &ProjectState,
     scoping: Scoping,
 ) -> Vec<Bucket> {
     let mut denied_buckets = Vec::new();
@@ -34,13 +34,13 @@ pub fn apply_project_state(
     buckets = buckets
         .into_iter()
         .filter_map(|mut bucket| {
-            if !is_metric_namespace_valid(project_info, bucket.name.namespace()) {
+            if !is_metric_namespace_valid(project_state, bucket.name.namespace()) {
                 relay_log::trace!(mri = &*bucket.name, "dropping metric in disabled namespace");
                 disabled_namespace_buckets.push(bucket);
                 return None;
             };
 
-            if let ErrorBoundary::Ok(ref metric_config) = project_info.config.metrics {
+            if let ErrorBoundary::Ok(ref metric_config) = project_state.config.metrics {
                 if metric_config.denied_names.is_match(&*bucket.name) {
                     relay_log::trace!(mri = &*bucket.name, "dropping metrics due to block list");
                     denied_buckets.push(bucket);
@@ -73,7 +73,7 @@ pub fn apply_project_state(
     buckets
 }
 
-fn is_metric_namespace_valid(state: &ProjectInfo, namespace: MetricNamespace) -> bool {
+fn is_metric_namespace_valid(state: &ProjectState, namespace: MetricNamespace) -> bool {
     match namespace {
         MetricNamespace::Sessions => true,
         MetricNamespace::Transactions => true,
@@ -166,20 +166,21 @@ mod tests {
         let (metric_stats, mut metric_stats_rx) = MetricStats::test();
         let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator);
 
-        let project_state = ProjectInfo {
-            config: serde_json::from_value(serde_json::json!({
+        let project_state = {
+            let mut project_state = ProjectState::allowed();
+            project_state.config = serde_json::from_value(serde_json::json!({
                 "metrics": {
                     "deniedNames": ["*cpu_time*"]
                 },
                 "features": ["organizations:custom-metrics"]
            }))
-            .unwrap(),
-            ..Default::default()
+            .unwrap();
+            project_state
         };
 
         let b1 = create_custom_bucket_with_name("cpu_time".into());
         let b2 = create_custom_bucket_with_name("memory_usage".into());
         let buckets = vec![b1.clone(), b2.clone()];
-        let buckets = apply_project_info(
+        let buckets = apply_project_state(
             buckets,
             &metric_outcomes,
             &project_state,
@@ -216,10 +217,10 @@ mod tests {
         let b2 = create_custom_bucket_with_name("memory_usage".into());
         let buckets = vec![b1.clone(), b2.clone()];
 
-        let buckets = apply_project_info(
+        let buckets = apply_project_state(
             buckets,
             &metric_outcomes,
-            &ProjectInfo::default(),
+            &ProjectState::allowed(),
             Scoping {
                 organization_id: 42,
                 project_id: ProjectId::new(43),
diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs
deleted file mode 100644
index 4788f51e10..0000000000
--- a/relay-server/src/services/project/state.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-//! Types that represent the current project state.
-use std::sync::Arc;
-
-mod fetch_state;
-mod info;
-
-pub use fetch_state::{ExpiryState, ProjectFetchState};
-pub use info::{LimitedProjectInfo, ProjectInfo};
-use serde::{Deserialize, Serialize};
-
-/// Representation of a project's current state.
-#[derive(Clone, Debug)]
-pub enum ProjectState {
-    /// A valid project that is not disabled.
-    Enabled(Arc<ProjectInfo>),
-    /// A project that was marked as "gone" by the upstream. This variant does not expose
-    /// any other project information.
-    Disabled,
-    /// A project to which one of the following conditions apply:
-    ///  - The project has not yet been fetched.
-    ///  - The upstream returned "pending" for this project (see [`crate::services::project_upstream`]).
-    ///  - The upstream returned an unparsable project so we have to try again.
-    ///  - The project has expired and must be treated as "has not been fetched".
-    Pending,
-}
-
-impl ProjectState {
-    /// Runs a post-deserialization step to normalize the project config (e.g. legacy fields).
-    pub fn sanitized(self) -> Self {
-        match self {
-            ProjectState::Enabled(state) => {
-                ProjectState::Enabled(Arc::new(state.as_ref().clone().sanitized()))
-            }
-            ProjectState::Disabled => ProjectState::Disabled,
-            ProjectState::Pending => ProjectState::Pending,
-        }
-    }
-
-    /// Whether or not this state is pending.
-    pub fn is_pending(&self) -> bool {
-        matches!(self, ProjectState::Pending)
-    }
-
-    /// Utility function that returns the project config if enabled.
diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs
deleted file mode 100644
index 4788f51e10..0000000000
--- a/relay-server/src/services/project/state.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-//! Types that represent the current project state.
-use std::sync::Arc;
-
-mod fetch_state;
-mod info;
-
-pub use fetch_state::{ExpiryState, ProjectFetchState};
-pub use info::{LimitedProjectInfo, ProjectInfo};
-use serde::{Deserialize, Serialize};
-
-/// Representation of a project's current state.
-#[derive(Clone, Debug)]
-pub enum ProjectState {
-    /// A valid project that is not disabled.
-    Enabled(Arc<ProjectInfo>),
-    /// A project that was marked as "gone" by the upstream. This variant does not expose
-    /// any other project information.
-    Disabled,
-    /// A project to which one of the following conditions apply:
-    /// - The project has not yet been fetched.
-    /// - The upstream returned "pending" for this project (see [`crate::services::project_upstream`]).
-    /// - The upstream returned an unparsable project so we have to try again.
-    /// - The project has expired and must be treated as "has not been fetched".
-    Pending,
-}
-
-impl ProjectState {
-    /// Runs a post-deserialization step to normalize the project config (e.g. legacy fields).
-    pub fn sanitized(self) -> Self {
-        match self {
-            ProjectState::Enabled(state) => {
-                ProjectState::Enabled(Arc::new(state.as_ref().clone().sanitized()))
-            }
-            ProjectState::Disabled => ProjectState::Disabled,
-            ProjectState::Pending => ProjectState::Pending,
-        }
-    }
-
-    /// Whether or not this state is pending.
-    pub fn is_pending(&self) -> bool {
-        matches!(self, ProjectState::Pending)
-    }
-
-    /// Utility function that returns the project config if enabled.
-    pub fn enabled(&self) -> Option<Arc<ProjectInfo>> {
-        match self {
-            ProjectState::Enabled(info) => Some(Arc::clone(info)),
-            ProjectState::Disabled | ProjectState::Pending => None,
-        }
-    }
-}
-
-impl From<ParsedProjectState> for ProjectState {
-    fn from(value: ParsedProjectState) -> Self {
-        let ParsedProjectState { disabled, info } = value;
-        match disabled {
-            true => ProjectState::Disabled,
-            false => ProjectState::Enabled(Arc::new(info)),
-        }
-    }
-}
-
-/// Project state as used in serialization / deserialization.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ParsedProjectState {
-    /// Whether the project state is disabled.
-    #[serde(default)]
-    pub disabled: bool,
-    /// Project info.
-    ///
-    /// This contains no information when `disabled` is `true`, except for
-    /// public keys in static project configs (see [`crate::services::project_local`]).
-    #[serde(flatten)]
-    pub info: ProjectInfo,
-}
-
-/// Limited project state for external Relays.
-#[derive(Debug, Clone, Serialize)]
-#[serde(rename_all = "camelCase", remote = "ParsedProjectState")]
-pub struct LimitedParsedProjectState {
-    /// Whether the project state is disabled.
-    pub disabled: bool,
-    /// Limited project info for external Relays.
-    ///
-    /// This contains no information when `disabled` is `true`, except for
-    /// public keys in static project configs (see [`crate::services::project_local`]).
-    #[serde(with = "LimitedProjectInfo")]
-    #[serde(flatten)]
-    pub info: ProjectInfo,
-}
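With `ParsedProjectState` and the enum gone, disabled and invalid projects are again encoded as flags on a single flattened struct rather than as variants. The following is a speculative sketch of the shape this revert returns to, reconstructed only from how the rest of this diff calls the API (`allowed`, `missing`, `err`, `invalid`, and direct field access); the real struct carries more fields and serde attributes.

// Hypothetical reconstruction from usage in this diff; not the verbatim type.
#[derive(Clone, Debug, Default)]
pub struct ProjectState {
    pub project_id: Option<u64>,
    pub disabled: bool,
    pub invalid: bool,
    // ... public_keys, slug, config, organization_id, last_change, ...
}

impl ProjectState {
    /// An unknown but allowed project, used for proxy and capture modes.
    pub fn allowed() -> Self {
        Self::default()
    }

    /// A project that the upstream reported as gone or unknown.
    pub fn missing() -> Self {
        Self {
            disabled: true,
            ..Self::default()
        }
    }

    /// A placeholder for a failed or unparsable fetch; must be refetched.
    pub fn err() -> Self {
        Self {
            invalid: true,
            ..Self::default()
        }
    }

    /// Invalid states must never be used to process envelopes.
    pub fn invalid(&self) -> bool {
        self.invalid
    }
}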
diff --git a/relay-server/src/services/project/state/fetch_state.rs b/relay-server/src/services/project/state/fetch_state.rs
deleted file mode 100644
index 3499c9184a..0000000000
--- a/relay-server/src/services/project/state/fetch_state.rs
+++ /dev/null
@@ -1,145 +0,0 @@
-use std::sync::Arc;
-
-use relay_config::Config;
-use relay_dynamic_config::ProjectConfig;
-use tokio::time::Instant;
-
-use crate::services::project::state::info::ProjectInfo;
-use crate::services::project::ProjectState;
-
-/// Hides a cached project state and only exposes it if it has not expired.
-#[derive(Clone, Debug)]
-pub struct ProjectFetchState {
-    /// The time at which this project state was last updated.
-    last_fetch: Option<Instant>,
-    state: ProjectState,
-}
-
-impl ProjectFetchState {
-    /// Takes a [`ProjectState`] and sets its last fetch to the current time.
-    pub fn new(state: ProjectState) -> Self {
-        Self {
-            last_fetch: Some(Instant::now()),
-            state,
-        }
-    }
-
-    /// Project state for an unknown but allowed project.
-    ///
-    /// This state is used for forwarding in Proxy mode.
-    pub fn allowed() -> Self {
-        Self::enabled(ProjectInfo {
-            project_id: None,
-            last_change: None,
-            public_keys: Default::default(),
-            slug: None,
-            config: ProjectConfig::default(),
-            organization_id: None,
-        })
-    }
-
-    /// An enabled project state created from a project info.
-    pub fn enabled(project_info: ProjectInfo) -> Self {
-        Self::new(ProjectState::Enabled(Arc::new(project_info)))
-    }
-
-    /// Returns a disabled state.
-    pub fn disabled() -> Self {
-        Self::new(ProjectState::Disabled)
-    }
-
-    /// Returns a pending or invalid state.
-    pub fn pending() -> Self {
-        Self::new(ProjectState::Pending)
-    }
-
-    /// Create a config that immediately counts as expired.
-    ///
-    /// This is what [`Project`](crate::services::project::Project) initializes itself with.
-    pub fn expired() -> Self {
-        Self {
-            // Make sure the state immediately qualifies as expired:
-            last_fetch: None,
-            state: ProjectState::Pending,
-        }
-    }
-
-    /// Sanitizes the contained project state. See [`ProjectState::sanitized`].
-    pub fn sanitized(self) -> Self {
-        let Self { last_fetch, state } = self;
-        Self {
-            last_fetch,
-            state: state.sanitized(),
-        }
-    }
-
-    /// Returns `true` if the contained state is pending.
-    pub fn is_pending(&self) -> bool {
-        matches!(self.state, ProjectState::Pending)
-    }
-
-    /// Returns information about the expiry of a project state.
-    ///
-    /// If no detailed information is needed, use [`Self::current_state`] instead.
-    pub fn expiry_state(&self, config: &Config) -> ExpiryState {
-        match self.check_expiry(config) {
-            Expiry::Updated => ExpiryState::Updated(&self.state),
-            Expiry::Stale => ExpiryState::Stale(&self.state),
-            Expiry::Expired => ExpiryState::Expired,
-        }
-    }
-
-    /// Returns the current project state, if it has not yet expired.
-    pub fn current_state(&self, config: &Config) -> ProjectState {
-        match self.expiry_state(config) {
-            ExpiryState::Updated(state) | ExpiryState::Stale(state) => state.clone(),
-            ExpiryState::Expired => ProjectState::Pending,
-        }
-    }
-
-    /// Returns whether this state is outdated and needs to be refetched.
-    fn check_expiry(&self, config: &Config) -> Expiry {
-        let Some(last_fetch) = self.last_fetch else {
-            return Expiry::Expired;
-        };
-        let expiry = match &self.state {
-            ProjectState::Enabled(info) if info.project_id.is_some() => {
-                config.project_cache_expiry()
-            }
-            _ => config.cache_miss_expiry(),
-        };
-
-        let elapsed = last_fetch.elapsed();
-        if elapsed >= expiry + config.project_grace_period() {
-            Expiry::Expired
-        } else if elapsed >= expiry {
-            Expiry::Stale
-        } else {
-            Expiry::Updated
-        }
-    }
-}
-
-/// Wrapper for a project state, with expiry information.
-#[derive(Clone, Copy, Debug)]
-pub enum ExpiryState<'a> {
-    /// An up-to-date project state. See [`Expiry::Updated`].
-    Updated(&'a ProjectState),
-    /// A stale project state that can still be used. See [`Expiry::Stale`].
-    Stale(&'a ProjectState),
-    /// An expired project state that should not be used. See [`Expiry::Expired`].
-    Expired,
-}
-
-/// The expiry status of a project state. Return value of [`ProjectFetchState::check_expiry`].
-#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
-enum Expiry {
-    /// The project state is perfectly up to date.
-    Updated,
-    /// The project state is outdated but events depending on this project state can still be
-    /// processed. The state should be refreshed in the background though.
-    Stale,
-    /// The project state is completely outdated and events need to be buffered up until the new
-    /// state has been fetched.
-    Expired,
-}
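The expiry ladder deleted above (fresh until `expiry`, tolerated during a grace period, unusable afterwards) is the invariant that the revert folds back into the flat state. A self-contained sketch of exactly that threshold logic, using `std::time` in place of relay's config accessors:

use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum Expiry {
    Updated,
    Stale,
    Expired,
}

/// Mirrors the deleted `check_expiry`: a state is usable until `expiry`,
/// tolerated until `expiry + grace_period`, and expired afterwards.
fn check_expiry(last_fetch: Option<Instant>, expiry: Duration, grace_period: Duration) -> Expiry {
    let Some(last_fetch) = last_fetch else {
        // A state that was never fetched is expired by definition.
        return Expiry::Expired;
    };
    let elapsed = last_fetch.elapsed();
    if elapsed >= expiry + grace_period {
        Expiry::Expired
    } else if elapsed >= expiry {
        Expiry::Stale
    } else {
        Expiry::Updated
    }
}

fn main() {
    let expiry = Duration::from_secs(60);
    let grace = Duration::from_secs(30);
    assert_eq!(check_expiry(None, expiry, grace), Expiry::Expired);
    assert_eq!(check_expiry(Some(Instant::now()), expiry, grace), Expiry::Updated);
}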
diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs
deleted file mode 100644
index 0f21d2e015..0000000000
--- a/relay-server/src/services/project/state/info.rs
+++ /dev/null
@@ -1,233 +0,0 @@
-use chrono::{DateTime, Utc};
-use relay_base_schema::project::{ProjectId, ProjectKey};
-#[cfg(feature = "processing")]
-use relay_cardinality::CardinalityLimit;
-use relay_config::Config;
-#[cfg(feature = "processing")]
-use relay_dynamic_config::ErrorBoundary;
-use relay_dynamic_config::{Feature, LimitedProjectConfig, ProjectConfig};
-use relay_filter::matches_any_origin;
-use relay_quotas::{Quota, Scoping};
-use serde::{Deserialize, Serialize};
-use smallvec::SmallVec;
-use url::Url;
-
-use crate::envelope::Envelope;
-use crate::extractors::RequestMeta;
-use crate::services::outcome::DiscardReason;
-use crate::services::project::PublicKeyConfig;
-
-/// Information about an enabled project.
-///
-/// Contains the project config plus metadata (organization_id, project_id, etc.).
-#[derive(Debug, Default, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ProjectInfo {
-    /// Unique identifier of this project.
-    pub project_id: Option<ProjectId>,
-    /// The timestamp of when the state was last changed.
-    ///
-    /// This might be `None` in some rare cases like where states
-    /// are faked locally.
-    #[serde(default)]
-    pub last_change: Option<DateTime<Utc>>,
-    /// A container of known public keys in the project.
-    ///
-    /// Since version 2, each project state corresponds to a single public key. For this reason,
-    /// only a single key can occur in this list.
-    #[serde(default)]
-    pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
-    /// The project's slug if available.
-    #[serde(default)]
-    pub slug: Option<String>,
-    /// The project's current config.
-    #[serde(default)]
-    pub config: ProjectConfig,
-    /// The organization id.
-    #[serde(default)]
-    pub organization_id: Option<u64>,
-}
-
-/// Controls how we serialize a ProjectState for an external Relay.
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "camelCase", remote = "ProjectInfo")]
-pub struct LimitedProjectInfo {
-    pub project_id: Option<ProjectId>,
-    pub last_change: Option<DateTime<Utc>>,
-    pub public_keys: SmallVec<[PublicKeyConfig; 1]>,
-    pub slug: Option<String>,
-    #[serde(with = "LimitedProjectConfig")]
-    pub config: ProjectConfig,
-    pub organization_id: Option<u64>,
-}
-
-impl ProjectInfo {
-    /// Returns configuration options for the public key.
-    pub fn get_public_key_config(&self) -> Option<&PublicKeyConfig> {
-        self.public_keys.first()
-    }
-
-    /// Returns the project config.
-    pub fn config(&self) -> &ProjectConfig {
-        &self.config
-    }
-
-    /// Determines whether the given envelope should be accepted or discarded.
-    ///
-    /// Returns `Ok(())` if the envelope should be accepted. Returns `Err(DiscardReason)` if the
-    /// envelope should be discarded, by indicating the reason. The checks performed for this are:
-    ///
-    /// - Allowed origin headers
-    /// - Disabled or unknown projects
-    /// - Disabled project keys (DSN)
-    /// - Feature flags
-    pub fn check_envelope(
-        &self,
-        envelope: &Envelope,
-        config: &Config,
-    ) -> Result<(), DiscardReason> {
-        // Verify that the stated project id in the DSN matches the public key used to retrieve this
-        // project state.
-        let meta = envelope.meta();
-        if !self.is_valid_project_id(meta.project_id(), config) {
-            return Err(DiscardReason::ProjectId);
-        }
-
-        // Try to verify the request origin with the project config.
-        if !self.is_valid_origin(meta.origin()) {
-            return Err(DiscardReason::Cors);
-        }
-
-        // Sanity-check that the state has a matching public key loaded.
-        if !self.is_matching_key(meta.public_key()) {
-            relay_log::error!("public key mismatch on state {}", meta.public_key());
-            return Err(DiscardReason::ProjectId);
-        }
-
-        // Check feature.
-        if let Some(disabled_feature) = envelope
-            .required_features()
-            .iter()
-            .find(|f| !self.has_feature(**f))
-        {
-            return Err(DiscardReason::FeatureDisabled(*disabled_feature));
-        }
-
-        Ok(())
-    }
-
-    /// Returns `true` if the given project ID matches this project.
-    ///
-    /// If the project state has not been loaded, this check is skipped because the project
-    /// identifier is not yet known. Likewise, this check is skipped for the legacy store endpoint
-    /// which comes without a project ID. The id is later overwritten in `check_envelope`.
-    fn is_valid_project_id(&self, stated_id: Option<ProjectId>, config: &Config) -> bool {
-        match (self.project_id, stated_id, config.override_project_ids()) {
-            (Some(actual_id), Some(stated_id), false) => actual_id == stated_id,
-            _ => true,
-        }
-    }
-
-    /// Checks if this origin is allowed for this project.
-    fn is_valid_origin(&self, origin: Option<&Url>) -> bool {
-        // Generally accept any event without an origin.
-        let origin = match origin {
-            Some(origin) => origin,
-            None => return true,
-        };
-
-        // Match against list of allowed origins. If the list is empty we always reject.
-        let allowed = &self.config().allowed_domains;
-        if allowed.is_empty() {
-            return false;
-        }
-
-        let allowed: Vec<_> = allowed
-            .iter()
-            .map(|origin| origin.as_str().into())
-            .collect();
-
-        matches_any_origin(Some(origin.as_str()), &allowed)
-    }
-
-    /// Returns `true` if the given public key matches this state.
-    ///
-    /// This is a sanity check since project states are keyed by the DSN public key. Unless the
-    /// state is invalid or unloaded, it must always match the public key.
-    fn is_matching_key(&self, project_key: ProjectKey) -> bool {
-        if let Some(key_config) = self.get_public_key_config() {
-            // Always validate if we have a key config.
-            key_config.public_key == project_key
-        } else {
-            // Loaded states must have a key config, but ignore missing and invalid states.
-            self.project_id.is_none()
-        }
-    }
-
-    /// Amends request `Scoping` with information from this project state.
-    ///
-    /// This scoping amends `RequestMeta::get_partial_scoping` by adding organization and key info.
-    /// The processor must fetch the full scoping before attempting to rate limit with partial
-    /// scoping.
-    ///
-    /// To get the own scoping of this ProjectKey without amending request information, use
-    /// [`Project::scoping`](crate::services::project::Project::scoping) instead.
-    pub fn scope_request(&self, meta: &RequestMeta) -> Scoping {
-        let mut scoping = meta.get_partial_scoping();
-
-        // The key configuration may be missing if the event has been queued for extended times and
-        // the project was refetched in between. In such a case, access to key quotas is not available,
-        // but we can gracefully execute all other rate limiting.
-        scoping.key_id = self
-            .get_public_key_config()
-            .and_then(|config| config.numeric_id);
-
-        // The original project identifier is part of the DSN. If the DSN was moved to another
-        // project, the actual project identifier is different and can be obtained from project
-        // states. This is only possible when the project state has been loaded.
-        if let Some(project_id) = self.project_id {
-            scoping.project_id = project_id;
-        }
-
-        // This is a hack covering three cases:
-        //  1. Relay has not fetched the project state. In this case we have no way of knowing
-        //     which organization this project belongs to and we need to ignore any
-        //     organization-wide rate limits stored globally. This project state cannot hold
-        //     organization rate limits yet.
-        //  2. The state has been loaded, but the organization_id is not available. This is only
-        //     the case for legacy Sentry servers that do not reply with organization rate
-        //     limits. Thus, the organization_id doesn't matter.
-        //  3. An organization id is available and can be matched against rate limits. In this
-        //     project, all organizations will match automatically, unless the organization id
-        //     has changed since the last fetch.
-        scoping.organization_id = self.organization_id.unwrap_or(0);
-
-        scoping
-    }
-
-    /// Returns quotas declared in this project state.
-    pub fn get_quotas(&self) -> &[Quota] {
-        self.config.quotas.as_slice()
-    }
-
-    /// Returns cardinality limits declared in this project state.
-    #[cfg(feature = "processing")]
-    pub fn get_cardinality_limits(&self) -> &[CardinalityLimit] {
-        match self.config.metrics {
-            ErrorBoundary::Ok(ref m) => m.cardinality_limits.as_slice(),
-            _ => &[],
-        }
-    }
-
-    /// Validates data in this project state and removes values that are partially invalid.
-    pub fn sanitized(mut self) -> Self {
-        self.config.sanitized();
-        self
-    }
-
-    /// Returns `true` if the given feature is enabled for this project.
-    pub fn has_feature(&self, feature: Feature) -> bool {
-        self.config.features.has(feature)
-    }
-}
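The `scope_request` logic removed here (key id from the key config, project id overridden by the state, organization id defaulting to 0 so global limits never match prematurely) survives in the flattened state. A trimmed-down model of that amendment step, with `Scoping` reduced to the fields the example needs:

/// Simplified model of the deleted `scope_request`: partial request scoping is
/// amended with key, project, and organization info when known.
#[derive(Debug, PartialEq)]
struct Scoping {
    organization_id: u64,
    project_id: u64,
    key_id: Option<u64>,
}

fn scope_request(
    mut scoping: Scoping,
    numeric_key_id: Option<u64>,
    state_project_id: Option<u64>,
    state_organization_id: Option<u64>,
) -> Scoping {
    // Key quotas are skipped gracefully when the key config is gone.
    scoping.key_id = numeric_key_id;
    // The DSN's project id may be stale if the DSN was moved; prefer the state's.
    if let Some(project_id) = state_project_id {
        scoping.project_id = project_id;
    }
    // Organization id 0 intentionally never matches stored org rate limits.
    scoping.organization_id = state_organization_id.unwrap_or(0);
    scoping
}

fn main() {
    let partial = Scoping { organization_id: 0, project_id: 42, key_id: None };
    let scoped = scope_request(partial, Some(7), Some(43), None);
    assert_eq!(scoped, Scoping { organization_id: 0, project_id: 43, key_id: Some(7) });
}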
diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs
index 2896581480..e7b99810ca 100644
--- a/relay-server/src/services/project_cache.rs
+++ b/relay-server/src/services/project_cache.rs
@@ -18,13 +18,11 @@ use tokio::time::Instant;

 use crate::services::global_config::{self, GlobalConfigManager, Subscribe};
 use crate::services::metrics::{Aggregator, FlushBuckets};
-use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
+use crate::services::outcome::{DiscardReason, TrackOutcome};
 use crate::services::processor::{
     EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProjectMetrics,
 };
-use crate::services::project::{
-    CheckedBuckets, Project, ProjectFetchState, ProjectSender, ProjectState,
-};
+use crate::services::project::{CheckedBuckets, Project, ProjectSender, ProjectState};
 use crate::services::project_local::{LocalProjectSource, LocalProjectSourceService};
 #[cfg(feature = "processing")]
 use crate::services::project_redis::RedisProjectSource;
@@ -37,7 +35,9 @@ use crate::services::test_store::TestStore;
 use crate::services::upstream::UpstreamRelay;
 use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};

-use crate::utils::{BufferGuard, GarbageDisposal, ManagedEnvelope, RetryBackoff, SleepHandle};
+use crate::utils::{
+    self, BufferGuard, GarbageDisposal, ManagedEnvelope, RetryBackoff, SleepHandle,
+};

 /// Requests a refresh of a project state from one of the available sources.
 ///
@@ -276,7 +276,7 @@ pub struct RefreshIndexCache(pub HashSet);
 pub enum ProjectCache {
     RequestUpdate(RequestUpdate),
     Get(GetProjectState, ProjectSender),
-    GetCached(GetCachedProjectState, Sender<ProjectState>),
+    GetCached(GetCachedProjectState, Sender<Option<Arc<ProjectState>>>),
     CheckEnvelope(
         CheckEnvelope,
         Sender<Result<CheckedEnvelope, DiscardReason>>,
     ),
@@ -337,7 +337,7 @@ impl FromMessage for ProjectCache {
 }

 impl FromMessage<GetProjectState> for ProjectCache {
-    type Response = relay_system::BroadcastResponse<ProjectState>;
+    type Response = relay_system::BroadcastResponse<Arc<ProjectState>>;

     fn from_message(message: GetProjectState, sender: ProjectSender) -> Self {
         Self::Get(message, sender)
@@ -345,9 +345,12 @@ impl FromMessage for ProjectCache {
 }

 impl FromMessage<GetCachedProjectState> for ProjectCache {
-    type Response = relay_system::AsyncResponse<ProjectState>;
+    type Response = relay_system::AsyncResponse<Option<Arc<ProjectState>>>;

-    fn from_message(message: GetCachedProjectState, sender: Sender<ProjectState>) -> Self {
+    fn from_message(
+        message: GetCachedProjectState,
+        sender: Sender<Option<Arc<ProjectState>>>,
+    ) -> Self {
         Self::GetCached(message, sender)
     }
 }
@@ -446,7 +449,7 @@ impl ProjectSource {
         }
     }

-    async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result<ProjectFetchState, ()> {
+    async fn fetch(self, project_key: ProjectKey, no_cache: bool) -> Result<Arc<ProjectState>, ()> {
         let state_opt = self
             .local_source
             .send(FetchOptionalProjectState { project_key })
             .await
             .map_err(|_| ())?;

         if let Some(state) = state_opt {
-            return Ok(ProjectFetchState::new(state));
+            return Ok(state);
         }

         match self.config.relay_mode() {
-            RelayMode::Proxy => return Ok(ProjectFetchState::allowed()),
-            RelayMode::Static => return Ok(ProjectFetchState::disabled()),
-            RelayMode::Capture => return Ok(ProjectFetchState::allowed()),
+            RelayMode::Proxy => return Ok(Arc::new(ProjectState::allowed())),
+            RelayMode::Static => return Ok(Arc::new(ProjectState::missing())),
+            RelayMode::Capture => return Ok(Arc::new(ProjectState::allowed())),
             RelayMode::Managed => (), // Proceed with loading the config from redis or upstream
         }
@@ -471,19 +474,19 @@ impl ProjectSource {
             .await
             .map_err(|_| ())?;

-            let state = match state_fetch_result {
-                Ok(state) => state.sanitized(),
+            let state_opt = match state_fetch_result {
+                Ok(state) => state.map(ProjectState::sanitize).map(Arc::new),
                 Err(error) => {
                     relay_log::error!(
                         error = &error as &dyn Error,
                         "failed to fetch project from Redis",
                     );
-                    ProjectState::Pending
+                    None
                 }
             };

-            if !matches!(state, ProjectState::Pending) {
-                return Ok(ProjectFetchState::new(state));
+            if let Some(state) = state_opt {
+                return Ok(state);
             }
         };
@@ -503,7 +506,7 @@ struct UpdateProjectState {
     project_key: ProjectKey,

     /// New project state information.
-    state: ProjectFetchState,
+    state: Arc<ProjectState>,

     /// If true, all caches should be skipped and a fresh state should be computed.
     no_cache: bool,
@@ -718,7 +721,7 @@ impl ProjectCacheBroker {
                 let state = source
                     .fetch(project_key, no_cache)
                     .await
-                    .unwrap_or_else(|()| ProjectFetchState::disabled());
+                    .unwrap_or_else(|()| Arc::new(ProjectState::err()));

                 let message = UpdateProjectState {
                     project_key,
@@ -739,7 +742,7 @@ impl ProjectCacheBroker {
         );
     }

-    fn handle_get_cached(&mut self, message: GetCachedProjectState) -> ProjectState {
+    fn handle_get_cached(&mut self, message: GetCachedProjectState) -> Option<Arc<ProjectState>> {
        let project_cache = self.services.project_cache.clone();
        self.get_or_create_project(message.project_key)
            .get_cached_state(project_cache, false)
@@ -760,6 +763,56 @@ impl ProjectCacheBroker {
         project.check_envelope(context)
     }

+    /// Handles the processing of the provided envelope.
+    fn handle_processing(&mut self, key: QueueKey, managed_envelope: ManagedEnvelope) {
+        let project_key = managed_envelope.envelope().meta().public_key();
+
+        let Some(project) = self.projects.get_mut(&project_key) else {
+            relay_log::error!(
+                tags.project_key = %project_key,
+                "project could not be found in the cache",
+            );
+
+            let mut project = Project::new(project_key, self.config.clone());
+            project.prefetch(self.services.project_cache.clone(), false);
+            self.projects.insert(project_key, project);
+            self.enqueue(key, managed_envelope);
+            return;
+        };
+
+        let Some(own_project_state) = project.valid_state().filter(|s| !s.invalid()) else {
+            relay_log::error!(
+                tags.project_key = %project_key,
+                "project has no valid cached state",
+            );
+            return;
+        };
+
+        // If `Project::check_envelope()` returns an error, the `Envelope` and `EnvelopeContext`
+        // are dropped, and the error itself is ignored here.
+        if let Ok(CheckedEnvelope {
+            envelope: Some(managed_envelope),
+            ..
+        }) = project.check_envelope(managed_envelope)
+        {
+            let reservoir_counters = project.reservoir_counters();
+
+            let sampling_project_state = utils::get_sampling_key(managed_envelope.envelope())
+                .and_then(|key| self.projects.get(&key))
+                .and_then(|p| p.valid_state())
+                .filter(|state| state.organization_id == own_project_state.organization_id);
+
+            let process = ProcessEnvelope {
+                envelope: managed_envelope,
+                project_state: own_project_state,
+                sampling_project_state,
+                reservoir_counters,
+            };
+
+            self.services.envelope_processor.send(process);
+        }
+    }
+
     /// Checks an incoming envelope and decides whether to process it immediately or buffer it.
     ///
     /// A few conditions are checked here:
@@ -774,68 +827,38 @@ impl ProjectCacheBroker {
     ///
     /// The flushing of the buffered envelopes happens in `update_state`.
     fn handle_validate_envelope(&mut self, message: ValidateEnvelope) {
-        let ValidateEnvelope {
-            envelope: mut managed_envelope,
-        } = message;
-
+        let ValidateEnvelope { envelope: context } = message;
         let project_cache = self.services.project_cache.clone();
-        let envelope = managed_envelope.envelope();
+        let envelope = context.envelope();

         // Fetch the project state for our key and make sure it's not invalid.
         let own_key = envelope.meta().public_key();
-        let project = self.get_or_create_project(own_key);
-
-        let project_state =
-            project.get_cached_state(project_cache.clone(), envelope.meta().no_cache());
-        let reservoir_counters = project.reservoir_counters();
-
-        let project_state = match project_state {
-            ProjectState::Enabled(state) => Some(state),
-            ProjectState::Disabled => {
-                managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));
-                return;
-            }
-            ProjectState::Pending => None,
-        };
+        let project_state = self
+            .get_or_create_project(own_key)
+            .get_cached_state(project_cache.clone(), envelope.meta().no_cache())
+            .filter(|st| !st.invalid());

         // Also, fetch the project state for the sampling key and make sure it's not invalid.
-        let sampling_key = envelope.sampling_key();
-        let sampling_state = if let Some(sampling_key) = sampling_key {
-            let state = self
-                .get_or_create_project(sampling_key)
-                .get_cached_state(project_cache, envelope.meta().no_cache());
-            match state {
-                ProjectState::Enabled(state) => Some(state),
-                ProjectState::Disabled => {
-                    // We accept events even if its root project has been disabled.
-                    None
-                }
-                ProjectState::Pending => None,
-            }
-        } else {
-            None
-        };
+        let sampling_key = utils::get_sampling_key(envelope);
+        let sampling_state = sampling_key.and_then(|key| {
+            self.get_or_create_project(key)
+                .get_cached_state(project_cache, envelope.meta().no_cache())
+                .filter(|st| !st.invalid())
+        });

         let key = QueueKey::new(own_key, sampling_key.unwrap_or(own_key));

         // Trigger processing once we have a project state and we either have a sampling project
         // state or we do not need one.
-        if let Some(project_state) = project_state {
-            if (sampling_state.is_some() || sampling_key.is_none())
-                && !self.buffer_guard.is_over_high_watermark()
-                && self.global_config.is_ready()
-            {
-                self.services.envelope_processor.send(ProcessEnvelope {
-                    envelope: managed_envelope,
-                    project_info: project_state,
-                    sampling_project_info: sampling_state,
-                    reservoir_counters,
-                });
-
-                return;
-            }
+        if project_state.is_some()
+            && (sampling_state.is_some() || sampling_key.is_none())
+            && !self.buffer_guard.is_over_high_watermark()
+            && self.global_config.is_ready()
+        {
+            return self.handle_processing(key, context);
         }
-        self.enqueue(key, managed_envelope);
+
+        self.enqueue(key, context);
     }

     fn handle_rate_limits(&mut self, message: UpdateRateLimits) {
@@ -886,12 +909,12 @@ impl ProjectCacheBroker {
             }
             CheckedBuckets::Checked {
                 scoping,
-                project_info,
+                project_state,
                 buckets,
             } => scoped_buckets
                 .entry(scoping)
                 .or_insert(ProjectMetrics {
-                    project_info,
+                    project_state,
                     buckets: Vec::new(),
                 })
                 .buckets
@@ -948,18 +971,40 @@ impl ProjectCacheBroker {

     /// Returns `true` if the project state is valid for the [`QueueKey`].
     ///
-    /// Which includes the own key and the sampling key for the project.
-    /// Note: this function will trigger [`ProjectState`] refresh if it's already expired.
-    fn is_state_cached(&mut self, key: &QueueKey) -> bool {
-        key.unique_keys().iter().all(|key| {
-            self.projects.get_mut(key).is_some_and(|project| {
-                // Returns `Some` if the project is cached otherwise None and also triggers refresh
-                // in background.
-                !project
-                    .get_cached_state(self.services.project_cache.clone(), false)
-                    .is_pending()
-            })
-        })
+    /// This includes both the own key and the sampling key for the project.
+    /// Note: this function will trigger a [`ProjectState`] refresh if it's already expired or not
+    /// valid.
+    fn is_state_valid(&mut self, key: &QueueKey) -> bool {
+        let QueueKey {
+            own_key,
+            sampling_key,
+        } = key;
+
+        let is_own_state_valid = self.projects.get_mut(own_key).map_or(false, |project| {
+            // Returns `Some` if the project is cached, otherwise `None`; also triggers a refresh
+            // in the background.
+            project
+                .get_cached_state(self.services.project_cache.clone(), false)
+                // Makes sure that the state is also valid.
+                .map_or(false, |state| !state.invalid())
+        });
+
+        let is_sampling_state_valid = if own_key != sampling_key {
+            self.projects
+                .get_mut(sampling_key)
+                .map_or(false, |project| {
+                    // Returns `Some` if the project is cached, otherwise `None`; also triggers a
+                    // refresh in the background.
+                    project
+                        .get_cached_state(self.services.project_cache.clone(), false)
+                        // Makes sure that the state is also valid.
+                        .map_or(false, |state| !state.invalid())
+                })
+        } else {
+            is_own_state_valid
+        };
+
+        is_own_state_valid && is_sampling_state_valid
     }

     /// Iterates the buffer index and tries to unspool the envelopes for projects with a valid
@@ -992,7 +1037,7 @@ impl ProjectCacheBroker {
         let mut index = std::mem::take(&mut self.index);

         let keys = index
-            .extract_if(|key| self.is_state_cached(key))
+            .extract_if(|key| self.is_state_valid(key))
            .take(BATCH_KEY_COUNT)
            .collect::<Vec<_>>();
        let num_keys = keys.len();
@@ -1188,16 +1233,10 @@ impl Service for ProjectCacheService {
                         }
                         // Buffer will not dequeue the envelopes from the spool if there are not enough
                         // permits in `BufferGuard` available. Currently this is 50%.
-                        Some(UnspooledEnvelope { managed_envelope }) = buffer_rx.recv() => {
-                            // Unspooled envelopes need to be checked, just like we do on the fast path.
-                            if let Ok(CheckedEnvelope { envelope: Some(envelope), rate_limits: _ }) = metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_check_envelope", {
-                                broker.handle_check_envelope(CheckEnvelope::new(managed_envelope))
-                            }) {
-                                metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_validate_envelope", {
-                                    broker.handle_validate_envelope(ValidateEnvelope { envelope });
+                        Some(UnspooledEnvelope { managed_envelope, key }) = buffer_rx.recv() => {
+                            metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_processing", {
+                                broker.handle_processing(key, managed_envelope)
                             })
-                            }
-                        },
+                        },

                        _ = ticker.tick() => {
                            metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "evict_project_caches", {
@@ -1485,7 +1524,7 @@ mod tests {

         let update_dsn1_project_state = UpdateProjectState {
             project_key: ProjectKey::parse(dsn1).unwrap(),
-            state: ProjectFetchState::allowed(),
+            state: ProjectState::allowed().into(),
             no_cache: false,
         };

@@ -1499,7 +1538,7 @@ mod tests {

         let update_dsn2_project_state = UpdateProjectState {
             project_key: ProjectKey::parse(dsn2).unwrap(),
-            state: ProjectFetchState::allowed(),
+            state: ProjectState::allowed().into(),
             no_cache: false,
         };

@@ -1547,7 +1586,7 @@ mod tests {

         // Since there is no project we should not process anything but create a project and spool
         // the envelope.
-        broker.handle_validate_envelope(ValidateEnvelope { envelope });
+        broker.handle_processing(key, envelope);

         // Assert that we have a new project and also added an index.
         assert!(broker.projects.get(&project_key).is_some());
@@ -1555,8 +1594,11 @@ mod tests {

         // Check if we actually spooled anything.
         buffer_svc.send(DequeueMany::new([key].into(), buffer_tx.clone()));
-        let UnspooledEnvelope { managed_envelope } = buffer_rx.recv().await.unwrap();
+        let UnspooledEnvelope {
+            key: unspooled_key,
+            managed_envelope: _,
+        } = buffer_rx.recv().await.unwrap();

-        assert_eq!(key, QueueKey::from_envelope(managed_envelope.envelope()));
+        assert_eq!(key, unspooled_key);
     }
 }
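`is_state_valid` gates unspooling on both keys of a `QueueKey` having a cached, non-invalid state. A toy model of that dual-key check (a `HashMap` of `Option<bool>` stands in for the project cache, and the real broker additionally triggers background refreshes on misses):

use std::collections::HashMap;

/// An envelope may only be unspooled when both its own project and its
/// sampling project have a cached, valid state. `Some(valid)` means cached.
fn is_state_valid(states: &HashMap<&str, Option<bool>>, own_key: &str, sampling_key: &str) -> bool {
    let valid = |key: &str| {
        states
            .get(key)
            // A miss would trigger a background fetch in the real broker.
            .map_or(false, |state| state.unwrap_or(false))
    };
    let own = valid(own_key);
    if own_key == sampling_key {
        own
    } else {
        own && valid(sampling_key)
    }
}

fn main() {
    let mut states = HashMap::new();
    states.insert("own", Some(true));
    states.insert("root", None); // cached entry exists, but the state is pending
    assert!(is_state_valid(&states, "own", "own"));
    assert!(!is_state_valid(&states, "own", "root"));
}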
diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs
index 3d94b9ede5..2307462b3c 100644
--- a/relay-server/src/services/project_local.rs
+++ b/relay-server/src/services/project_local.rs
@@ -9,20 +9,20 @@ use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Serv
 use tokio::sync::mpsc;
 use tokio::time::Instant;

-use crate::services::project::{ParsedProjectState, ProjectState};
+use crate::services::project::ProjectState;
 use crate::services::project_cache::FetchOptionalProjectState;

 /// Service interface of the local project source.
 #[derive(Debug)]
-pub struct LocalProjectSource(FetchOptionalProjectState, Sender<Option<ParsedProjectState>>);
+pub struct LocalProjectSource(FetchOptionalProjectState, Sender<Option<Arc<ProjectState>>>);

 impl Interface for LocalProjectSource {}

 impl FromMessage<FetchOptionalProjectState> for LocalProjectSource {
-    type Response = AsyncResponse<Option<ParsedProjectState>>;
+    type Response = AsyncResponse<Option<Arc<ProjectState>>>;

     fn from_message(
         message: FetchOptionalProjectState,
-        sender: Sender<Option<ParsedProjectState>>,
+        sender: Sender<Option<Arc<ProjectState>>>,
     ) -> Self {
         Self(message, sender)
     }
@@ -32,7 +32,7 @@ impl FromMessage for LocalProjectSource {
 #[derive(Debug)]
 pub struct LocalProjectSourceService {
     config: Arc<Config>,
-    local_states: HashMap<ProjectKey, ParsedProjectState>,
+    local_states: HashMap<ProjectKey, Arc<ProjectState>>,
 }

 impl LocalProjectSourceService {
@@ -56,18 +56,15 @@ fn get_project_id(path: &Path) -> Option<ProjectId> {
         .and_then(|stem| stem.parse().ok())
 }

-fn parse_file(
-    path: std::path::PathBuf,
-) -> tokio::io::Result<(std::path::PathBuf, ParsedProjectState)> {
+fn parse_file(path: std::path::PathBuf) -> tokio::io::Result<(std::path::PathBuf, ProjectState)> {
     let file = std::fs::File::open(&path)?;
     let reader = std::io::BufReader::new(file);
-    let state = serde_json::from_reader(reader)?;
-    Ok((path, state))
+    Ok((path, serde_json::from_reader(reader)?))
 }

 async fn load_local_states(
     projects_path: &Path,
-) -> tokio::io::Result<HashMap<ProjectKey, ParsedProjectState>> {
+) -> tokio::io::Result<HashMap<ProjectKey, Arc<ProjectState>>> {
     let mut states = HashMap::new();

     let mut directory = match tokio::fs::read_dir(projects_path).await {
@@ -98,11 +95,12 @@ async fn load_local_states(
         }

         // serde_json is not async, so spawn a blocking task here:
-        let (path, mut state) = tokio::task::spawn_blocking(move || parse_file(path)).await??;
+        let (path, state) = tokio::task::spawn_blocking(move || parse_file(path)).await??;

-        if state.info.project_id.is_none() {
+        let mut sanitized = ProjectState::sanitize(state);
+        if sanitized.project_id.is_none() {
             if let Some(project_id) = get_project_id(&path) {
-                state.info.project_id = Some(project_id);
+                sanitized.project_id = Some(project_id);
             } else {
                 relay_log::warn!(?path, "skipping file, filename is not a valid project id");
                 continue;
@@ -110,18 +108,17 @@ async fn load_local_states(
         }

         // Keep a separate project state per key.
-        let keys = std::mem::take(&mut state.info.public_keys);
+        let keys = std::mem::take(&mut sanitized.public_keys);
         for key in keys {
-            let mut state = state.clone();
-            state.info.public_keys = smallvec::smallvec![key.clone()];
-            states.insert(key.public_key, ProjectState::from(state).sanitized());
+            sanitized.public_keys = smallvec::smallvec![key.clone()];
+            states.insert(key.public_key, Arc::new(sanitized.clone()));
         }
     }
     Ok(states)
 }

-async fn poll_local_states(path: &Path, tx: &mpsc::Sender<HashMap<ProjectKey, ParsedProjectState>>) {
+async fn poll_local_states(path: &Path, tx: &mpsc::Sender<HashMap<ProjectKey, Arc<ProjectState>>>) {
     let states = load_local_states(path).await;
     match states {
         Ok(states) => {
@@ -139,7 +136,7 @@ async fn
-    tx: mpsc::Sender<HashMap<ProjectKey, ParsedProjectState>>,
+    tx: mpsc::Sender<HashMap<ProjectKey, Arc<ProjectState>>>,
 ) {
     let project_path = config.project_configs_path();
     let period = config.local_cache_interval();
@@ -196,7 +193,7 @@ mod tests {
     use std::str::FromStr;

     use super::*;
-    use crate::services::project::{ProjectInfo, PublicKeyConfig};
+    use crate::services::project::PublicKeyConfig;

     /// Tests that we can follow the symlinks and read the project file properly.
     #[tokio::test]
@@ -207,7 +204,7 @@
         let tmp_project_file = "111111.json";
         let project_key = ProjectKey::parse("55f6b2d962564e99832a39890ee4573e").unwrap();

-        let mut tmp_project_state = ProjectInfo::default();
+        let mut tmp_project_state = ProjectState::allowed();
         tmp_project_state.public_keys.push(PublicKeyConfig {
             public_key: project_key,
             numeric_id: None,
@@ -230,19 +227,23 @@
             .unwrap();

         let extracted_project_state = load_local_states(temp2.path()).await.unwrap();
-        let project_info = extracted_project_state
-            .get(&project_key)
-            .unwrap()
-            .enabled()
-            .unwrap();

         assert_eq!(
-            project_info.project_id,
+            extracted_project_state
+                .get(&project_key)
+                .unwrap()
+                .project_id,
             Some(ProjectId::from_str("111111").unwrap())
         );
         assert_eq!(
-            project_info.public_keys.first().unwrap().public_key,
+            extracted_project_state
+                .get(&project_key)
+                .unwrap()
+                .public_keys
+                .first()
+                .unwrap()
+                .public_key,
             project_key,
         )
     }
@@ -255,7 +256,7 @@
         let project_key1 = ProjectKey::parse("55f6b2d962564e99832a39890ee4573e").unwrap();
         let project_key2 = ProjectKey::parse("55bbb2d96256bb9983bb39890bb457bb").unwrap();

-        let mut tmp_project_state = ProjectInfo::default();
+        let mut tmp_project_state = ProjectState::allowed();
         tmp_project_state.public_keys.extend(vec![
             PublicKeyConfig {
                 public_key: project_key1,
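The local source keeps its filename convention: a config stored as `<project_id>.json` lends its file stem as the project id whenever the JSON body does not carry one. A tiny standalone model of that fallback (`parse_id_from` is a stand-in name for `get_project_id`):

use std::path::Path;

/// Derives a numeric project id from a config file's stem, as the local
/// source does for files named like `111111.json`.
fn parse_id_from(path: &Path) -> Option<u64> {
    path.file_stem()?.to_str()?.parse().ok()
}

fn main() {
    assert_eq!(parse_id_from(Path::new("/etc/relay/projects/111111.json")), Some(111111));
    // Non-numeric file names are skipped with a warning in the real service.
    assert_eq!(parse_id_from(Path::new("/etc/relay/projects/default.json")), None);
}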
diff --git a/relay-server/src/services/project_redis.rs b/relay-server/src/services/project_redis.rs
index 275817d760..d04834b3dd 100644
--- a/relay-server/src/services/project_redis.rs
+++ b/relay-server/src/services/project_redis.rs
@@ -5,7 +5,7 @@ use relay_config::Config;
 use relay_redis::{RedisError, RedisPool};
 use relay_statsd::metric;

-use crate::services::project::{ParsedProjectState, ProjectState};
+use crate::services::project::ProjectState;
 use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};

 #[derive(Debug, Clone)]
@@ -23,7 +23,7 @@ pub enum RedisProjectError {
     Redis(#[from] RedisError),
 }

-fn parse_redis_response(raw_response: &[u8]) -> Result<ParsedProjectState, RedisProjectError> {
+fn parse_redis_response(raw_response: &[u8]) -> Result<ProjectState, RedisProjectError> {
     let decompression_result = metric!(timer(RelayTimers::ProjectStateDecompression), {
         zstd::decode_all(raw_response)
     });
@@ -52,7 +52,7 @@ impl RedisProjectSource {
         RedisProjectSource { config, redis }
     }

-    pub fn get_config(&self, key: ProjectKey) -> Result<ProjectState, RedisProjectError> {
+    pub fn get_config(&self, key: ProjectKey) -> Result<Option<ProjectState>, RedisProjectError> {
         let mut command = relay_redis::redis::cmd("GET");

         let prefix = self.config.projectconfig_cache_prefix();
@@ -65,15 +65,14 @@ impl RedisProjectSource {
         let response = match raw_response_opt {
             Some(response) => {
                 metric!(counter(RelayCounters::ProjectStateRedis) += 1, hit = "true");
-                let parsed = parse_redis_response(response.as_slice())?;
-                ProjectState::from(parsed)
+                Some(parse_redis_response(response.as_slice())?)
             }
             None => {
                 metric!(
                     counter(RelayCounters::ProjectStateRedis) += 1,
                     hit = "false"
                 );
-                ProjectState::Pending
+                None
             }
         };
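For context on `parse_redis_response`: the stored value may be zstd-compressed JSON, with plain JSON as a fallback. A hedged sketch of that decompress-then-parse flow, assuming the `zstd` and `serde_json` crates and collapsing the error types into a `String` for brevity:

/// Decodes a Redis payload that is either zstd-compressed JSON or plain JSON.
fn parse_payload(raw: &[u8]) -> Result<serde_json::Value, String> {
    match zstd::decode_all(raw) {
        Ok(decoded) => serde_json::from_slice(&decoded).map_err(|e| e.to_string()),
        // Fall back to treating the payload as uncompressed JSON.
        Err(_) => serde_json::from_slice(raw).map_err(|e| e.to_string()),
    }
}

fn main() {
    // An uncompressed payload takes the fallback path.
    assert!(parse_payload(b"{\"disabled\":false}").is_ok());
}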
diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs
index 3334e98c31..668d3196ae 100644
--- a/relay-server/src/services/project_upstream.rs
+++ b/relay-server/src/services/project_upstream.rs
@@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
 use tokio::sync::mpsc;
 use tokio::time::Instant;

-use crate::services::project::{ParsedProjectState, ProjectFetchState};
+use crate::services::project::ProjectState;
 use crate::services::project_cache::FetchProjectState;
 use crate::services::upstream::{
     Method, RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay, UpstreamRequestError,
@@ -44,9 +44,9 @@ pub struct GetProjectStates {
 #[derive(Debug, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct GetProjectStatesResponse {
-    /// Map of [`ProjectKey`] to [`ParsedProjectState`] that was fetched from the upstream.
+    /// Map of [`ProjectKey`] to [`ProjectState`] that was fetched from the upstream.
     #[serde(default)]
-    configs: HashMap<ProjectKey, ErrorBoundary<Option<ParsedProjectState>>>,
+    configs: HashMap<ProjectKey, ErrorBoundary<Option<ProjectState>>>,
     /// The [`ProjectKey`]'s that couldn't be immediately retrieved from the upstream.
     #[serde(default)]
     pending: Vec<ProjectKey>,
@@ -79,7 +79,7 @@ impl UpstreamQuery for GetProjectStates {

 /// The wrapper struct for the incoming external requests which also keeps additional information.
 #[derive(Debug)]
 struct ProjectStateChannel {
-    channel: BroadcastChannel<ProjectFetchState>,
+    channel: BroadcastChannel<Arc<ProjectState>>,
     deadline: Instant,
     no_cache: bool,
     attempts: u64,
@@ -91,7 +91,7 @@ struct ProjectStateChannel {

 impl ProjectStateChannel {
     pub fn new(
-        sender: BroadcastSender<ProjectFetchState>,
+        sender: BroadcastSender<Arc<ProjectState>>,
         timeout: Duration,
         no_cache: bool,
     ) -> Self {
@@ -110,12 +110,12 @@ impl ProjectStateChannel {
         self.no_cache = true;
     }

-    pub fn attach(&mut self, sender: BroadcastSender<ProjectFetchState>) {
+    pub fn attach(&mut self, sender: BroadcastSender<Arc<ProjectState>>) {
         self.channel.attach(sender)
     }

-    pub fn send(self, state: ProjectFetchState) {
-        self.channel.send(state)
+    pub fn send(self, state: ProjectState) {
+        self.channel.send(Arc::new(state))
     }

     pub fn expired(&self) -> bool {
@@ -128,20 +128,20 @@ type ProjectStateChannels = HashMap<ProjectKey, ProjectStateChannel>;

 /// This is the [`UpstreamProjectSourceService`] interface.
 ///
-/// The service is responsible for fetching the [`ParsedProjectState`] from the upstream.
+/// The service is responsible for fetching the [`ProjectState`] from the upstream.
 /// Internally it maintains the buffer queue of the incoming requests, which got scheduled to fetch the
 /// state and takes care of the backoff in case there is a problem with the requests.
 #[derive(Debug)]
-pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<ProjectFetchState>);
+pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<Arc<ProjectState>>);

 impl Interface for UpstreamProjectSource {}

 impl FromMessage<FetchProjectState> for UpstreamProjectSource {
-    type Response = BroadcastResponse<ProjectFetchState>;
+    type Response = BroadcastResponse<Arc<ProjectState>>;

     fn from_message(
         message: FetchProjectState,
-        sender: BroadcastSender<ProjectFetchState>,
+        sender: BroadcastSender<Arc<ProjectState>>,
     ) -> Self {
         Self(message, sender)
     }
@@ -159,7 +159,7 @@ struct UpstreamResponse {
     response: Result<GetProjectStatesResponse, UpstreamRequestError>,
 }

-/// The service which handles the fetching of the [`ParsedProjectState`] from upstream.
+/// The service which handles the fetching of the [`ProjectState`] from upstream.
 #[derive(Debug)]
 pub struct UpstreamProjectSourceService {
     backoff: RetryBackoff,
@@ -371,7 +371,6 @@ impl UpstreamProjectSourceService {
                     response.configs.len() as u64
                 );
                 for (key, mut channel) in channels_batch {
-                    let mut result = "ok";
                     if response.pending.contains(&key) {
                         channel.pending += 1;
                         self.state_channels.insert(key, channel);
@@ -380,18 +379,13 @@ impl UpstreamProjectSourceService {
                     let state = response
                         .configs
                         .remove(&key)
-                        .unwrap_or(ErrorBoundary::Ok(None));
-                    let state = match state {
-                        ErrorBoundary::Err(error) => {
-                            result = "invalid";
-                            let error = &error as &dyn std::error::Error;
+                        .unwrap_or(ErrorBoundary::Ok(None))
+                        .unwrap_or_else(|error| {
                             relay_log::error!(error, "error fetching project state {key}");
-                            ProjectFetchState::pending()
-                        }
-                        ErrorBoundary::Ok(None) => ProjectFetchState::disabled(),
-                        ErrorBoundary::Ok(Some(state)) => ProjectFetchState::new(state.into()),
-                    };
-
+                            Some(ProjectState::err())
+                        })
+                        .unwrap_or_else(ProjectState::missing);
+
+                    let result = if state.invalid() { "invalid" } else { "ok" };
                     metric!(
                         histogram(RelayHistograms::ProjectStateAttempts) = channel.attempts,
                         result = result,
@@ -400,8 +394,7 @@ impl UpstreamProjectSourceService {
                         counter(RelayCounters::ProjectUpstreamCompleted) += 1,
                         result = result,
                     );
-
-                    channel.send(state.sanitized());
+                    channel.send(state.sanitize());
                 }
             }
             Err(err) => {
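The two chained `unwrap_or_else` calls above collapse the upstream's three-way answer into one state: a parse error becomes `err()`, an absent config becomes `missing()`, and a present config is used as-is (pending keys were already requeued). A distilled model with stand-in types instead of relay's `ErrorBoundary`:

enum Boundary<T> {
    Ok(T),
    Err(&'static str),
}

#[derive(Debug, PartialEq)]
enum State {
    Fetched,
    Err,     // unparsable config: retried later, reported as "invalid"
    Missing, // upstream says the project does not exist
}

/// Collapses `Option<Boundary<Option<State>>>` the same way the response
/// handler does with its two unwraps.
fn resolve(config: Option<Boundary<Option<State>>>) -> State {
    let boundary = config.unwrap_or(Boundary::Ok(None));
    let opt = match boundary {
        Boundary::Ok(opt) => opt,
        Boundary::Err(_error) => Some(State::Err), // logged in the real service
    };
    opt.unwrap_or(State::Missing)
}

fn main() {
    assert_eq!(resolve(None), State::Missing);
    assert_eq!(resolve(Some(Boundary::Err("parse error"))), State::Err);
    assert_eq!(resolve(Some(Boundary::Ok(Some(State::Fetched)))), State::Fetched);
}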
diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs
index 3e9596fd6d..50ceb96413 100644
--- a/relay-server/src/services/spooler/mod.rs
+++ b/relay-server/src/services/spooler/mod.rs
@@ -41,7 +41,6 @@ use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
 use relay_config::Config;
 use relay_statsd::metric;
 use relay_system::{Addr, Controller, FromMessage, Interface, Sender, Service};
-use smallvec::{smallvec, SmallVec};
 use sqlx::migrate::MigrateError;
 use sqlx::sqlite::{
     SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteRow,
@@ -133,23 +132,6 @@ impl QueueKey {
             sampling_key,
         }
     }
-
-    pub fn from_envelope(envelope: &Envelope) -> Self {
-        let meta = envelope.meta();
-        Self {
-            own_key: meta.public_key(),
-            sampling_key: envelope.sampling_key().unwrap_or(meta.public_key()),
-        }
-    }
-
-    /// Returns both keys, but omits duplicates.
-    pub fn unique_keys(&self) -> SmallVec<[ProjectKey; 2]> {
-        let mut keys = smallvec![self.own_key];
-        if self.sampling_key != self.own_key {
-            keys.push(self.sampling_key);
-        }
-        keys
-    }
 }

 /// The envelope with its key sent to project cache for processing.
@@ -157,6 +139,7 @@ impl QueueKey {
 /// It's sent in response to [`DequeueMany`] message from the [`ProjectCache`].
 #[derive(Debug)]
 pub struct UnspooledEnvelope {
+    pub key: QueueKey,
     pub managed_envelope: ManagedEnvelope,
 }
@@ -376,6 +359,7 @@ impl InMemory {
                 sender
                     .send(UnspooledEnvelope {
                         managed_envelope: envelope,
+                        key,
                     })
                     .ok();
             }
@@ -583,9 +567,14 @@ impl OnDisk {
         };

         match self.extract_envelope(envelope, services) {
-            Ok((_key, managed_envelopes)) => {
+            Ok((key, managed_envelopes)) => {
                 for managed_envelope in managed_envelopes {
-                    sender.send(UnspooledEnvelope { managed_envelope }).ok();
+                    sender
+                        .send(UnspooledEnvelope {
+                            managed_envelope,
+                            key,
+                        })
+                        .ok();
                 }
             }
             Err(err) => relay_log::error!(
@@ -1430,7 +1419,10 @@ mod tests {
             sender: tx.clone(),
         });

-        let UnspooledEnvelope { managed_envelope } = rx.recv().await.unwrap();
+        let UnspooledEnvelope {
+            key: _,
+            managed_envelope,
+        } = rx.recv().await.unwrap();
         let start_time_received = managed_envelope.envelope().meta().start_time();

         // Check if the original start time elapsed to the same second as the restored one.
diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs
index a562dd6439..a49cc16d00 100644
--- a/relay-server/src/testutils.rs
+++ b/relay-server/src/testutils.rs
@@ -21,14 +21,14 @@ use crate::metrics::{MetricOutcomes, MetricStats};
 use crate::services::global_config::GlobalConfigHandle;
 use crate::services::outcome::TrackOutcome;
 use crate::services::processor::{self, EnvelopeProcessorService};
-use crate::services::project::ProjectInfo;
+use crate::services::project::ProjectState;
 use crate::services::test_store::TestStore;

 pub fn state_with_rule_and_condition(
     sample_rate: Option<f64>,
     rule_type: RuleType,
     condition: RuleCondition,
-) -> ProjectInfo {
+) -> ProjectState {
     let rules = match sample_rate {
         Some(sample_rate) => vec![SamplingRule {
             condition,
@@ -41,7 +41,7 @@ pub fn state_with_rule_and_condition(
         None => Vec::new(),
     };

-    let mut state = ProjectInfo::default();
+    let mut state = ProjectState::allowed();
     state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig {
         rules,
         ..SamplingConfig::new()
diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs
index c532655a0a..ec1e75571b 100644
--- a/relay-server/src/utils/dynamic_sampling.rs
+++ b/relay-server/src/utils/dynamic_sampling.rs
@@ -9,6 +9,7 @@ use relay_sampling::config::{RuleType, SamplingConfig};
 use relay_sampling::dsc::{DynamicSamplingContext, TraceUserContext};
 use relay_sampling::evaluation::{SamplingDecision, SamplingEvaluator, SamplingMatch};

+use crate::envelope::{Envelope, ItemType};
 use crate::services::outcome::Outcome;

 /// Represents the specification for sampling an incoming event.
@@ -89,6 +90,24 @@ pub fn is_trace_fully_sampled(
     Some(SamplingResult::from(evaluation).decision().is_keep())
 }

+/// Returns the project key defined in the `trace` header of the envelope.
+///
+/// This function returns `None` if:
+/// - there is no [`DynamicSamplingContext`] in the envelope headers.
+/// - there are no transactions or events in the envelope, since in this case sampling by trace is redundant.
+pub fn get_sampling_key(envelope: &Envelope) -> Option<ProjectKey> {
+    // If the envelope item is not of type transaction or event, we will not return a sampling key
+    // because it doesn't make sense to load the root project state if we don't perform trace
+    // sampling.
+    envelope.get_item_by(|item| {
+        matches!(
+            item.ty(),
+            ItemType::Transaction | ItemType::Event | ItemType::Span
+        )
+    })?;
+
+    envelope.dsc().map(|dsc| dsc.public_key)
+}
+
 /// Computes a dynamic sampling context from a transaction event.
 ///
 /// Returns `None` if the passed event is not a transaction event, or if it does not contain a
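`get_sampling_key` (moved here from `Envelope`) guards the dynamic sampling context behind an item-type check, so the root project is only loaded when trace sampling can actually apply. A toy model of that guard, with simplified stand-ins for the envelope API:

enum ItemType {
    Transaction,
    Event,
    Span,
    Attachment,
}

/// Returns the DSC public key only when the envelope contains an item that
/// participates in trace sampling.
fn sampling_key(items: &[ItemType], dsc_public_key: Option<&str>) -> Option<String> {
    let takes_part = items
        .iter()
        .any(|ty| matches!(ty, ItemType::Transaction | ItemType::Event | ItemType::Span));
    if !takes_part {
        return None;
    }
    dsc_public_key.map(str::to_owned)
}

fn main() {
    let key = Some("e12d836b15bb49d7bbf99e64295d995b");
    // An attachment-only envelope never loads the root project.
    assert_eq!(sampling_key(&[ItemType::Attachment], key), None);
    assert_eq!(
        sampling_key(&[ItemType::Transaction], key),
        key.map(str::to_owned)
    );
}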
diff --git a/tests/integration/test_projectconfigs.py b/tests/integration/test_projectconfigs.py
index 66ec48b500..5e44867e3c 100644
--- a/tests/integration/test_projectconfigs.py
+++ b/tests/integration/test_projectconfigs.py
@@ -269,8 +269,24 @@ def test_unparsable_project_config(buffer_config, mini_sentry, relay):
     packed, signature = SecretKey.parse(relay.secret_key).pack(body)

     # This request should return invalid project state and also send the error to Sentry.
-    data = request_config(relay, packed, signature, version="3").json()
-    assert data == {"configs": {}, "pending": [public_key]}
+    data = get_response(relay, packed, signature)
+    assert {
+        "configs": {
+            public_key: {
+                "projectId": None,
+                "lastChange": None,
+                "disabled": True,
+                "publicKeys": [],
+                "slug": None,
+                "config": {
+                    "allowedDomains": ["*"],
+                    "trustedRelays": [],
+                    "piiConfig": None,
+                },
+                "organizationId": None,
+            }
+        }
+    } == data


 def assert_clear_test_failures():
     try: