From 7ae909496cd80442925c696926b9801dc586d3c2 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 25 Jan 2024 14:36:27 -0600 Subject: [PATCH] [pkg/stanza] Fix recombine issues --- .../operator/transformer/recombine/recombine.go | 17 +++-------------- .../transformer/recombine/recombine_test.go | 16 ++++++++-------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 7580f8c87055..70eb32c2471e 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -201,7 +201,6 @@ func (r *Transformer) Stop() error { r.flushAllSources(ctx) close(r.chClose) - return nil } @@ -239,10 +238,9 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { switch { // This is the first entry in the next batch - case matches && r.matchIndicatesFirst(): + case matches && r.matchFirstLine: // Flush the existing batch - err := r.flushSource(ctx, s) - if err != nil { + if err := r.flushSource(ctx, s); err != nil { return err } @@ -250,7 +248,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { r.addToBatch(ctx, e, s) return nil // This is the last entry in a complete batch - case matches && r.matchIndicatesLast(): + case matches && !r.matchFirstLine: r.addToBatch(ctx, e, s) return r.flushSource(ctx, s) } @@ -261,14 +259,6 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { return nil } -func (r *Transformer) matchIndicatesFirst() bool { - return r.matchFirstLine -} - -func (r *Transformer) matchIndicatesLast() bool { - return !r.matchFirstLine -} - // addToBatch adds the current entry to the current batch of entries that will be combined func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) { batch, ok := r.batchMap[source] @@ -303,7 +293,6 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str r.Errorf("there was error flushing combined logs %s", err) } } - } // flushAllSources flushes all sources. diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index fffa50ce4c89..d80050f7412e 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -158,7 +158,7 @@ func TestTransformer(t *testing.T) { cfg.IsFirstEntry = "$body == 'test1'" cfg.OutputIDs = []string{"fake"} cfg.OverwriteWith = "newest" - cfg.ForceFlushTimeout = 100 * time.Millisecond + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -178,7 +178,7 @@ func TestTransformer(t *testing.T) { cfg.IsFirstEntry = "body == 'start'" cfg.OutputIDs = []string{"fake"} cfg.OverwriteWith = "oldest" - cfg.ForceFlushTimeout = 100 * time.Millisecond + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -219,8 +219,8 @@ func TestTransformer(t *testing.T) { cfg := NewConfig() cfg.CombineField = entry.NewBodyField() cfg.IsFirstEntry = `body matches "^[^\\s]"` - cfg.ForceFlushTimeout = 100 * time.Millisecond cfg.OutputIDs = []string{"fake"} + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -252,8 +252,8 @@ func TestTransformer(t *testing.T) { cfg := NewConfig() cfg.CombineField = entry.NewBodyField("message") cfg.IsFirstEntry = `body.message matches "^[^\\s]"` - cfg.ForceFlushTimeout = 100 * time.Millisecond cfg.OutputIDs = []string{"fake"} + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -287,7 +287,6 @@ func TestTransformer(t *testing.T) { cfg.CombineWith = "" cfg.IsLastEntry = "body.logtag == 'F'" cfg.OverwriteWith = "oldest" - cfg.ForceFlushTimeout = 100 * time.Millisecond cfg.OutputIDs = []string{"fake"} return cfg }(), @@ -501,17 +500,18 @@ func TestTransformer(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() op, err := tc.config.Build(testutil.Logger(t)) require.NoError(t, err) require.NoError(t, op.Start(testutil.NewUnscopedMockPersister())) - recombine := op.(*Transformer) + r := op.(*Transformer) fake := testutil.NewFakeOutput(t) - err = recombine.SetOutputs([]operator.Operator{fake}) + err = r.SetOutputs([]operator.Operator{fake}) require.NoError(t, err) for _, e := range tc.input { - require.NoError(t, recombine.Process(context.Background(), e)) + require.NoError(t, r.Process(ctx, e)) } fake.ExpectEntries(t, tc.expectedOutput)