Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(relay): Use DateTime instead of Instant for start_time #4184

Merged
merged 12 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions relay-common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ pub fn instant_to_system_time(instant: Instant) -> SystemTime {
SystemTime::now() - instant.elapsed()
}

/// Converts an `Instant` into a `DateTime`.
pub fn instant_to_date_time(instant: Instant) -> chrono::DateTime<chrono::Utc> {
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
instant_to_system_time(instant).into()
}

/// Returns the number of milliseconds contained by this `Duration` as `f64`.
///
/// The returned value does include the fractional (nanosecond) part of the duration.
Expand Down
5 changes: 3 additions & 2 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use bytes::Bytes;
use chrono::Utc;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use tempfile::TempDir;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -66,7 +67,7 @@ fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box<E
));

let mut envelope = Envelope::parse_bytes(bytes).unwrap();
envelope.set_start_time(Instant::now());
envelope.set_received_at(Utc::now());
envelope
}

Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/endpoints/batch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};

use crate::extractors::{SignedBytes, StartTime};
use crate::extractors::{ReceivedAt, SignedBytes};
use crate::service::ServiceState;
use crate::services::processor::ProcessBatchedMetrics;
use crate::services::projects::cache::BucketSource;
Expand All @@ -12,7 +12,7 @@ struct SendMetricsResponse {}

