Skip to content

Commit

Permalink
use checkpoints of metrics for collection
Browse files Browse the repository at this point in the history
In order to collect the current value of a metric, and reset it in
the case of a delta temporality, the reader process first must
checkpoint each metric. Checkpointing copies the current value to
a checkpoint field of the aggregation record and resets the value
and start time if needed. Then the checkpoint field is used to get
the metric value when collecting.

This is the easiest method to collect metrics without potentially
losing measurements during a reset that works across all types of
aggregations. There may be optimizations that can be done per
aggregation in the future.

The aggregation checkpoint function is called for each metric
individually, but this should likely change to have it so the
reader only has to call checkpoint for an aggregation and it
runs against all metrics in the table.
  • Loading branch information
Tristan Sloughter committed Jul 19, 2022
1 parent c601a26 commit b7328db
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
%%%------------------------------------------------------------------------
%% Copyright 2022, OpenTelemetry Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @doc
%% @end
%%%-------------------------------------------------------------------------
-module(otel_aggregation_histogram_explicit).

-export([init/2,
aggregate/3,
collect/5]).
checkpoint/5,
collect/4]).

-include("otel_metrics.hrl").

Expand Down Expand Up @@ -59,16 +77,29 @@ aggregate(MeasurementValue,
Aggregation#explicit_histogram_aggregation{bucket_counts=Buckets1,
sum=Sum+MeasurementValue}.

collect(_, _AggregationTemporality, CollectionStartNano,
#explicit_histogram_aggregation{
start_time_unix_nano=StartTimeUnixNano,
boundaries=Boundaries,
bucket_counts=Buckets,
record_min_max=_RecordMinMax,
min=Min,
max=Max,
sum=Sum
}, Attributes) ->
%% TODO: handle delta temporary checkpoints
checkpoint(_Tab, _Name, _, _, _CollectionStartNano) ->
ok.

collect(Tab, Name, _, CollectionStartTime) ->
Select = [{'$1',
[{'==', Name, {element, 1, {element, 2, '$1'}}}],
['$1']}],
AttributesAggregation = ets:select(Tab, Select),
[datapoint(CollectionStartTime, SumAgg) || SumAgg <- AttributesAggregation].

%%

datapoint(CollectionStartNano, #explicit_histogram_aggregation{
key={_, Attributes},
start_time_unix_nano=StartTimeUnixNano,
boundaries=Boundaries,
bucket_counts=Buckets,
record_min_max=_RecordMinMax,
min=Min,
max=Max,
sum=Sum
}) ->
#histogram_datapoint{
attributes=Attributes,
start_time_unix_nano=StartTimeUnixNano,
Expand All @@ -83,9 +114,6 @@ collect(_, _AggregationTemporality, CollectionStartNano,
max=Max
}.


%%

zero_buckets(Size) ->
erlang:list_to_tuple(lists:duplicate(Size, 0)).

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
%%%------------------------------------------------------------------------
%% Copyright 2022, OpenTelemetry Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @doc
%% @end
%%%-------------------------------------------------------------------------
-module(otel_aggregation_last_value).

-export([init/2,
aggregate/3,
collect/5]).
checkpoint/5,
collect/4]).

-include("otel_metrics.hrl").

Expand All @@ -23,11 +41,20 @@ aggregate(Tab, Key, Value) ->
ets:insert(Tab, Metric#last_value_aggregation{value=Value})
end.

collect(_Tab, _, _, #last_value_aggregation{value=undefined}, _) ->
undefined;
collect(Tab, _, CollectionStartNano, #last_value_aggregation{key=Key,
value=Value}, Attributes) ->
_ = ets:update_element(Tab, Key, {#last_value_aggregation.value, undefined}),
checkpoint(_Tab, _Name, _, _, _CollectionStartNano) ->
ok.

collect(Tab, Name, _, CollectionStartTime) ->
Select = [{'$1',
[{'==', Name, {element, 1, {element, 2, '$1'}}}],
['$1']}],
AttributesAggregation = ets:select(Tab, Select),
[datapoint(CollectionStartTime, SumAgg) || SumAgg <- AttributesAggregation].

%%

datapoint(CollectionStartNano, #last_value_aggregation{key={_, Attributes},
value=Value}) ->
#datapoint{attributes=Attributes,
time_unix_nano=CollectionStartNano,
value=Value,
Expand Down
61 changes: 52 additions & 9 deletions apps/opentelemetry_experimental/src/otel_aggregation_sum.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
%%%------------------------------------------------------------------------
%% Copyright 2022, OpenTelemetry Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @doc
%% @end
%%%-------------------------------------------------------------------------
-module(otel_aggregation_sum).

-export([init/2,
aggregate/3,
collect/5]).
checkpoint/5,
collect/4]).

