Skip to content

Commit

Permalink
fix(server): Revert project state refactor (#3831)
Browse files Browse the repository at this point in the history
This started buffering all envelopes to memory on S4S, need to
investigate why.

Reverts #3770.
  • Loading branch information
jjbayer authored Jul 18, 2024
1 parent e0a9c13 commit f9c8227
Show file tree
Hide file tree
Showing 24 changed files with 845 additions and 915 deletions.
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

## Unreleased

**Bug Fixes**:

- Do not drop envelopes for unparsable project configs. ([#3770](https://github.com/getsentry/relay/pull/3770))

**Features**

- "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))
Expand Down
2 changes: 1 addition & 1 deletion relay-dynamic-config/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub struct ProjectConfig {

impl ProjectConfig {
/// Validates fields in this project config and removes values that are partially invalid.
pub fn sanitized(&mut self) {
pub fn sanitize(&mut self) {
self.quotas.retain(Quota::is_valid);

metrics::convert_conditional_tagging(self);
Expand Down
4 changes: 1 addition & 3 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,16 @@ fn queue_envelope(
}

// Split off the envelopes by item type.
let scoping = managed_envelope.scoping();
let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope());
for (group, envelope) in envelopes {
let mut envelope = buffer_guard
let envelope = buffer_guard
.enter(
envelope,
state.outcome_aggregator().clone(),
state.test_store().clone(),
group,
)
.map_err(BadStoreRequest::QueueFailed)?;
envelope.scope(scoping);
state.project_cache().send(ValidateEnvelope::new(envelope));
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
Expand Down
36 changes: 12 additions & 24 deletions relay-server/src/endpoints/project_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::endpoints::forward;
use crate::extractors::SignedJson;
use crate::service::ServiceState;
use crate::services::global_config::{self, StatusResponse};
use crate::services::project::{LimitedParsedProjectState, ParsedProjectState, ProjectState};
use crate::services::project::{LimitedProjectState, ProjectState};
use crate::services::project_cache::{GetCachedProjectState, GetProjectState};

/// V2 version of this endpoint.
Expand Down Expand Up @@ -49,13 +49,13 @@ struct VersionQuery {
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
enum ProjectStateWrapper {
Full(ParsedProjectState),
Limited(#[serde(with = "LimitedParsedProjectState")] ParsedProjectState),
Full(ProjectState),
Limited(#[serde(with = "LimitedProjectState")] ProjectState),
}

impl ProjectStateWrapper {
/// Create a wrapper which forces serialization into external or internal format
pub fn new(state: ParsedProjectState, full: bool) -> Self {
pub fn new(state: ProjectState, full: bool) -> Self {
if full {
Self::Full(state)
} else {
Expand All @@ -76,7 +76,7 @@ impl ProjectStateWrapper {
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct GetProjectStatesResponseWrapper {
configs: HashMap<ProjectKey, ProjectStateWrapper>,
configs: HashMap<ProjectKey, Option<ProjectStateWrapper>>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pending: Vec<ProjectKey>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -123,6 +123,7 @@ async fn inner(
project_cache
.send(GetProjectState::new(project_key).no_cache(no_cache))
.await
.map(Some)
};

(project_key, state_result)
Expand All @@ -145,36 +146,23 @@ async fn inner(
let mut pending = Vec::with_capacity(keys_len);
let mut configs = HashMap::with_capacity(keys_len);
for (project_key, state_result) in future::join_all(futures).await {
let project_info = match state_result? {
ProjectState::Enabled(info) => info,
ProjectState::Disabled => {
// Don't insert project config. Downstream Relay will consider it disabled.
continue;
}
ProjectState::Pending => {
pending.push(project_key);
continue;
}
let Some(project_state) = state_result? else {
pending.push(project_key);
continue;
};

// If public key is known (even if rate-limited, which is Some(false)), it has
// access to the project config
let has_access = relay.internal
|| project_info
|| project_state
.config
.trusted_relays
.contains(&relay.public_key);

if has_access {
let full = relay.internal && inner.full_config;
let wrapper = ProjectStateWrapper::new(
ParsedProjectState {
disabled: false,
info: project_info.as_ref().clone(),
},
full,
);
configs.insert(project_key, wrapper);
let wrapper = ProjectStateWrapper::new((*project_state).clone(), full);
configs.insert(project_key, Some(wrapper));
} else {
relay_log::debug!(
relay = %relay.public_key,
Expand Down
19 changes: 0 additions & 19 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//!
//! ```
use relay_base_schema::project::ProjectKey;
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
Expand Down Expand Up @@ -1219,24 +1218,6 @@ impl Envelope {
self.headers.sent_at
}

/// Returns the project key defined in the `trace` header of the envelope.
///
/// This function returns `None` if:
/// - there is no [`DynamicSamplingContext`] in the envelope headers.
/// - there are no transactions or events in the envelope, since in this case sampling by trace is redundant.
pub fn sampling_key(&self) -> Option<ProjectKey> {
// If the envelope item is not of type transaction or event, we will not return a sampling key
// because it doesn't make sense to load the root project state if we don't perform trace
// sampling.
self.get_item_by(|item| {
matches!(
item.ty(),
ItemType::Transaction | ItemType::Event | ItemType::Span
)
})?;
self.dsc().map(|dsc| dsc.public_key)
}

/// Sets the event id on the envelope.
pub fn set_event_id(&mut self, event_id: EventId) {
self.headers.event_id = Some(event_id);
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/metrics_extraction/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
metric_extraction: metric_extraction.map(ErrorBoundary::Ok).unwrap_or_default(),
..ProjectConfig::default()
};
project.sanitized(); // enables metrics extraction rules
project.sanitize(); // enables metrics extraction rules
let project = project.metric_extraction.ok().unwrap();

OwnedConfig { global, project }
Expand Down
Loading

0 comments on commit f9c8227

Please sign in to comment.