pub async fn handle(
state: ServiceState,
start_time: StartTime,
ReceivedAt(received_at): ReceivedAt,
body: SignedBytes,
) -> impl IntoResponse {
if !body.relay.internal {
Expand All @@ -22,7 +22,7 @@ pub async fn handle(
state.processor().send(ProcessBatchedMetrics {
payload: body.body,
source: BucketSource::Internal,
start_time: start_time.into_inner(),
received_at,
sent_at: None,
});

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ fn queue_envelope(
relay_log::trace!("sending metrics into processing queue");
state.project_cache().send(ProcessMetrics {
data: MetricData::Raw(metric_items.into_vec()),
start_time: envelope.meta().start_time().into(),
received_at: envelope.received_at(),
sent_at: envelope.sent_at(),
project_key: envelope.meta().public_key(),
source: envelope.meta().into(),
Expand Down
21 changes: 14 additions & 7 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::collections::BTreeMap;
use std::fmt;
use std::io::{self, Write};
use std::ops::AddAssign;
use std::time::Instant;
use std::time::Duration;
use uuid::Uuid;

use bytes::Bytes;
Expand Down Expand Up @@ -1219,10 +1219,17 @@ impl Envelope {
}

/// Returns the time at which the envelope was received at this Relay.
///
/// This is the date time equivalent to [`start_time`](Self::start_time).
pub fn received_at(&self) -> DateTime<Utc> {
relay_common::time::instant_to_date_time(self.meta().start_time())
self.meta().received_at()
}

/// Returns the time elapsed in seconds since the envelope was received by this Relay.
///
/// In case the elapsed time is negative, it is assumed that no time elapsed.
pub fn age(&self) -> Duration {
(Utc::now() - self.received_at())
.to_std()
.unwrap_or(Duration::ZERO)
}

/// Sets the event id on the envelope.
Expand All @@ -1235,9 +1242,9 @@ impl Envelope {
self.headers.sent_at = Some(sent_at);
}

/// Sets the start time to the provided `Instant`.
pub fn set_start_time(&mut self, start_time: Instant) {
self.headers.meta.set_start_time(start_time)
/// Sets the received at to the provided `DateTime`.
pub fn set_received_at(&mut self, start_time: DateTime<Utc>) {
self.headers.meta.set_received_at(start_time)
}

/// Sets the data retention in days for items in this envelope.
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/extractors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
mod content_type;
mod forwarded_for;
mod mime;
mod received_at;
mod remote;
mod request_meta;
mod signed_json;
mod start_time;

pub use self::content_type::*;
pub use self::forwarded_for::*;
pub use self::mime::*;
pub use self::received_at::*;
pub use self::remote::*;
pub use self::request_meta::*;
pub use self::signed_json::*;
pub use self::start_time::*;
32 changes: 32 additions & 0 deletions relay-server/src/extractors/received_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::convert::Infallible;

use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::Extension;
use chrono::{DateTime, Utc};

/// The time at which the request started.
#[derive(Clone, Copy, Debug)]
pub struct ReceivedAt(pub DateTime<Utc>);

impl ReceivedAt {
pub fn now() -> Self {
Self(Utc::now())
}
}

#[axum::async_trait]
impl<S> FromRequestParts<S> for ReceivedAt
where
S: Send + Sync,
{
type Rejection = Infallible;

async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let Extension(start_time) = Extension::from_request_parts(parts, state)
.await
.expect("ReceivedAt middleware is not configured");

Ok(start_time)
}
}
34 changes: 17 additions & 17 deletions relay-server/src/extractors/request_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::convert::Infallible;
use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::time::Instant;

use axum::extract::rejection::PathRejection;
use axum::extract::{ConnectInfo, FromRequestParts, Path};
Expand All @@ -11,6 +10,7 @@ use axum::http::request::Parts;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::RequestPartsExt;
use chrono::{DateTime, Utc};
use data_encoding::BASE64;
use relay_auth::RelayId;
use relay_base_schema::project::{ParseProjectKeyError, ProjectId, ProjectKey};
Expand All @@ -21,7 +21,7 @@ use relay_quotas::Scoping;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::extractors::{ForwardedFor, StartTime};
use crate::extractors::{ForwardedFor, ReceivedAt};
use crate::service::ServiceState;
use crate::statsd::RelayCounters;
use crate::utils::ApiErrorResponse;
Expand Down Expand Up @@ -227,8 +227,8 @@ pub struct RequestMeta<D = PartialDsn> {
/// The time at which the request started.
///
/// NOTE: This is internal-only and not exposed to Envelope headers.
#[serde(skip, default = "Instant::now")]
start_time: Instant,
#[serde(skip, default = "Utc::now")]
received_at: DateTime<Utc>,

/// Whether the request is coming from an statically configured internal Relay.
///
Expand Down Expand Up @@ -303,13 +303,13 @@ impl<D> RequestMeta<D> {
}

/// The time at which the request started.
pub fn start_time(&self) -> Instant {
self.start_time
pub fn received_at(&self) -> DateTime<Utc> {
self.received_at
}

/// Sets the start time for this [`RequestMeta`] on the current envelope.
pub fn set_start_time(&mut self, start_time: Instant) {
self.start_time = start_time
/// Sets the received at for this [`RequestMeta`] on the current envelope.
pub fn set_received_at(&mut self, received_at: DateTime<Utc>) {
self.received_at = received_at
}

/// Whether the request is coming from a statically configured internal Relay.
Expand Down Expand Up @@ -340,7 +340,7 @@ impl RequestMeta {
forwarded_for: "".to_string(),
user_agent: Some(crate::constants::SERVER.to_owned()),
no_cache: false,
start_time: Instant::now(),
received_at: Utc::now(),
client_hints: ClientHints::default(),
from_internal_relay: false,
}
Expand Down Expand Up @@ -487,6 +487,8 @@ impl FromRequestParts<ServiceState> for PartialMeta {
.map_or(false, |ri| ri.internal);
}

let ReceivedAt(received_at) = ReceivedAt::from_request_parts(parts, state).await?;

Ok(RequestMeta {
dsn: None,
version: default_version(),
Expand All @@ -502,9 +504,7 @@ impl FromRequestParts<ServiceState> for PartialMeta {
.into_inner(),
user_agent: ua.user_agent,
no_cache: false,
start_time: StartTime::from_request_parts(parts, state)
.await?
.into_inner(),
received_at,
client_hints: ua.client_hints,
from_internal_relay,
})
Expand Down Expand Up @@ -667,7 +667,7 @@ impl FromRequestParts<ServiceState> for RequestMeta {
forwarded_for: partial_meta.forwarded_for,
user_agent: partial_meta.user_agent,
no_cache: key_flags.contains(&"no-cache"),
start_time: partial_meta.start_time,
received_at: partial_meta.received_at,
client_hints: partial_meta.client_hints,
from_internal_relay: partial_meta.from_internal_relay,
})
Expand All @@ -690,7 +690,7 @@ mod tests {
forwarded_for: String::new(),
user_agent: Some("sentry/agent".to_string()),
no_cache: false,
start_time: Instant::now(),
received_at: Utc::now(),
client_hints: ClientHints::default(),
from_internal_relay: false,
}
Expand Down Expand Up @@ -727,7 +727,7 @@ mod tests {
forwarded_for: "8.8.8.8".to_string(),
user_agent: Some("0x8000".to_string()),
no_cache: false,
start_time: Instant::now(),
received_at: Utc::now(),
client_hints: ClientHints {
sec_ch_ua_platform: Some("macOS".to_owned()),
sec_ch_ua_platform_version: Some("13.1.0".to_owned()),
Expand All @@ -739,7 +739,7 @@ mod tests {
},
from_internal_relay: false,
};
deserialized.start_time = reqmeta.start_time;
deserialized.received_at = reqmeta.received_at;
assert_eq!(deserialized, reqmeta);
}
}
76 changes: 0 additions & 76 deletions relay-server/src/extractors/start_time.rs

This file was deleted.

Loading
Loading