Skip to content

Commit

Permalink
fix(cascading_filter): fix leak on traces where decision was made
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed Jun 14, 2022
1 parent 32cfc1e commit 21390c9
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- feat: add logstransformprocessor from upstream [#604]
- feat: build arm64 binary for darwin [#611]

### Fixed
- fix(cascadingfilter): fix memory leak on late-arriving traces where the decision was already made [#616]

### Changed

[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.52.0-sumo-0...main
[#595]: https://github.com/SumoLogic/sumologic-otel-collector/pull/595
[#604]: https://github.com/SumoLogic/sumologic-otel-collector/pull/604
[#611]: https://github.com/SumoLogic/sumologic-otel-collector/pull/611
[#616]: https://github.com/SumoLogic/sumologic-otel-collector/pull/616

## [v0.52.0-sumo-0]

Expand Down
143 changes: 143 additions & 0 deletions pkg/processor/cascadingfilterprocessor/long_running_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascadingfilterprocessor

import (
"context"
"testing"
"time"

"github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/bigendianconverter"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

cfconfig "github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config"
"github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/sampling"
)

// TestRandomTraceProcessing feeds 100 waves of overlapping traces (each wave
// shifts the starting trace ID by 2, so consecutive waves share IDs) and then
// verifies that no received batches remain buffered once the decision wait
// has comfortably elapsed. Run manually; it sleeps for several seconds.
func TestRandomTraceProcessing(t *testing.T) {
	t.Skip("a long running test, please run manually")
	stepDuration := time.Millisecond * 100
	decisionWait := time.Second * 1

	tsp := newLongRunningTraceProcessor(t, decisionWait)
	allIDs := map[pcommon.TraceID]interface{}{}

	for i := 0; i < 100; i++ {
		// 100 traces per wave, 1 span each, IDs stepping by 1 from i*2.
		ids, batches := generateTraces(i*2, 100, 1, 1)
		for _, td := range batches {
			if err := tsp.ConsumeTraces(context.Background(), td); err != nil {
				t.Errorf("Failed consuming traces: %v", err)
			}
		}
		for _, id := range ids {
			allIDs[id] = true
		}
		time.Sleep(stepDuration)
	}

	// Give the processor ample time past the decision wait to drain buffers.
	time.Sleep(decisionWait * 3)

	// Reuse the shared assertion helper rather than duplicating its loop inline.
	assertTracesEmpty(t, tsp, allIDs)
}

// TestTraceProcessing runs two identical ingest-then-verify rounds against
// the same processor instance, checking that buffered batches are released
// after each round's decision wait. Run manually; it sleeps for several seconds.
func TestTraceProcessing(t *testing.T) {
	t.Skip("a long running test, please run manually")

	decisionWait := time.Second * 1

	tsp := newLongRunningTraceProcessor(t, decisionWait)

	// Two rounds: the second confirms state is drained and reusable.
	for round := 0; round < 2; round++ {
		ids := generateAndConsumeTraces(t, tsp)
		time.Sleep(decisionWait * 3)
		assertTracesEmpty(t, tsp, ids)
	}
}

// generateTraces builds traceCount trace IDs starting at startingTraceID and
// stepping by increaseMultiplier, and for each trace emits spanCount batches,
// one span per batch, so every span arrives independently. Span IDs are
// globally sequential across all batches. Returns the IDs and the batches.
func generateTraces(startingTraceID int, traceCount int, spanCount int, increaseMultiplier int) ([]pcommon.TraceID, []ptrace.Traces) {
	traceIDs := make([]pcommon.TraceID, traceCount)
	spanID := 0
	// Pre-size: exactly one batch is produced per span.
	tds := make([]ptrace.Traces, 0, traceCount*spanCount)
	for i := 0; i < traceCount; i++ {
		traceIDs[i] = bigendianconverter.UInt64ToTraceID(1, uint64(startingTraceID+(i*increaseMultiplier)))
		// Send each span in a separate batch
		for j := 0; j < spanCount; j++ {
			td := simpleTraces()
			span := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
			span.SetTraceID(traceIDs[i])

			spanID++
			span.SetSpanID(bigendianconverter.UInt64ToSpanID(uint64(spanID)))
			tds = append(tds, td)
		}
	}

	return traceIDs, tds
}

// newLongRunningTraceProcessor constructs a cascading filter span processor
// wired to a no-op consumer, with the given decision wait, a probabilistic
// filtering rate of 10 and capacity for 100 traces. Fails the test on error.
func newLongRunningTraceProcessor(t *testing.T, decisionWait time.Duration) *cascadingFilterSpanProcessor {
	rate := int32(10)

	componentID := config.NewComponentIDWithName("cascading_filter", "1")
	settings := config.NewProcessorSettings(componentID)

	cfg := cfconfig.Config{
		ProcessorSettings:          &settings,
		DecisionWait:               decisionWait,
		ProbabilisticFilteringRate: &rate,
		NumTraces:                  100,
	}

	proc, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
	require.NoError(t, err)
	return proc.(*cascadingFilterSpanProcessor)
}

// generateAndConsumeTraces feeds a single trace split across 11 one-span
// batches into tsp and returns the set of generated trace IDs.
func generateAndConsumeTraces(t *testing.T, tsp *cascadingFilterSpanProcessor) map[pcommon.TraceID]interface{} {
	ids, batches := generateTraces(0, 1, 11, 0)

	for _, batch := range batches {
		err := tsp.ConsumeTraces(context.Background(), batch)
		if err != nil {
			t.Errorf("Failed consuming traces: %v", err)
		}
	}

	seen := make(map[pcommon.TraceID]interface{}, len(ids))
	for _, id := range ids {
		seen[id] = true
	}
	return seen
}

// assertTracesEmpty checks that, for every trace ID in allIDs that is still
// tracked by tsp, no received batches remain buffered. IDs no longer tracked
// are trivially fine and skipped.
func assertTracesEmpty(t *testing.T, tsp *cascadingFilterSpanProcessor, allIDs map[pcommon.TraceID]interface{}) {
	for id := range allIDs {
		entry, ok := tsp.idToTrace.Load(traceKey(id.Bytes()))
		if !ok {
			continue
		}
		data := entry.(*sampling.TraceData)
		require.Empty(t, data.ReceivedBatches)
	}
}
15 changes: 12 additions & 3 deletions pkg/processor/cascadingfilterprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ func (cfsp *cascadingFilterSpanProcessor) processTraces(ctx context.Context, res

actualData := d.(*sampling.TraceData)
if loaded {
// PMM: why actualData is not updated with new trace?
atomic.AddInt32(&actualData.SpanCount, lenSpans)
} else {
newTraceIDs++
Expand All @@ -662,9 +661,19 @@ func (cfsp *cascadingFilterSpanProcessor) processTraces(ctx context.Context, res
// Add the spans to the trace, but only once for all policy, otherwise same spans will
// be duplicated in the final trace.
actualData.Lock()
traceTd := prepareTraceBatch(resourceSpans, spans)
actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd)
finalDecision := actualData.FinalDecision

// If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen
// in between the transition from pending.
if finalDecision == sampling.Pending || finalDecision == sampling.Unspecified {
// Add the spans to the trace, but only once for all policy, otherwise same spans will
// be duplicated in the final trace.
traceTd := prepareTraceBatch(resourceSpans, spans)
actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd)
actualData.Unlock()
break
}

actualData.Unlock()

// This section is run in case the decision was already applied earlier
Expand Down

0 comments on commit 21390c9

Please sign in to comment.