Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stats): Add quantity to TrackOutcome and new attachment outcomes #942

Merged
merged 18 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemTyp
use crate::http::{HttpError, RequestBuilder};
use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers};
use crate::service::ServerError;
use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter, FutureExt};
use crate::utils::{self, ChunkedFormDataAggregator, EnvelopeSummary, FormDataIter, FutureExt};

#[cfg(feature = "processing")]
use {
Expand Down Expand Up @@ -1483,12 +1483,13 @@ impl Handler<HandleEnvelope> for EventManager {

let scoping = Rc::new(RefCell::new(envelope.meta().get_partial_scoping()));
let is_received = Rc::new(AtomicBool::from(false));
let envelope_summary = Rc::new(RefCell::new(EnvelopeSummary::empty()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to get rid of the event_category variable as a result of this, and use envelope_summary in the or_else error handler.


let future = project
.send(CheckEnvelope::fetched(envelope))
.map_err(ProcessingError::ScheduleFailed)
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
.and_then(clone!(scoping, |response| {
.and_then(clone!(scoping, envelope_summary, |response| {
// Use the project id from the loaded project state to account for redirects.
let project_id = response.scoping.project_id.value();
metric!(set(RelaySets::UniqueProjects) = project_id as i64);
Expand All @@ -1497,7 +1498,10 @@ impl Handler<HandleEnvelope> for EventManager {

let checked = response.result.map_err(ProcessingError::EventRejected)?;
match checked.envelope {
Some(envelope) => Ok(envelope),
Some(envelope) => {
envelope_summary.replace(EnvelopeSummary::compute(&envelope));
Ok(envelope)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the latest place where we need to update the envelope summary?

(I love how it can be written like this in the Rust syntax, btw.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the case in endpoints/common.rs, you'll have to make another update after calling ProcessEnvelope below once rate limiting can emit outcomes. For now, a comment will suffice.

Technically, that would also apply to the replace you added here. However, it is fine to do it in this instance. Prior to invoking the HandleEnvelope handler, we split envelopes into event-related and event-unrelated items.

None => Err(ProcessingError::RateLimited(checked.rate_limits)),
}
}))
Expand Down Expand Up @@ -1696,11 +1700,25 @@ impl Handler<HandleEnvelope> for EventManager {
outcome_producer.do_send(TrackOutcome {
timestamp: Instant::now(),
scoping: *scoping.borrow(),
outcome,
outcome: outcome.clone(),
event_id,
remote_addr,
category,
})
quantity: 1,
});

let envelope_summary = envelope_summary.borrow();
if envelope_summary.attachment_quantity > 0 {
JoshFerge marked this conversation as resolved.
Show resolved Hide resolved
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
outcome_producer.do_send(TrackOutcome {
timestamp: start_time,
scoping: *scoping.borrow(),
outcome,
event_id,
remote_addr,
category: DataCategory::Attachment,
quantity: envelope_summary.attachment_quantity,
});
}
}
})
.then(move |x, slf, _| {
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub struct TrackOutcome {
pub remote_addr: Option<IpAddr>,
/// The event's data category.
pub category: DataCategory,
/// The number of events or total attachment size in bytes.
pub quantity: usize,
JoshFerge marked this conversation as resolved.
Show resolved Hide resolved
}

impl Message for TrackOutcome {
Expand Down Expand Up @@ -313,6 +315,9 @@ pub struct TrackRawOutcome {
/// The event's data category.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub category: Option<u8>,
/// The number of events or total attachment size in bytes.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quantity: Option<usize>,
}

impl TrackRawOutcome {
Expand Down Expand Up @@ -348,6 +353,7 @@ impl TrackRawOutcome {
remote_addr: msg.remote_addr.map(|addr| addr.to_string()),
source,
category: msg.category.value(),
quantity: Some(msg.quantity),
}
}
}
Expand Down
34 changes: 25 additions & 9 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use failure::Fail;
use futures::prelude::*;
use serde::Deserialize;

use relay_common::{clone, metric, tryf};
use relay_common::{clone, metric, tryf, DataCategory};
use relay_config::Config;
use relay_general::protocol::{EventId, EventType};
use relay_log::LogError;
Expand All @@ -26,7 +26,7 @@ use crate::envelope::{AttachmentType, Envelope, EnvelopeError, ItemType, Items};
use crate::extractors::RequestMeta;
use crate::metrics::RelayCounters;
use crate::service::{ServiceApp, ServiceState};
use crate::utils::{self, ApiErrorResponse, FormDataIter, MultipartError};
use crate::utils::{self, ApiErrorResponse, EnvelopeSummary, FormDataIter, MultipartError};

#[derive(Fail, Debug)]
pub enum BadStoreRequest {
Expand Down Expand Up @@ -410,19 +410,19 @@ where

let scoping = Rc::new(RefCell::new(meta.get_partial_scoping()));
let event_id = Rc::new(RefCell::new(None));
let event_category = Rc::new(RefCell::new(None));
let envelope_summary = Rc::new(RefCell::new(EnvelopeSummary::empty()));
let config = request.state().config();
let processing_enabled = config.processing_enabled();

let future = project_manager
.send(GetProject { public_key })
.map_err(BadStoreRequest::ScheduleFailed)
.and_then(clone!(event_id, event_category, scoping, |project| {
.and_then(clone!(event_id, envelope_summary, scoping, |project| {
extract_envelope(&request, meta)
.into_future()
.and_then(clone!(event_id, event_category, |envelope| {
.and_then(clone!(event_id, envelope_summary, |envelope| {
event_id.replace(envelope.event_id());
event_category.replace(envelope.get_event_category());
envelope_summary.replace(EnvelopeSummary::compute(&envelope));

if envelope.is_empty() {
Err(BadStoreRequest::EmptyEnvelope)
Expand All @@ -446,6 +446,8 @@ where
Some(envelope) => envelope,
None => return Err(BadStoreRequest::RateLimited(checked.rate_limits)),
};
// TODO: Ensure that outcomes are emitted correctly for rate-limited attachments
// so that it is safe to update envelope_summary here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, we will have to update the envelope summary here. We chose not to do this just yet, until the rate limiting code inside CheckEnvelope can emit its own outcomes.


if check_envelope_size_limits(&config, &envelope) {
Ok((envelope, checked.rate_limits))
Expand Down Expand Up @@ -530,15 +532,29 @@ where
.or_else(move |error: BadStoreRequest| {
metric!(counter(RelayCounters::EnvelopeRejected) += 1);

if let Some(category) = *event_category.borrow() {
if let Some(outcome) = error.to_outcome() {
let envelope_summary = envelope_summary.borrow();
if let Some(outcome) = error.to_outcome() {
if let Some(category) = envelope_summary.event_category {
outcome_producer.do_send(TrackOutcome {
timestamp: start_time,
scoping: *scoping.borrow(),
outcome,
outcome: outcome.clone(),
event_id: *event_id.borrow(),
remote_addr,
category,
quantity: 1,
});
}

if envelope_summary.attachment_quantity > 0 {
outcome_producer.do_send(TrackOutcome {
timestamp: start_time,
scoping: *scoping.borrow(),
outcome,
JoshFerge marked this conversation as resolved.
Show resolved Hide resolved
event_id: *event_id.borrow(),
remote_addr,
category: DataCategory::Attachment,
quantity: envelope_summary.attachment_quantity,
});
}
}
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def test_outcomes_non_processing(relay, mini_sentry, event_type):
"event_id": event_id,
"remote_addr": "127.0.0.1",
"category": 2 if event_type == "transaction" else 1,
"quantity": 1,
}
assert outcome == expected_outcome

Expand Down Expand Up @@ -303,6 +304,7 @@ def test_outcome_forwarding(
"event_id": event_id,
"remote_addr": "127.0.0.1",
"category": 2 if event_type == "transaction" else 1,
"quantity": 1,
}
outcome.pop("timestamp")

Expand Down Expand Up @@ -377,6 +379,7 @@ def test_outcomes_forwarding_rate_limited(
"event_id": event_id,
"source": "processing-layer",
"category": 1,
"quantity": 1,
}
assert outcome == expected_outcome

Expand Down