Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a .no-cache option to event sending #911

Merged
merged 7 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Features**:

- By adding `.no-cache` to the DSN key, Relay refreshes project configuration caches immediately. This makes it possible to apply changed settings instantly, such as updates to data scrubbing or inbound filter rules. ([#911](https://github.com/getsentry/relay/pull/911))

**Internal**:

- Compatibility mode for pre-aggregated sessions was removed. The feature is now enabled by default in full fidelity. ([#913](https://github.com/getsentry/relay/pull/913))
Expand Down
7 changes: 7 additions & 0 deletions relay-common/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ impl ProjectKey {
Ok(project_key)
}

/// Parses a `ProjectKey` from a string, splitting off dot-separated flags.
///
/// The portion before the first `.` is parsed as the key itself; every
/// remaining segment is returned verbatim as a flag.
pub fn parse_with_flags(key: &str) -> Result<(Self, Vec<&str>), ParseProjectKeyError> {
    let mut segments = key.split('.');
    let raw_key = segments.next().ok_or(ParseProjectKeyError)?;
    let project_key = ProjectKey::parse(raw_key)?;
    Ok((project_key, segments.collect()))
}

/// Returns the string representation of the project key.
#[inline]
pub fn as_str(&self) -> &str {
Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1489,9 +1489,11 @@ impl Handler<HandleEnvelope> for EventManager {
}
})
.and_then(clone!(project, |envelope| {
// get the state for the current project
// get the state for the current project. we can always fetch the cached version
// even if the no_cache flag was passed, as the cache was updated prior in
// `CheckEnvelope`.
project
.send(GetProjectState)
.send(GetProjectState::new())
.map_err(ProcessingError::ScheduleFailed)
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
.map(|state| (envelope, state))
Expand Down
138 changes: 112 additions & 26 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use actix::prelude::*;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -372,6 +372,36 @@ pub struct PublicKeyConfig {
pub numeric_id: Option<u64>,
}

/// Channel used to deliver the result of a single in-flight project state
/// fetch to every party waiting on it.
///
/// The `no_cache` flag records whether any of the waiting requests asked to
/// bypass caches for this fetch.
struct StateChannel {
    sender: oneshot::Sender<Arc<ProjectState>>,
    receiver: Shared<oneshot::Receiver<Arc<ProjectState>>>,
    no_cache: bool,
}

impl StateChannel {
    /// Creates a fresh channel with `no_cache` disabled.
    pub fn new() -> Self {
        let (tx, rx) = oneshot::channel();
        Self {
            sender: tx,
            receiver: rx.shared(),
            no_cache: false,
        }
    }

    /// Sets whether the pending fetch should bypass caches, returning `self`
    /// for call chaining.
    pub fn no_cache(&mut self, no_cache: bool) -> &mut Self {
        self.no_cache = no_cache;
        self
    }

    /// Returns a clonable handle on which the fetched state can be awaited.
    pub fn receiver(&self) -> Shared<oneshot::Receiver<Arc<ProjectState>>> {
        self.receiver.clone()
    }

    /// Resolves all receivers with the given state, consuming the channel.
    ///
    /// A send error (all receivers dropped) is intentionally ignored.
    pub fn send(self, state: Arc<ProjectState>) {
        self.sender.send(state).ok();
    }
}

/// Actor representing organization and project configuration for a project key.
///
/// This actor no longer uniquely identifies a project. Instead, it identifies a project key.
Expand All @@ -381,8 +411,9 @@ pub struct Project {
config: Arc<Config>,
manager: Addr<ProjectCache>,
state: Option<Arc<ProjectState>>,
state_channel: Option<Shared<oneshot::Receiver<Arc<ProjectState>>>>,
state_channel: Option<StateChannel>,
rate_limits: RateLimits,
last_no_cache: Instant,
}

impl Project {
Expand All @@ -394,6 +425,7 @@ impl Project {
state: None,
state_channel: None,
rate_limits: RateLimits::new(),
last_no_cache: Instant::now(),
}
}

Expand All @@ -403,17 +435,31 @@ impl Project {

fn get_or_fetch_state(
&mut self,
mut no_cache: bool,
context: &mut Context<Self>,
) -> Response<Arc<ProjectState>, ProjectError> {
// count number of times we are looking for the project state
metric!(counter(RelayCounters::ProjectStateGet) += 1);

// Allow at most 1 no_cache request per second. Gracefully degrade to cached requests.
if no_cache {
if self.last_no_cache.elapsed() < Duration::from_secs(1) {
no_cache = false;
} else {
metric!(counter(RelayCounters::ProjectStateNoCache) += 1);
self.last_no_cache = Instant::now();
}
}

let state = self.state.as_ref();
let outdated = state
.map(|s| s.outdated(&self.config))
.unwrap_or(Outdated::HardOutdated);

let cached_state = match (state, outdated) {
// Never use the cached state if `no_cache` is set.
_ if no_cache => None,

// There is no project state that can be used, fetch a state and return it.
(None, _) | (_, Outdated::HardOutdated) => None,

Expand All @@ -424,53 +470,73 @@ impl Project {
(Some(state), Outdated::Updated) => return Response::ok(state.clone()),
};

let channel = match self.state_channel {
Some(ref channel) => {
let receiver = match self.state_channel {
Some(ref channel) if channel.no_cache || !no_cache => {
relay_log::debug!("project {} state request amended", self.public_key);
channel.clone()
channel.receiver()
}
None => {
_ => {
relay_log::debug!("project {} state requested", self.public_key);
let channel = self.fetch_state(context);
self.state_channel = Some(channel.clone());
channel

let receiver = self
.state_channel
.get_or_insert_with(StateChannel::new)
.no_cache(no_cache)
.receiver();

// Either there is no running request, or the current request does not have
// `no_cache` set. In both cases, start a new request. All in-flight receivers will
// get the latest state.
self.fetch_state(no_cache, context);

receiver
}
};

if let Some(rv) = cached_state {
return Response::ok(rv);
}

let future = channel
let future = receiver
.map(|shared| (*shared).clone())
.map_err(|_| ProjectError::FetchFailed);

Response::future(future)
}

fn fetch_state(
&mut self,
context: &mut Context<Self>,
) -> Shared<oneshot::Receiver<Arc<ProjectState>>> {
let (sender, receiver) = oneshot::channel();
fn fetch_state(&mut self, no_cache: bool, context: &mut Context<Self>) {
debug_assert!(self.state_channel.is_some());
let public_key = self.public_key;

self.manager
.send(FetchProjectState { public_key })
.send(FetchProjectState {
public_key,
no_cache,
})
.into_actor(self)
.map(move |state_result, slf, _ctx| {
let channel = match slf.state_channel.take() {
Some(channel) => channel,
None => return,
};

// If the channel has `no_cache` set but we are not a `no_cache` request, we have
// been superseded. Put it back and let the other request take precedence.
if channel.no_cache && !no_cache {
slf.state_channel = Some(channel);
return;
}

slf.state_channel = None;
slf.state = state_result.map(|resp| resp.state).ok();

if let Some(ref state) = slf.state {
relay_log::debug!("project state {} updated", public_key);
sender.send(state.clone()).ok();
channel.send(state.clone());
}
})
.drop_err()
.spawn(context);

receiver.shared()
}

fn get_scoping(&mut self, meta: &RequestMeta) -> Scoping {
Expand Down Expand Up @@ -532,7 +598,7 @@ impl Actor for Project {
///
/// This is used for cases when we only want to perform operations that do
/// not require waiting for network requests.
///
#[derive(Debug)]
pub struct GetCachedProjectState;

impl Message for GetCachedProjectState {
Expand All @@ -553,8 +619,24 @@ impl Handler<GetCachedProjectState> for Project {

/// Returns the project state.
///
/// The project state is fetched if it is missing or outdated.
pub struct GetProjectState;
/// The project state is fetched if it is missing or outdated. If `no_cache` is specified, then the
/// state is always refreshed.
#[derive(Debug, Default)]
pub struct GetProjectState {
    // When true, local caches are skipped and a fresh state is fetched.
    no_cache: bool,
}

impl GetProjectState {
    /// Fetches the project state and uses the cached version if up-to-date.
    ///
    /// Equivalent to `GetProjectState::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Fetches the project state and conditionally skips the cache.
    pub fn no_cache(no_cache: bool) -> Self {
        Self { no_cache }
    }
}

impl Message for GetProjectState {
type Result = Result<Arc<ProjectState>, ProjectError>;
Expand All @@ -563,8 +645,8 @@ impl Message for GetProjectState {
impl Handler<GetProjectState> for Project {
type Result = Response<Arc<ProjectState>, ProjectError>;

fn handle(&mut self, _message: GetProjectState, context: &mut Context<Self>) -> Self::Result {
self.get_or_fetch_state(context)
fn handle(&mut self, message: GetProjectState, context: &mut Context<Self>) -> Self::Result {
self.get_or_fetch_state(message.no_cache, context)
}
}

Expand Down Expand Up @@ -629,13 +711,17 @@ impl Handler<CheckEnvelope> for Project {
if message.fetch {
// Project state fetching is allowed, so ensure the state is fetched and up-to-date.
// This will return synchronously if the state is still cached.
self.get_or_fetch_state(context)
self.get_or_fetch_state(message.envelope.meta().no_cache(), context)
.into_actor()
.map(self, context, move |_, slf, _ctx| {
slf.check_envelope_scoped(message)
})
} else {
self.get_or_fetch_state(context);
// Preload the project cache so that it arrives a little earlier in processing. However,
// do not pass `no_cache`. In case the project is rate limited, we do not want to force
// a full reload.
self.get_or_fetch_state(false, context);

// message.fetch == false: Fetching must not block the store request. The EventManager
// will later fetch the project state.
ActorResponse::ok(self.check_envelope_scoped(message))
Expand Down
7 changes: 6 additions & 1 deletion relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ impl Handler<GetProject> for ProjectCache {
/// individual requests.
// `Debug` added for consistency with the other project messages
// (`GetProjectState`, `GetCachedProjectState`), which derive it.
#[derive(Clone, Debug)]
pub struct FetchProjectState {
    /// The public key to fetch the project by.
    pub public_key: ProjectKey,

    /// If true, all caches should be skipped and a fresh state should be computed.
    pub no_cache: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -201,7 +205,8 @@ impl Handler<FetchProjectState> for ProjectCache {
type Result = Response<ProjectStateResponse, ()>;

fn handle(&mut self, message: FetchProjectState, _context: &mut Self::Context) -> Self::Result {
let FetchProjectState { public_key } = message;
let public_key = message.public_key;

if let Some(mut entry) = self.projects.get_mut(&public_key) {
// Bump the update time of the project in our hashmap to evade eviction. Eviction is a
// sequential scan over self.projects, so this needs to be as fast as possible and
Expand Down
23 changes: 22 additions & 1 deletion relay-server/src/actors/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct GetProjectStates {
pub public_keys: Vec<ProjectKey>,
#[serde(default)]
pub full_config: bool,
#[serde(default)]
pub no_cache: bool,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -77,6 +79,7 @@ struct ProjectStateChannel {
sender: oneshot::Sender<Arc<ProjectState>>,
receiver: Shared<oneshot::Receiver<Arc<ProjectState>>>,
deadline: Instant,
no_cache: bool,
}

impl ProjectStateChannel {
Expand All @@ -87,9 +90,14 @@ impl ProjectStateChannel {
sender,
receiver: receiver.shared(),
deadline: Instant::now() + timeout,
no_cache: false,
}
}

pub fn no_cache(&mut self) {
self.no_cache = true;
}

pub fn send(self, state: ProjectState) {
self.sender.send(Arc::new(state)).ok();
}
Expand Down Expand Up @@ -183,6 +191,8 @@ impl UpstreamProjectSource {
// num_batches. Worst case, we're left with one project per request, but that's fine.
let actual_batch_size = (total_count + (total_count % num_batches)) / num_batches;

// TODO(ja): This mixes requests with no_cache. Separate out channels with no_cache: true?

let requests: Vec<_> = channels
.into_iter()
.chunks(actual_batch_size)
Expand All @@ -198,6 +208,7 @@ impl UpstreamProjectSource {
let query = GetProjectStates {
public_keys: channels_batch.keys().copied().collect(),
full_config: self.config.processing_enabled(),
no_cache: channels_batch.values().any(|c| c.no_cache),
};

// count number of http requests for project states
Expand Down Expand Up @@ -324,6 +335,10 @@ impl Handler<FetchProjectState> for UpstreamProjectSource {
}

let query_timeout = self.config.query_timeout();
let FetchProjectState {
public_key,
no_cache,
} = message;

// There's an edge case where a project is represented by two Project actors. This can
// happen if our project eviction logic removes an actor from `project_cache.projects`
Expand All @@ -336,9 +351,15 @@ impl Handler<FetchProjectState> for UpstreamProjectSource {
// channel for our current `message.id`.
let channel = self
.state_channels
.entry(message.public_key)
.entry(public_key)
.or_insert_with(|| ProjectStateChannel::new(query_timeout));

// Ensure upstream skips caches if one of the recipients requests an uncached response. This
// operation is additive across requests.
if no_cache {
channel.no_cache();
}

Box::new(
channel
.receiver()
Expand Down
Loading