diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index c324dd0ead..f6994aa03c 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -10,6 +10,7 @@ - Add `grpcio` metrics exporter (#1202) - Allow specifying OTLP HTTP headers from env variable (#1290) - Support custom channels in topic exporters [#1335](https://github.com/open-telemetry/opentelemetry-rust/pull/1335) +- Allow specifying OTLP Tonic metadata from env variable (#1377) ### Changed diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index e0c5e9cdff..d26d1302ca 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -10,7 +10,7 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; -use super::default_headers; +use super::{default_headers, parse_header_string}; #[cfg(feature = "metrics")] mod metrics; @@ -316,46 +316,18 @@ fn resolve_endpoint( #[allow(clippy::mutable_key_type)] // http headers are not mutated fn add_header_from_string(input: &str, headers: &mut HashMap) { - for pair in input.split_terminator(',') { - if pair.trim().is_empty() { - continue; - } - if let Some((k, v)) = pair.trim().split_once('=') { - if !k.trim().is_empty() && !v.trim().is_empty() { - if let (Ok(key), Ok(value)) = ( - HeaderName::from_str(k.trim()), - HeaderValue::from_str(v.trim()), - ) { - headers.insert(key, value); - } - } - } - } + headers.extend(parse_header_string(input).filter_map(|(key, value)| { + Some(( + HeaderName::from_str(key).ok()?, + HeaderValue::from_str(value).ok()?, + )) + })); } #[cfg(test)] mod tests { + use crate::exporter::tests::run_env_test; use crate::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT}; - use std::sync::Mutex; - - // Make sure env tests are not running concurrently - static ENV_LOCK: Mutex<()> = Mutex::new(()); - - fn run_env_test(env_vars: T, f: F) - where - F: FnOnce(), - T: Into>, - { - let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned"); - let env_vars = env_vars.into(); - for (k, v) in env_vars.iter() { - std::env::set_var(k, v); - } - f(); - for (k, _) in env_vars { - std::env::remove_var(k); - } - } #[test] fn test_append_signal_path_to_generic_env() { diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index a6236bc61f..eafd82e1aa 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -223,3 +223,78 @@ impl WithExportConfig for B { self } } + +#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))] +fn parse_header_string(value: &str) -> impl Iterator { + value + .split_terminator(',') + .map(str::trim) + .filter_map(parse_header_key_value_string) +} + +#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))] +fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, &str)> { + key_value_string + .split_once('=') + .map(|(key, value)| (key.trim(), value.trim())) + .filter(|(key, value)| !key.is_empty() && !value.is_empty()) +} + +#[cfg(test)] +#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))] +mod tests { + // Make sure env tests are not running concurrently + static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + + pub(crate) fn run_env_test(env_vars: T, f: F) + where + F: FnOnce(), + T: Into>, + { + let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned"); + let env_vars = env_vars.into(); + for (k, v) in env_vars.iter() { + std::env::set_var(k, v); + } + f(); + for (k, _) in env_vars { + std::env::remove_var(k); + } + } + + #[test] + fn test_parse_header_string() { + let test_cases = vec![ + // Format: (input_str, expected_headers) + ("k1=v1", vec![("k1", "v1")]), + ("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]), + ("k1=v1=10,k2,k3", vec![("k1", "v1=10")]), + ("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]), + ]; + + for (input_str, expected_headers) in test_cases { + assert_eq!( + super::parse_header_string(input_str).collect::>(), + expected_headers, + ) + } + } + + #[test] + fn test_parse_header_key_value_string() { + let test_cases = vec![ + // Format: (input_str, expected_header) + ("k1=v1", Some(("k1", "v1"))), + ("", None), + ("=v1", None), + ("k1=", None), + ]; + + for (input_str, expected_headers) in test_cases { + assert_eq!( + super::parse_header_key_value_string(input_str), + expected_headers, + ) + } + } +} diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 30840157be..7f5f8709b2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -1,7 +1,9 @@ use std::env; use std::fmt::{Debug, Formatter}; +use std::str::FromStr; use std::time::Duration; +use http::{HeaderMap, HeaderName, HeaderValue}; use tonic::codec::CompressionEncoding; use tonic::metadata::{KeyAndValueRef, MetadataMap}; use tonic::service::Interceptor; @@ -9,11 +11,11 @@ use tonic::transport::Channel; #[cfg(feature = "tls")] use tonic::transport::ClientTlsConfig; -use super::default_headers; +use super::{default_headers, parse_header_string}; use crate::exporter::Compression; use crate::{ ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, - OTEL_EXPORTER_OTLP_TIMEOUT, + OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT, }; #[cfg(feature = "logs")] @@ -213,11 +215,17 @@ impl TonicExporterBuilder { signal_endpoint_path: &str, signal_timeout_var: &str, signal_compression_var: &str, + signal_headers_var: &str, ) -> Result<(Channel, BoxInterceptor, Option), crate::Error> { let tonic_config = self.tonic_config; let compression = resolve_compression(&tonic_config, signal_compression_var)?; - let metadata = tonic_config.metadata.unwrap_or_default(); + let headers_from_env = parse_headers_from_env(signal_headers_var); + let metadata = merge_metadata_with_headers_from_env( + tonic_config.metadata.unwrap_or_default(), + headers_from_env, + ); + let add_metadata = move |mut req: tonic::Request<()>| { for key_and_value in metadata.iter() { match key_and_value { @@ -294,6 +302,7 @@ impl TonicExporterBuilder { "/v1/logs", crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, + crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS, )?; let client = TonicLogsClient::new(channel, interceptor, compression); @@ -316,6 +325,7 @@ impl TonicExporterBuilder { "/v1/metrics", crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, + crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS, )?; let client = TonicMetricsClient::new(channel, interceptor, compression); @@ -339,6 +349,7 @@ impl TonicExporterBuilder { "/v1/traces", crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, + crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS, )?; let client = TonicTracesClient::new(channel, interceptor, compression); @@ -347,11 +358,44 @@ impl TonicExporterBuilder { } } +fn merge_metadata_with_headers_from_env( + metadata: MetadataMap, + headers_from_env: HeaderMap, +) -> MetadataMap { + if headers_from_env.is_empty() { + metadata + } else { + let mut existing_headers: HeaderMap = metadata.into_headers(); + existing_headers.extend(headers_from_env); + + MetadataMap::from_headers(existing_headers) + } +} + +fn parse_headers_from_env(signal_headers_var: &str) -> HeaderMap { + env::var(signal_headers_var) + .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS)) + .map(|input| { + parse_header_string(&input) + .filter_map(|(key, value)| { + Some(( + HeaderName::from_str(key).ok()?, + HeaderValue::from_str(value).ok()?, + )) + }) + .collect::() + }) + .unwrap_or_default() +} + #[cfg(test)] mod tests { + use crate::exporter::tests::run_env_test; #[cfg(feature = "gzip-tonic")] use crate::exporter::Compression; use crate::TonicExporterBuilder; + use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS}; + use http::{HeaderMap, HeaderName, HeaderValue}; use tonic::metadata::{MetadataMap, MetadataValue}; #[test] @@ -393,4 +437,62 @@ mod tests { let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip); assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip); } + + #[test] + fn test_parse_headers_from_env() { + run_env_test( + vec![ + (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"), + (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"), + ], + || { + assert_eq!( + super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS), + HeaderMap::from_iter([ + ( + HeaderName::from_static("k1"), + HeaderValue::from_static("v1") + ), + ( + HeaderName::from_static("k2"), + HeaderValue::from_static("v2") + ), + ]) + ); + + assert_eq!( + super::parse_headers_from_env("EMPTY_ENV"), + HeaderMap::from_iter([( + HeaderName::from_static("k3"), + HeaderValue::from_static("v3") + )]) + ); + }, + ) + } + + #[test] + fn test_merge_metadata_with_headers_from_env() { + run_env_test( + vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")], + || { + let headers_from_env = + super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS); + + let mut metadata = MetadataMap::new(); + metadata.insert("foo", "bar".parse().unwrap()); + metadata.insert("k1", "v0".parse().unwrap()); + + let result = + super::merge_metadata_with_headers_from_env(metadata, headers_from_env); + + assert_eq!( + result.get("foo").unwrap(), + MetadataValue::from_static("bar") + ); + assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1")); + assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2")); + }, + ); + } } diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 0153d60218..1c8b41390e 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -349,12 +349,12 @@ pub enum Error { RequestFailed(#[from] opentelemetry_http::HttpError), /// The provided value is invalid in HTTP headers. - #[cfg(feature = "http-proto")] + #[cfg(any(feature = "grpc-tonic", feature = "http-proto"))] #[error("http header value error {0}")] InvalidHeaderValue(#[from] http::header::InvalidHeaderValue), /// The provided name is invalid in HTTP headers. - #[cfg(feature = "http-proto")] + #[cfg(any(feature = "grpc-tonic", feature = "http-proto"))] #[error("http header name error {0}")] InvalidHeaderName(#[from] http::header::InvalidHeaderName),