Skip to content

Commit

Permalink
bundle a bunch of options into a struct for clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker committed Nov 16, 2020
1 parent 311f4af commit 042ce66
Showing 1 changed file with 37 additions and 44 deletions.
81 changes: 37 additions & 44 deletions relay-server/src/actors/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,7 @@ impl fmt::Display for RequestPriority {
/// The objects are transformed int HTTP requests, and sent to upstream as HTTP connections
/// become available.
struct UpstreamRequest {
/// Queueing priority for the request.
priority: RequestPriority,
/// Should the request be retried in case of network error.
retry: bool,
/// Should 429s be honored within the upstream.
update_rate_limits: bool,
config: UpstreamRequestConfig,
/// One-shot channel to be notified when the request is done.
///
/// The request is either successful or it has failed but we are not going to retry it.
Expand Down Expand Up @@ -481,7 +476,7 @@ impl UpstreamRelay {
// we are about to send a HTTP message keep track of requests in flight
self.num_inflight_requests += 1;

let update_rate_limits = request.update_rate_limits;
let update_rate_limits = request.config.update_rate_limits;

request.send_start = Some(Instant::now());
client_request
Expand Down Expand Up @@ -563,7 +558,7 @@ impl UpstreamRelay {
if matches!(send_result, Err(ref err) if err.is_network_error()) {
self.handle_network_error(ctx);

if request.retry {
if request.config.retry {
request.previous_retries += 1;
return self.enqueue(request, ctx, EnqueuePosition::Back);
}
Expand All @@ -583,8 +578,8 @@ impl UpstreamRelay {
ctx: &mut Context<Self>,
position: EnqueuePosition,
) {
let name = request.priority.name();
let queue = match request.priority {
let name = request.config.priority.name();
let queue = match request.config.priority {
// Immediate is special and bypasses the queue. Directly send the request and return
// the response channel rather than waiting for `PumpHttpMessageQueue`.
RequestPriority::Immediate => return self.send_request(request, ctx),
Expand All @@ -607,9 +602,7 @@ impl UpstreamRelay {

fn enqueue_request<P, F>(
&mut self,
priority: RequestPriority,
retry: bool,
update_rate_limits: bool,
config: UpstreamRequestConfig,
method: Method,
path: P,
build: F,
Expand All @@ -622,9 +615,7 @@ impl UpstreamRelay {
let (tx, rx) = oneshot::channel::<Result<ClientResponse, UpstreamRequestError>>();

let request = UpstreamRequest {
priority,
retry,
update_rate_limits,
config,
method,
path: path.as_ref().to_owned(),
response_sender: tx,
Expand All @@ -651,9 +642,11 @@ impl UpstreamRelay {
) -> ResponseFuture<Q::Response, UpstreamRequestError> {
let method = query.method();
let path = query.path();
let priority = Q::priority();
let retry = Q::retry();
let update_rate_limits = Q::update_rate_limits();
let config = UpstreamRequestConfig {
retry: Q::retry(),
update_rate_limits: Q::update_rate_limits(),
priority: Q::priority(),
};

let credentials = tryf!(self
.config
Expand All @@ -667,9 +660,7 @@ impl UpstreamRelay {

let future = self
.enqueue_request(
priority,
retry,
update_rate_limits,
config,
method,
path,
move |builder| {
Expand Down Expand Up @@ -871,9 +862,11 @@ impl Handler<CheckUpstreamConnection> for UpstreamRelay {

fn handle(&mut self, _msg: CheckUpstreamConnection, ctx: &mut Self::Context) -> Self::Result {
self.enqueue_request(
RequestPriority::Immediate,
false,
false,
UpstreamRequestConfig {
priority: RequestPriority::Immediate,
retry: false,
update_rate_limits: false,
},
Method::GET,
"/api/0/relays/live/",
ClientRequestBuilder::finish,
Expand Down Expand Up @@ -962,7 +955,15 @@ pub struct SendRequest<B = (), T = ()> {
path: String,
builder: B,
transformer: T,
config: UpstreamRequestConfig,
}

struct UpstreamRequestConfig {
/// Queueing priority for the request.
priority: RequestPriority,
/// Should the request be retried in case of network error.
retry: bool,
/// Should 429s be honored within the upstream.
update_rate_limits: bool,
}

Expand All @@ -973,8 +974,11 @@ impl SendRequest {
path: path.into(),
builder: (),
transformer: (),
retry: true,
update_rate_limits: true,
config: UpstreamRequestConfig {
priority: RequestPriority::Low,
retry: true,
update_rate_limits: true,
},
}
}

Expand All @@ -993,20 +997,19 @@ impl<B, T> SendRequest<B, T> {
path: self.path,
builder: callback,
transformer: self.transformer,
retry: self.retry,
update_rate_limits: self.update_rate_limits,
config: self.config,
}
}

#[inline]
pub fn retry(mut self, should_retry: bool) -> Self {
self.retry = should_retry;
self.config.retry = should_retry;
self
}

#[inline]
pub fn update_rate_limits(mut self, should_update_rate_limits: bool) -> Self {
self.update_rate_limits = should_update_rate_limits;
self.config.update_rate_limits = should_update_rate_limits;
self
}

Expand All @@ -1020,8 +1023,7 @@ impl<B, T> SendRequest<B, T> {
path: self.path,
builder: self.builder,
transformer: callback,
retry: self.retry,
update_rate_limits: self.update_rate_limits,
config: self.config,
}
}
}
Expand Down Expand Up @@ -1058,20 +1060,11 @@ where
path,
mut builder,
transformer,
retry,
update_rate_limits,
config,
} = message;

let future = self
.enqueue_request(
RequestPriority::Low,
retry,
update_rate_limits,
method,
path,
move |b| builder.build_request(b),
ctx,
)
.enqueue_request(config, method, path, move |b| builder.build_request(b), ctx)
.from_err()
.and_then(move |r| transformer.transform_response(r));

Expand Down

0 comments on commit 042ce66

Please sign in to comment.