Skip to content

Commit

Permalink
Add collect methods on ValueMap (open-telemetry#2267)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
Co-authored-by: Lalit Kumar Bhasin <[email protected]>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent 1fd871a commit ac0ea9f
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 423 deletions.
184 changes: 38 additions & 146 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::mem::replace;
use std::ops::DerefMut;
use std::{sync::Mutex, time::SystemTime};

use crate::metrics::data::HistogramDataPoint;
Expand Down Expand Up @@ -37,6 +36,14 @@ where
buckets: Mutex::new(Buckets::<T>::new(*count)),
}
}

fn clone_and_reset(&self, count: &usize) -> Self {
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
let cloned = replace(current.deref_mut(), Buckets::new(*count));
Self {
buckets: Mutex::new(cloned),
}
}
}

#[derive(Default)]
Expand Down Expand Up @@ -73,16 +80,6 @@ impl<T: Number> Buckets<T> {
self.max = value
}
}

fn reset(&mut self) {
for item in &mut self.counts {
*item = 0;
}
self.count = Default::default();
self.total = Default::default();
self.min = T::max();
self.max = T::min();
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
Expand Down Expand Up @@ -139,11 +136,6 @@ impl<T: Number> Histogram<T> {
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
Expand All @@ -155,24 +147,22 @@ impl<T: Number> Histogram<T> {
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.data_points.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > h.data_points.capacity() {
h.data_points.reserve_exact(n - h.data_points.capacity());
}

if self
.value_map
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr
.buckets
.into_inner()
.unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
Expand All @@ -193,54 +183,8 @@ impl<T: Number> Histogram<T> {
None
},
exemplars: vec![],
});

b.reset();
}
}

let mut trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
Some(b.max)
} else {
None
},
exemplars: vec![],
});
}
}
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
self.value_map.count.store(0, Ordering::SeqCst);
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}
Expand All @@ -250,11 +194,6 @@ impl<T: Number> Histogram<T> {
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
Expand All @@ -266,24 +205,19 @@ impl<T: Number> Histogram<T> {
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.data_points.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > h.data_points.capacity() {
h.data_points.reserve_exact(n - h.data_points.capacity());
}
let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

if self
.value_map
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
Expand All @@ -304,50 +238,8 @@ impl<T: Number> Histogram<T> {
None
},
exemplars: vec![],
});
}
}

let trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

// TODO: This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
Some(b.max)
} else {
None
},
exemplars: vec![],
});
}
}
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}
Expand Down
109 changes: 20 additions & 89 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc, Mutex},
time::SystemTime,
};
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};

use crate::metrics::data::DataPoint;
use opentelemetry::KeyValue;
Expand Down Expand Up @@ -33,6 +29,12 @@ where
fn update(&self, value: T) {
self.value.store(value)
}

fn clone_and_reset(&self, _: &()) -> Self {
Self {
value: T::new_atomic_tracker(self.value.get_and_reset_value()),
}
}
}

/// Summarizes a set of measurements as the last one made.
Expand All @@ -56,102 +58,31 @@ impl<T: Number> LastValue<T> {

pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
dest.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > dest.capacity() {
dest.reserve_exact(n - dest.capacity());
}

if self
.value_map
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
dest.push(DataPoint {
attributes: vec![],
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
self.value_map
.collect_and_reset(dest, |attributes, aggr| DataPoint {
attributes,
start_time: Some(prev_start),
time: Some(t),
value: self
.value_map
.no_attribute_tracker
.value
.get_and_reset_value(),
value: aggr.value.get_value(),
exemplars: vec![],
});
}

let mut trackers = match self.value_map.trackers.write() {
Ok(v) => v,
_ => return,
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.value.get_value(),
exemplars: vec![],
});
}
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
self.value_map.count.store(0, Ordering::SeqCst);
}

pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

dest.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > dest.capacity() {
dest.reserve_exact(n - dest.capacity());
}

if self
.value_map
.has_no_attribute_value
.load(Ordering::Acquire)
{
dest.push(DataPoint {
attributes: vec![],
self.value_map
.collect_readonly(dest, |attributes, aggr| DataPoint {
attributes,
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_tracker.value.get_value(),
value: aggr.value.get_value(),
exemplars: vec![],
});
}

let trackers = match self.value_map.trackers.write() {
Ok(v) => v,
_ => return,
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.value.get_value(),
exemplars: vec![],
});
}
}
}
}
Loading

0 comments on commit ac0ea9f

Please sign in to comment.