Skip to content

Commit

Permalink
ref(relay): Use DateTime instead of Instant for start_time (#4184)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Oct 30, 2024
1 parent 182a93e commit 0adf112
Show file tree
Hide file tree
Showing 25 changed files with 235 additions and 259 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

- Add a metric that counts span volume in the root project for dynamic sampling (`c:spans/count_per_root_project@none`). ([#4134](https://github.com/getsentry/relay/pull/4134))
- Add a tag `target_project_id` to both root project metrics for dynamic sampling (`c:transactions/count_per_root_project@none` and `c:spans/count_per_root_project@none`) which shows the flow trace traffic from root to target projects. ([#4170](https://github.com/getsentry/relay/pull/4170))
- Use `DateTime<Utc>` instead of `Instant` for tracking the received time of the `Envelope`. ([#4184](https://github.com/getsentry/relay/pull/4184))

## 24.10.0

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions relay-common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ pub fn instant_to_system_time(instant: Instant) -> SystemTime {
SystemTime::now() - instant.elapsed()
}

/// Converts an `Instant` into a `DateTime`.
pub fn instant_to_date_time(instant: Instant) -> chrono::DateTime<chrono::Utc> {
instant_to_system_time(instant).into()
}

/// Returns the number of milliseconds contained by this `Duration` as `f64`.
///
/// The returned value does include the fractional (nanosecond) part of the duration.
Expand Down
5 changes: 3 additions & 2 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use bytes::Bytes;
use chrono::Utc;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use tempfile::TempDir;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -66,7 +67,7 @@ fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box<E
));

let mut envelope = Envelope::parse_bytes(bytes).unwrap();
envelope.set_start_time(Instant::now());
envelope.set_received_at(Utc::now());
envelope
}

Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/endpoints/batch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};

use crate::extractors::{SignedBytes, StartTime};
use crate::extractors::{ReceivedAt, SignedBytes};
use crate::service::ServiceState;
use crate::services::processor::ProcessBatchedMetrics;
use crate::services::projects::cache::BucketSource;
Expand All @@ -12,7 +12,7 @@ struct SendMetricsResponse {}

pub async fn handle(
state: ServiceState,
start_time: StartTime,
ReceivedAt(received_at): ReceivedAt,
body: SignedBytes,
) -> impl IntoResponse {
if !body.relay.internal {
Expand All @@ -22,7 +22,7 @@ pub async fn handle(
state.processor().send(ProcessBatchedMetrics {
payload: body.body,
source: BucketSource::Internal,
start_time: start_time.into_inner(),
received_at,
sent_at: None,
});

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ fn queue_envelope(
relay_log::trace!("sending metrics into processing queue");
state.project_cache().send(ProcessMetrics {
data: MetricData::Raw(metric_items.into_vec()),
start_time: envelope.meta().start_time().into(),
received_at: envelope.received_at(),
sent_at: envelope.sent_at(),
project_key: envelope.meta().public_key(),
source: envelope.meta().into(),
Expand Down
21 changes: 14 additions & 7 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::collections::BTreeMap;
use std::fmt;
use std::io::{self, Write};
use std::ops::AddAssign;
use std::time::Instant;
use std::time::Duration;
use uuid::Uuid;

use bytes::Bytes;
Expand Down Expand Up @@ -1219,10 +1219,17 @@ impl Envelope {
}

/// Returns the time at which the envelope was received at this Relay.
///
/// This is the date time equivalent to [`start_time`](Self::start_time).
pub fn received_at(&self) -> DateTime<Utc> {
relay_common::time::instant_to_date_time(self.meta().start_time())
self.meta().received_at()
}

/// Returns the time elapsed in seconds since the envelope was received by this Relay.
///
/// In case the elapsed time is negative, it is assumed that no time elapsed.
pub fn age(&self) -> Duration {
(Utc::now() - self.received_at())
.to_std()
.unwrap_or(Duration::ZERO)
}

/// Sets the event id on the envelope.
Expand All @@ -1235,9 +1242,9 @@ impl Envelope {
self.headers.sent_at = Some(sent_at);
}

/// Sets the start time to the provided `Instant`.
pub fn set_start_time(&mut self, start_time: Instant) {
self.headers.meta.set_start_time(start_time)
/// Sets the received at to the provided `DateTime`.
pub fn set_received_at(&mut self, start_time: DateTime<Utc>) {
self.headers.meta.set_received_at(start_time)
}

/// Sets the data retention in days for items in this envelope.
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/extractors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
mod content_type;
mod forwarded_for;
mod mime;
mod received_at;
mod remote;
mod request_meta;
mod signed_json;
mod start_time;

pub use self::content_type::*;
pub use self::forwarded_for::*;
pub use self::mime::*;
pub use self::received_at::*;
pub use self::remote::*;
pub use self::request_meta::*;
pub use self::signed_json::*;
pub use self::start_time::*;
32 changes: 32 additions & 0 deletions relay-server/src/extractors/received_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::convert::Infallible;

use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::Extension;
use chrono::{DateTime, Utc};

/// The time at which the request started.
#[derive(Clone, Copy, Debug)]
pub struct ReceivedAt(pub DateTime<Utc>);

impl ReceivedAt {
pub fn now() -> Self {
Self(Utc::now())
}
}

#[axum::async_trait]
impl<S> FromRequestParts<S> for ReceivedAt
where
S: Send + Sync,
{
type Rejection = Infallible;

async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let Extension(start_time) = Extension::from_request_parts(parts, state)
.await
.expect("ReceivedAt middleware is not configured");

Ok(start_time)
}
}
34 changes: 17 additions & 17 deletions relay-server/src/extractors/request_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::convert::Infallible;
use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::time::Instant;

use axum::extract::rejection::PathRejection;
use axum::extract::{ConnectInfo, FromRequestParts, Path};
Expand All @@ -11,6 +10,7 @@ use axum::http::request::Parts;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::RequestPartsExt;
use chrono::{DateTime, Utc};
use data_encoding::BASE64;
use relay_auth::RelayId;
use relay_base_schema::project::{ParseProjectKeyError, ProjectId, ProjectKey};
Expand All @@ -21,7 +21,7 @@ use relay_quotas::Scoping;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::extractors::{ForwardedFor, StartTime};
use crate::extractors::{ForwardedFor, ReceivedAt};
use crate::service::ServiceState;
use crate::statsd::RelayCounters;
use crate::utils::ApiErrorResponse;
Expand Down Expand Up @@ -227,8 +227,8 @@ pub struct RequestMeta<D = PartialDsn> {
/// The time at which the request started.
///
/// NOTE: This is internal-only and not exposed to Envelope headers.
#[serde(skip, default = "Instant::now")]
start_time: Instant,
#[serde(skip, default = "Utc::now")]
received_at: DateTime<Utc>,

/// Whether the request is coming from an statically configured internal Relay.
///
Expand Down Expand Up @@ -303,13 +303,13 @@ impl<D> RequestMeta<D> {
}

/// The time at which the request started.
pub fn start_time(&self) -> Instant {
self.start_time
pub fn received_at(&self) -> DateTime<Utc> {
self.received_at
}

/// Sets the start time for this [`RequestMeta`] on the current envelope.
pub fn set_start_time(&mut self, start_time: Instant) {
self.start_time = start_time
/// Sets the received at for this [`RequestMeta`] on the current envelope.
pub fn set_received_at(&mut self, received_at: DateTime<Utc>) {
self.received_at = received_at
}

/// Whether the request is coming from a statically configured internal Relay.
Expand Down Expand Up @@ -340,7 +340,7 @@ impl RequestMeta {
forwarded_for: "".to_string(),
user_agent: Some(crate::constants::SERVER.to_owned()),
no_cache: false,
start_time: Instant::now(),
received_at: Utc::now(),
client_hints: ClientHints::default(),
from_internal_relay: false,
}
Expand Down Expand Up @@ -487,6 +487,8 @@ impl FromRequestParts<ServiceState> for PartialMeta {
.map_or(false, |ri| ri.internal);
}

let ReceivedAt(received_at) = ReceivedAt::from_request_parts(parts, state).await?;

Ok(RequestMeta {
dsn: None,
version: default_version(),
Expand All @@ -502,9 +504,7 @@ impl FromRequestParts<ServiceState> for PartialMeta {
.into_inner(),
user_agent: ua.user_agent,
no_cache: false,
start_time: StartTime::from_request_parts(parts, state)
.await?
.into_inner(),
received_at,
client_hints: ua.client_hints,
from_internal_relay,
})
Expand Down Expand Up @@ -667,7 +667,7 @@ impl FromRequestParts<ServiceState> for RequestMeta {
forwarded_for: partial_meta.forwarded_for,
user_agent: partial_meta.user_agent,
no_cache: key_flags.contains(&"no-cache"),
start_time: partial_meta.start_time,
received_at: partial_meta.received_at,
client_hints: partial_meta.client_hints,
from_internal_relay: partial_meta.from_internal_relay,
})
Expand All @@ -690,7 +690,7 @@ mod tests {
forwarded_for: String::new(),
user_agent: Some("sentry/agent".to_string()),
no_cache: false,
start_time: Instant::now(),
received_at: Utc::now(),
client_hints: ClientHints::default(),
from_internal_relay: false,
}
Expand Down Expand Up @@ -727,7 +727,7 @@ mod tests {
forwarded_for: "8.8.8.8".to_string(),
user_agent: Some("0x8000".to_string()),
no_cache: false,
start_time: Instant::now(),
received_at: Utc::now(),
client_hints: ClientHints {
sec_ch_ua_platform: Some("macOS".to_owned()),
sec_ch_ua_platform_version: Some("13.1.0".to_owned()),
Expand All @@ -739,7 +739,7 @@ mod tests {
},
from_internal_relay: false,
};
deserialized.start_time = reqmeta.start_time;
deserialized.received_at = reqmeta.received_at;
assert_eq!(deserialized, reqmeta);
}
}
76 changes: 0 additions & 76 deletions relay-server/src/extractors/start_time.rs

This file was deleted.

Loading

0 comments on commit 0adf112

Please sign in to comment.