Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): Introduce revisions for metrics extraction [INGEST-1303] #1251

Merged
merged 2 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, Upstr
use crate::envelope::{self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::http::{HttpError, Request, RequestBuilder, Response};
use crate::metrics_extraction::sessions::extract_session_metrics;
use crate::metrics_extraction::sessions::{extract_session_metrics, SessionMetricsConfig};
use crate::service::ServerError;
use crate::statsd::{RelayCounters, RelayHistograms, RelaySets, RelayTimers};
use crate::utils::{
Expand Down Expand Up @@ -368,8 +368,8 @@ struct ProcessEnvelopeState {

/// Metrics extracted from items in the envelope.
///
/// This is controlled by [`Feature::MetricsExtraction`]. Relay extracts metrics for sessions
/// and transactions.
/// Relay can extract metrics for sessions and transactions, which is controlled by
/// configuration objects in the project config.
extracted_metrics: Vec<Metric>,

/// The state of the project that this envelope belongs to.
Expand Down Expand Up @@ -603,7 +603,7 @@ impl EnvelopeProcessor {
received: DateTime<Utc>,
client: Option<&str>,
client_addr: Option<net::IpAddr>,
metrics_extraction_enabled: bool,
metrics_config: SessionMetricsConfig,
clock_drift_processor: &ClockDriftProcessor,
extracted_metrics: &mut Vec<Metric>,
) -> bool {
Expand Down Expand Up @@ -660,12 +660,17 @@ impl EnvelopeProcessor {
}
}

// Extract metrics
if metrics_extraction_enabled && !item.metrics_extracted() {
// Extract metrics if they haven't been extracted by a prior Relay
if metrics_config.is_enabled() && !item.metrics_extracted() {
extract_session_metrics(&session.attributes, &session, client, extracted_metrics);
item.set_metrics_extracted(true);
}

// Drop the session if metrics have been extracted in this or a prior Relay
if metrics_config.should_drop() && item.metrics_extracted() {
return false;
}

if changed {
let json_string = match serde_json::to_string(&session) {
Ok(json) => json,
Expand All @@ -688,7 +693,7 @@ impl EnvelopeProcessor {
received: DateTime<Utc>,
client: Option<&str>,
client_addr: Option<net::IpAddr>,
metrics_extraction_enabled: bool,
metrics_config: SessionMetricsConfig,
clock_drift_processor: &ClockDriftProcessor,
extracted_metrics: &mut Vec<Metric>,
) -> bool {
Expand Down Expand Up @@ -729,14 +734,19 @@ impl EnvelopeProcessor {
}
}

// Extract metrics
if metrics_extraction_enabled && !item.metrics_extracted() {
// Extract metrics if they haven't been extracted by a prior Relay
if metrics_config.is_enabled() && !item.metrics_extracted() {
for aggregate in &session.aggregates {
extract_session_metrics(&session.attributes, aggregate, client, extracted_metrics);
item.set_metrics_extracted(true);
}
}

// Drop the aggregate if metrics have been extracted in this or a prior Relay
if metrics_config.should_drop() && item.metrics_extracted() {
return false;
}

if changed {
let json_string = match serde_json::to_string(&session) {
Ok(json) => json,
Expand All @@ -759,8 +769,7 @@ impl EnvelopeProcessor {
fn process_sessions(&self, state: &mut ProcessEnvelopeState) {
let received = state.envelope_context.received_at;
let extracted_metrics = &mut state.extracted_metrics;
let metrics_extraction_enabled =
state.project_state.has_feature(Feature::MetricsExtraction);
let metrics_config = state.project_state.config().session_metrics;
let envelope = &mut state.envelope;
let client = envelope.meta().client().map(|x| x.to_owned());
let client_addr = envelope.meta().client_addr();
Expand All @@ -775,7 +784,7 @@ impl EnvelopeProcessor {
received,
client.as_deref(),
client_addr,
metrics_extraction_enabled,
metrics_config,
&clock_drift_processor,
extracted_metrics,
),
Expand All @@ -784,7 +793,7 @@ impl EnvelopeProcessor {
received,
client.as_deref(),
client_addr,
metrics_extraction_enabled,
metrics_config,
&clock_drift_processor,
extracted_metrics,
),
Expand Down
22 changes: 16 additions & 6 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::actors::project_cache::{
};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;
use crate::metrics_extraction::sessions::SessionMetricsConfig;
use crate::metrics_extraction::transactions::TransactionMetricsConfig;
use crate::metrics_extraction::TaggingRule;
use crate::statsd::RelayCounters;
Expand All @@ -47,16 +48,21 @@ pub enum Expiry {
Expired,
}

/// Features exposed by project config
/// Features exposed by project config.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Feature {
#[serde(rename = "organizations:metrics-extraction")]
MetricsExtraction,

/// Enables ingestion and normalization of profiles.
#[serde(rename = "organizations:profiling")]
Profiling,

/// forward compatibility
/// Unused.
///
/// This used to control the initial experimental metrics extraction for sessions and has been
/// discontinued.
#[serde(rename = "organizations:metrics-extraction")]
Deprecated1,

/// Forward compatibility.
#[serde(other)]
Unknown,
}
Expand Down Expand Up @@ -91,7 +97,10 @@ pub struct ProjectConfig {
/// Configuration for operation breakdown. Will be emitted only if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub breakdowns_v2: Option<BreakdownsConfig>,
/// Configuration in relation to extracting metrics from transaction events.
/// Configuration for extracting metrics from sessions.
#[serde(skip_serializing_if = "SessionMetricsConfig::is_disabled")]
pub session_metrics: SessionMetricsConfig,
/// Configuration for extracting metrics from transaction events.
#[serde(skip_serializing_if = "Option::is_none")]
pub transaction_metrics: Option<ErrorBoundary<TransactionMetricsConfig>>,
/// The span attributes configuration.
Expand All @@ -117,6 +126,7 @@ impl Default for ProjectConfig {
quotas: Vec::new(),
dynamic_sampling: None,
breakdowns_v2: None,
session_metrics: SessionMetricsConfig::default(),
transaction_metrics: None,
span_attributes: BTreeSet::new(),
metric_conditional_tagging: Vec::new(),
Expand Down
42 changes: 40 additions & 2 deletions relay-server/src/metrics_extraction/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,46 @@ use relay_metrics::{DurationUnit, Metric, MetricUnit, MetricValue};

use super::utils::with_tag;

/// Namespace of session metricsfor the MRI.
const METRIC_NAMESPACE: &str = "sessions";

/// Current version of metrics extraction.
const EXTRACT_VERSION: u16 = 1;

/// Configuration for metric extraction from sessions.
#[derive(Debug, Clone, Copy, Default, serde::Deserialize, serde::Serialize)]
#[serde(default, rename_all = "camelCase")]
pub struct SessionMetricsConfig {
/// The revision of the extraction algorithm.
///
/// Provided the revision is lower than or equal to the revision supported by this Relay,
/// metrics are extracted. If the revision is higher than what this Relay supports, it does not
/// extract metrics from sessions, and instead forwards them to the upstream.
///
/// Version `0` (default) disables extraction.
version: u16,

/// Drop sessions after successfully extracting metrics.
drop: bool,
}

impl SessionMetricsConfig {
/// Returns `true` if session metrics is enabled and compatible.
pub fn is_enabled(&self) -> bool {
self.version > 0 && self.version <= EXTRACT_VERSION
}

/// Returns `true` if Relay should not extract metrics from sessions.
pub fn is_disabled(&self) -> bool {
!self.is_enabled()
}

/// Returns `true` if the session should be dropped after extracting metrics.
pub fn should_drop(&self) -> bool {
self.drop
}
}

/// Convert contained nil UUIDs to None
fn nil_to_none(distinct_id: Option<&String>) -> Option<&String> {
let distinct_id = distinct_id?;
Expand All @@ -18,8 +58,6 @@ fn nil_to_none(distinct_id: Option<&String>) -> Option<&String> {
Some(distinct_id)
}

const METRIC_NAMESPACE: &str = "sessions";

pub fn extract_session_metrics<T: SessionLike>(
attributes: &SessionAttributes,
session: &T,
Expand Down
9 changes: 5 additions & 4 deletions tests/integration/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_session_metrics_non_processing(

if extract_metrics:
# enable metrics extraction for the project
extra_config = {"config": {"features": ["organizations:metrics-extraction"]}}
extra_config = {"config": {"sessionMetrics": {"version": 1}}}
else:
extra_config = {}

Expand Down Expand Up @@ -322,7 +322,7 @@ def test_metrics_extracted_only_once(
)

# enable metrics extraction for the project
extra_config = {"config": {"features": ["organizations:metrics-extraction"]}}
extra_config = {"config": {"sessionMetrics": {"version": 1}}}

project_id = 42
mini_sentry.add_full_project_config(project_id, extra=extra_config)
Expand Down Expand Up @@ -358,7 +358,7 @@ def test_session_metrics_processing(
project_id = 42

# enable metrics extraction for the project
extra_config = {"config": {"features": ["organizations:metrics-extraction"]}}
extra_config = {"config": {"sessionMetrics": {"version": 1}}}

mini_sentry.add_full_project_config(project_id, extra=extra_config)

Expand Down Expand Up @@ -466,7 +466,8 @@ def test_transaction_metrics(
config = mini_sentry.project_configs[project_id]["config"]
timestamp = datetime.now(tz=timezone.utc)

config["features"] = ["organizations:metrics-extraction"] if extract_metrics else []
if extract_metrics:
config["sessionMetrics"] = {"version": 1}
config["breakdownsV2"] = {
"span_ops": {"type": "spanOperations", "matches": ["react.mount"]}
}
Expand Down