Skip to content

Commit

Permalink
more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Nov 26, 2024
1 parent 73cea7d commit 006cc1e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl PolymorphicEnvelopeBuffer {
}

/// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`].
fn partition_tag(&self) -> &str {
pub fn partition_tag(&self) -> &str {
match self {
PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
Expand Down
32 changes: 26 additions & 6 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,18 @@ impl EnvelopeBufferService {
buffer: &PolymorphicEnvelopeBuffer,
dequeue: bool,
) -> Option<Permit<legacy::DequeuedEnvelope>> {
relay_log::trace!("EnvelopeBufferService {partition_tag}: ready_to_pop?");
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checking",
partition_id = partition_tag
);

self.system_ready(buffer, dequeue).await;
relay_log::trace!(
"EnvelopeBufferService {partition_tag}: system is ready. Sleeping for {:?}",
self.sleep
);

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
Expand All @@ -287,8 +292,14 @@ impl EnvelopeBufferService {
partition_id = partition_tag
);

relay_log::trace!("EnvelopeBufferService {partition_tag}: waiting for permit");
let permit = self.services.envelopes_tx.reserve().await.ok();

relay_log::trace!(
"EnvelopeBufferService {partition_tag}: got permit: {}",
permit.is_some()
);

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checked",
Expand Down Expand Up @@ -332,6 +343,7 @@ impl EnvelopeBufferService {
) -> Result<Duration, EnvelopeBufferError> {
let sleep = match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService {partition_tag}: empty");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty",
Expand All @@ -344,6 +356,7 @@ impl EnvelopeBufferService {
| Peek::NotReady {
last_received_at, ..
} if is_expired(last_received_at, config) => {
relay_log::trace!("EnvelopeBufferService {partition_tag}: expired");
let envelope = buffer
.pop()
.await?
Expand All @@ -354,7 +367,7 @@ impl EnvelopeBufferService {
Duration::ZERO // try next pop immediately
}
Peek::Ready { .. } => {
relay_log::trace!("EnvelopeBufferService: popping envelope");
relay_log::trace!("EnvelopeBufferService {partition_tag}: popping envelope");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "ready",
Expand All @@ -373,7 +386,9 @@ impl EnvelopeBufferService {
next_project_fetch,
last_received_at: _,
} => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_log::trace!(
"EnvelopeBufferService {partition_tag}: project(s) of envelope not ready"
);
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready",
Expand All @@ -384,7 +399,9 @@ impl EnvelopeBufferService {
// peek of this not ready project key pair and the current peek. This is done to
// avoid flooding the project cache with `UpdateProject` messages.
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
relay_log::trace!(
"EnvelopeBufferService {partition_tag}: requesting project(s) update"
);

let ProjectKeyPair {
own_key,
Expand Down Expand Up @@ -430,12 +447,12 @@ impl EnvelopeBufferService {
// projects was already triggered (see XXX).
// For better separation of concerns, this prefetch should be triggered from here
// once buffer V1 has been removed.
relay_log::trace!("EnvelopeBufferService: received push message");
relay_log::trace!("EnvelopeBufferService {partition_tag}: received push message");
Self::push(buffer, envelope).await;
}
EnvelopeBuffer::NotReady(project_key, envelope) => {
relay_log::trace!(
"EnvelopeBufferService: received project not ready message for project key {}",
"EnvelopeBufferService {partition_tag}: received project not ready message for project key {}",
&project_key
);
relay_statsd::metric!(
Expand All @@ -452,7 +469,10 @@ impl EnvelopeBufferService {
async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool {
// We gracefully shut down only if the shutdown has a timeout.
if let Some(shutdown_timeout) = message.timeout {
relay_log::trace!("EnvelopeBufferService: shutting down gracefully");
relay_log::trace!(
"EnvelopeBufferService {}: shutting down gracefully",
buffer.partition_tag()
);

let shutdown_result = timeout(shutdown_timeout, buffer.shutdown()).await;
match shutdown_result {
Expand Down

0 comments on commit 006cc1e

Please sign in to comment.