Skip to content

Commit

Permalink
ref(actix): Migrate ProjectUpstream to relay_system::Service (#1727)
Browse files Browse the repository at this point in the history
Remove `actix` actor implementation and replace it with
`relay_system::Service`.

Also replaces old `futures01` with new `futures`, and use `tokio::spawn`
to schedule the task which fetches the `ProjectState`'s from upstream.
In order to do that I had to change how the scheduling is working, so
the entire service won't be blocking:

- introduces the internal state channel
- use messages to pass the current state around, like reset the backoff
  period or schedule the another fetch
- the internal message have also higher priority right now
  • Loading branch information
olksdr authored Jan 18, 2023
1 parent 1dc8e2b commit a30b6d6
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 256 deletions.
42 changes: 10 additions & 32 deletions relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::sync::Arc;

use actix::{Actor, Message};
use actix_web::ResponseError;
use tokio::sync::mpsc;
use tokio::time::Instant;

Expand All @@ -11,13 +9,13 @@ use relay_metrics::{self, FlushBuckets, InsertMetrics, MergeBuckets};
use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{compat, Addr, FromMessage, Interface, Sender, Service};
use relay_system::{Addr, FromMessage, Interface, Sender, Service};

use crate::actors::outcome::DiscardReason;
use crate::actors::processor::ProcessEnvelope;
use crate::actors::project::{Project, ProjectSender, ProjectState};
use crate::actors::project_local::{LocalProjectSource, LocalProjectSourceService};
use crate::actors::project_upstream::UpstreamProjectSource;
use crate::actors::project_upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
use crate::envelope::Envelope;
use crate::service::REGISTRY;
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
Expand All @@ -26,14 +24,6 @@ use crate::utils::{self, EnvelopeContext, GarbageDisposal};
#[cfg(feature = "processing")]
use crate::actors::project_redis::RedisProjectSource;

#[derive(Clone, Debug, thiserror::Error)]
pub enum ProjectError {
#[error("could not schedule project fetching")]
ScheduleFailed,
}

impl ResponseError for ProjectError {}

/// Requests a refresh of a project state from one of the available sources.
///
/// The project state is resolved in the following precedence:
Expand Down Expand Up @@ -324,15 +314,15 @@ impl FromMessage<FlushBuckets> for ProjectCache {
struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: actix::Addr<UpstreamProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
}

impl ProjectSource {
pub fn new(config: Arc<Config>, _redis: Option<RedisPool>) -> Self {
let local_source = LocalProjectSourceService::new(config.clone()).start();
let upstream_source = UpstreamProjectSource::new(config.clone()).start();
let upstream_source = UpstreamProjectSourceService::new(config.clone()).start();

#[cfg(feature = "processing")]
let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));
Expand Down Expand Up @@ -387,15 +377,13 @@ impl ProjectSource {
}
};

compat::send(
self.upstream_source,
FetchProjectState {
self.upstream_source
.send(FetchProjectState {
project_key,
no_cache,
},
)
.await
.map_err(|_| ())?
})
.await
.map_err(|_| ())
}
}

Expand Down Expand Up @@ -626,7 +614,7 @@ impl Service for ProjectCacheService {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,
Expand All @@ -635,11 +623,6 @@ pub struct FetchProjectState {
pub no_cache: bool,
}

// TODO: Remove once `UpstreamProjectSource` was moved to tokio
impl Message for FetchProjectState {
type Result = Result<Arc<ProjectState>, ()>;
}

#[derive(Clone, Debug)]
pub struct FetchOptionalProjectState {
project_key: ProjectKey,
Expand All @@ -650,8 +633,3 @@ impl FetchOptionalProjectState {
self.project_key
}
}

// TODO: Remove once `RedisProjectSource` and `LocalProjectSource` were moved to tokio
impl Message for FetchOptionalProjectState {
type Result = Option<Arc<ProjectState>>;
}
Loading

0 comments on commit a30b6d6

Please sign in to comment.