Skip to content

Commit

Permalink
feat: add foyer metrics to our metrics pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
sprutton1 committed Dec 9, 2024
1 parent ffa4851 commit f90788f
Show file tree
Hide file tree
Showing 8 changed files with 719 additions and 628 deletions.
305 changes: 150 additions & 155 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fastrace = "0.7.3"
flate2 = "1.0.28"
futures = "0.3.30"
futures-lite = "2.3.0"
foyer = { version = "0.13.1", features = ["tracing"] }
foyer = { version = "0.13.1", features = ["tracing", "opentelemetry_0_26"] }
fs4 = "0.11.0"
glob = "0.3.1"
hex = "0.4.3"
Expand All @@ -143,15 +143,15 @@ manyhow = { version = "0.11.4", features = ["darling"] }
mime_guess = { version = "=2.0.4" } # TODO(fnichol): 2.0.5 sets an env var in build.rs which needs to be tracked, required by reqwest
miniz_oxide = { version = "0.7.2", features = ["simd"] }
names = { version = "0.14.0", default-features = false }
nix = { version = "0.27.1", features = ["fs", "mount", "process", "signal", "user"] }
nix = { version = "0.26.0", features = ["fs", "mount", "process", "signal", "user"] }
nkeys = "0.4.0"
num_cpus = "1.16.0"
once_cell = "1.19.0"
open = "5.1.2"
opentelemetry = { version = "0.22.0", features = ["trace"] }
opentelemetry-otlp = { version = "0.15.0", features = ["metrics", "trace"] }
opentelemetry = { version = "0.26.0", features = ["trace"] }
opentelemetry-otlp = { version = "0.26.0", features = ["metrics", "trace"] }
opentelemetry-semantic-conventions = "0.14.0"
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] }
ordered-float = { version = "4.4.0", features = ["serde"] }
ouroboros = "0.18.3"
parking_lot = "0.12.3"
Expand Down Expand Up @@ -212,7 +212,7 @@ toml = { version = "0.8.12" }
tower = { version = "0.4.13", features = ["full"] }
tower-http = { version = "0.4.4", features = ["compression-br", "compression-deflate", "compression-gzip", "cors", "trace"] } # todo: pinning back to 0.4.4, upgrade this alongside hyper/http/axum/tokio-tungstenite
tracing = { version = "0.1.40" }
tracing-opentelemetry = "0.23.0"
tracing-opentelemetry = "0.27.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json", "std"] }
tracing-tunnel = "0.1.0"
trybuild = { version = "1.0.99", features = ["diff"] }
Expand Down
3 changes: 3 additions & 0 deletions lib/si-layer-cache/src/hybrid_cache.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use foyer::opentelemetry_0_26::OpenTelemetryMetricsRegistry;
use foyer::{
DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions,
RateLimitPicker, RecoverMode,
};
use std::cmp::max;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};
use telemetry::opentelemetry::global;
use telemetry::tracing::{error, info};
use tokio::fs;

Expand Down Expand Up @@ -102,6 +104,7 @@ where

let cache: HybridCache<Arc<str>, MaybeDeserialized<V>> = HybridCacheBuilder::new()
.with_name(cache_name)
.with_metrics_registry(OpenTelemetryMetricsRegistry::new(global::meter(cache_name)))
.memory(memory_cache_capacity_bytes)
.with_weighter(
|_key: &Arc<str>, value: &MaybeDeserialized<V>| match value {
Expand Down
32 changes: 19 additions & 13 deletions lib/telemetry-application-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
)]
// TODO(fnichol): document all, then drop `missing_errors_doc`
#![allow(clippy::missing_errors_doc)]
use telemetry::opentelemetry::{
metrics::MetricsError,
trace::{TraceError, TracerProvider},
};
use tracing_subscriber::{filter::FilterExt, layer::SubscriberExt, util::SubscriberInitExt};

use std::{
Expand All @@ -26,14 +30,17 @@ use derive_builder::Builder;
use opentelemetry_sdk::{
metrics::SdkMeterProvider,
propagation::TraceContextPropagator,
resource::{EnvResourceDetector, OsResourceDetector, ProcessResourceDetector},
resource::EnvResourceDetector,
runtime,
trace::{self, Tracer},
trace::{self, Config, Tracer},
Resource,
};
use opentelemetry_semantic_conventions::resource;
use telemetry::{
opentelemetry::{global, metrics::MetricsError, trace::TraceError, KeyValue},
opentelemetry::{
global::{self},
KeyValue,
},
prelude::*,
tracing::Subscriber,
TelemetryCommand, TracingLevel, Verbosity,
Expand Down Expand Up @@ -379,7 +386,9 @@ fn tracing_subscriber(
};

let (metrics_layer, metrics_filter_reload) = {
let layer = MetricsLayer::new(otel_metrics(config)?);
let metrics_provider = otel_metrics(config)?;
global::set_meter_provider(metrics_provider.clone());
let layer = MetricsLayer::new(metrics_provider);
let env_filter = EnvFilter::try_new(directives.as_str())?;
let (filter, handle) = reload::Layer::new(env_filter);
let layer = layer.with_filter(filter.and(IncludeMetricsFilter));
Expand All @@ -404,17 +413,18 @@ fn tracing_subscriber(
Ok((registry, handles))
}

fn otel_tracer(config: &TelemetryConfig) -> result::Result<Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
fn otel_tracer(config: &TelemetryConfig) -> Result<Tracer> {
Ok(opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.with_trace_config(trace::config().with_resource(telemetry_resource(config)))
.with_trace_config(Config::default().with_resource(telemetry_resource(config)))
.with_batch_config(
trace::BatchConfigBuilder::default()
.with_max_queue_size(4096)
.build(),
)
.install_batch(runtime::Tokio)
.install_batch(runtime::Tokio)?
.tracer(config.service_name))
}

fn otel_metrics(config: &TelemetryConfig) -> result::Result<SdkMeterProvider, MetricsError> {
Expand All @@ -434,11 +444,7 @@ fn telemetry_resource(config: &TelemetryConfig) -> Resource {
// TODO(fnichol): create opentelemetry-resource-detector-aws for ec2 & eks detection
Resource::from_detectors(
Duration::from_secs(3),
vec![
Box::new(EnvResourceDetector::new()),
Box::new(OsResourceDetector),
Box::new(ProcessResourceDetector),
],
vec![Box::new(EnvResourceDetector::new())],
)
.merge(&Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, config.service_name.to_string()),
Expand Down
Loading

0 comments on commit f90788f

Please sign in to comment.