-include("otel_metrics.hrl").
-include_lib("opentelemetry_api_experimental/include/otel_metrics.hrl").
Expand Down Expand Up @@ -32,15 +50,40 @@ aggregate(Tab, Key, Value) ->
true
end.

collect(Tab, ?AGGREGATION_TEMPORALITY_DELTA, CollectionStartNano, ActiveMetric=#sum_aggregation{key=Key}, Attributes) ->
_ = ets:update_element(Tab, Key, [{#sum_aggregation.start_time_unix_nano, CollectionStartNano},
{#sum_aggregation.value, 0}]),
datapoint(CollectionStartNano, ActiveMetric, Attributes);
collect(_Tab, _AggregationTemporality, CollectionStartNano, ActiveMetric, Attributes) ->
datapoint(CollectionStartNano, ActiveMetric, Attributes).
checkpoint(Tab, Name, ?AGGREGATION_TEMPORALITY_DELTA, ?VALUE_TYPE_INTEGER, CollectionStartNano) ->
MS = [{#sum_aggregation{key='$1',
value='$2',
_='_'},
[{'=:=', {element, 1, '$1'}, {const, Name}}],
[{#sum_aggregation{key='$1',
start_time_unix_nano={const, CollectionStartNano},
checkpoint='$2',
value=0}}]}],
_ = ets:select_replace(Tab, MS),
ok;
checkpoint(Tab, Name, ?AGGREGATION_TEMPORALITY_CUMULATIVE, ?VALUE_TYPE_INTEGER, _CollectionStartNano) ->
MS = [{#sum_aggregation{key='$1',
start_time_unix_nano='$2',
value='$3',
_='_'},
[{'=:=', {element, 1, '$1'}, {const, Name}}],
[{#sum_aggregation{key='$1',
start_time_unix_nano='$2',
checkpoint='$3',
value='$3'}}]}],
_ = ets:select_replace(Tab, MS),
ok.

datapoint(CollectionStartNano, #sum_aggregation{start_time_unix_nano=StartTimeUnixNano,
value=Value}, Attributes) ->
collect(Tab, Name, _, CollectionStartTime) ->
Select = [{'$1',
[{'==', Name, {element, 1, {element, 2, '$1'}}}],
['$1']}],
AttributesAggregation = ets:select(Tab, Select),
[datapoint(CollectionStartTime, SumAgg) || SumAgg <- AttributesAggregation].

datapoint(CollectionStartNano, #sum_aggregation{key={_, Attributes},
start_time_unix_nano=StartTimeUnixNano,
checkpoint=Value}) ->
#datapoint{
attributes=Attributes,
start_time_unix_nano=StartTimeUnixNano,
Expand Down
2 changes: 0 additions & 2 deletions apps/opentelemetry_experimental/src/otel_meter_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,9 @@ create_tables() ->
%% elements are {Instrument, [ViewAggregation]}
ViewAggregationTab = ets:new(view_aggregation_tab, [set,
protected,
{read_concurrency, true},
{keypos, 1}]),
MetricsTab = ets:new(metrics_tab, [set,
public,
{read_concurrency, true},
{keypos, 2}]),
{ViewAggregationTab, MetricsTab}.

Expand Down
31 changes: 12 additions & 19 deletions apps/opentelemetry_experimental/src/otel_metric_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ collect_(ViewAggregationTab, MetricsTab) ->
%% use the information (temporality) from the VIEW_AGGREGATIONS_TAB entry to reset the
%% METRICS_TAB entry value (like setting value back to 0 for DELTA)

ets:foldl(fun({#instrument{value_type=_ValueType,
ets:foldl(fun({#instrument{value_type=ValueType,
unit=Unit}, ViewAggregations}, MetricsAcc) ->
lists:foldl(fun(#view_aggregation{aggregation_module=otel_aggregation_drop}, Acc) ->
Acc;
Expand All @@ -137,12 +137,10 @@ collect_(ViewAggregationTab, MetricsTab) ->
temporality=Temporality,
is_monotonic=IsMonotonic
}, Acc) ->
Select = [{'$1',
[{'==', Name, {element, 1, {element, 2, '$1'}}}],
['$1']}],
AttributesAggregation = ets:select(MetricsTab, Select),
Data = data(AggregationModule, Temporality, IsMonotonic,
AttributesAggregation, CollectionStartTime, MetricsTab),
AggregationModule:checkpoint(MetricsTab, Name, Temporality,
ValueType, CollectionStartTime),
Data = data(AggregationModule, Name, Temporality, IsMonotonic,
CollectionStartTime, MetricsTab),

[metric(Name, Description, Unit, Data) | Acc]
end, MetricsAcc, ViewAggregations)
Expand All @@ -156,23 +154,18 @@ metric(Name, Description, Unit, Data) ->
unit=Unit,
data=Data}.

data(otel_aggregation_sum, Temporality, IsMonotonic, AttributesAggregation, CollectionStartTime, MetricTab) ->
Datapoints = lists:foldl(fun(Aggregation=#sum_aggregation{key={_, Attributes}}, Acc) ->
[otel_aggregation_sum:collect(MetricTab, Temporality, CollectionStartTime, Aggregation, Attributes) | Acc]
end, [], AttributesAggregation),
data(otel_aggregation_sum, Name, Temporality, IsMonotonic, CollectionStartTime, MetricTab) ->
Datapoints = otel_aggregation_sum:collect(MetricTab, Name, Temporality, CollectionStartTime),
#sum{
aggregation_temporality=Temporality,
is_monotonic=IsMonotonic,
datapoints=Datapoints};
data(otel_aggregation_last_value, Temporality, _IsMonotonic, AttributesAggregation, CollectionStartTime, MetricTab) ->
Datapoints = lists:foldl(fun(Aggregation=#last_value_aggregation{key={_, Attributes}}, Acc) ->
[otel_aggregation_last_value:collect(MetricTab, Temporality, CollectionStartTime, Aggregation, Attributes) | Acc]
end, [], AttributesAggregation),
data(otel_aggregation_last_value, Name, Temporality, _IsMonotonic, CollectionStartTime, MetricTab) ->
Datapoints = otel_aggregation_last_value:collect(MetricTab, Name, Temporality, CollectionStartTime),
#gauge{datapoints=Datapoints};
data(otel_aggregation_histogram_explicit, Temporality, _IsMonotonic, AttributesAggregation, CollectionStartTime, MetricTab) ->
Datapoints = lists:foldl(fun(Aggregation=#explicit_histogram_aggregation{key={_, Attributes}}, Acc) ->
[otel_aggregation_histogram_explicit:collect(MetricTab, Temporality, CollectionStartTime, Aggregation, Attributes) | Acc]
end, [], AttributesAggregation),
data(otel_aggregation_histogram_explicit, Name, Temporality, _IsMonotonic, CollectionStartTime, MetricTab) ->
Datapoints = otel_aggregation_histogram_explicit:collect(MetricTab, Name, Temporality, CollectionStartTime),
#histogram{datapoints=Datapoints,
aggregation_temporality=Temporality
}.

3 changes: 2 additions & 1 deletion apps/opentelemetry_experimental/src/otel_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
%% TODO: attributes should be a tuple of just the values, sorted by attribute name
key :: {term(), opentelemetry:attributes_map()},
start_time_unix_nano :: integer(),
value :: number() | undefined
value :: number() | undefined,
checkpoint :: number() | undefined
}).

-record(last_value_aggregation,
Expand Down

0 comments on commit b7328db

Please sign in to comment.