Skip to content

Commit

Permalink
[chore][pkg/stanza] Remove unnecessary parameter in recombine operator (
Browse files Browse the repository at this point in the history
open-telemetry#30930)

This PR is just a minor cleanup step broken out from open-telemetry#30784
  • Loading branch information
djaglowski authored and cparkins committed Feb 1, 2024
1 parent fa656f4 commit 23f23c4
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 15 deletions.
20 changes: 7 additions & 13 deletions pkg/stanza/operator/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (r *Transformer) flushLoop() {
if timeSinceFirstEntry < r.forceFlushTimeout {
continue
}
if err := r.flushSource(context.Background(), source, true); err != nil {
if err := r.flushSource(context.Background(), source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
// This is the first entry in the next batch
case matches && r.matchIndicatesFirst():
// Flush the existing batch
err := r.flushSource(ctx, s, true)
err := r.flushSource(ctx, s)
if err != nil {
return err
}
Expand All @@ -252,7 +252,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
// This is the last entry in a complete batch
case matches && r.matchIndicatesLast():
r.addToBatch(ctx, e, s)
return r.flushSource(ctx, s, true)
return r.flushSource(ctx, s)
}

// This is neither the first entry of a new log,
Expand Down Expand Up @@ -302,7 +302,7 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str
batch.recombined.WriteString(s)

if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize {
if err := r.flushSource(ctx, source, false); err != nil {
if err := r.flushSource(ctx, source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
Expand All @@ -326,7 +326,7 @@ func (r *Transformer) flushUncombined(ctx context.Context) {
func (r *Transformer) flushAllSources(ctx context.Context) {
var errs []error
for source := range r.batchMap {
if err := r.flushSource(ctx, source, true); err != nil {
if err := r.flushSource(ctx, source); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -337,7 +337,7 @@ func (r *Transformer) flushAllSources(ctx context.Context) {

// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *Transformer) flushSource(ctx context.Context, source string, deleteSource bool) error {
func (r *Transformer) flushSource(ctx context.Context, source string) error {
batch := r.batchMap[source]
// Skip flushing a combined log if the batch is empty
if batch == nil {
Expand Down Expand Up @@ -366,13 +366,7 @@ func (r *Transformer) flushSource(ctx context.Context, source string, deleteSour
}

r.Write(ctx, base)
if deleteSource {
r.removeBatch(source)
} else {
batch.entries = batch.entries[:0]
batch.recombined.Reset()
}

r.removeBatch(source)
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,8 @@ func TestSourceBatchDelete(t *testing.T) {
ctx := context.Background()

require.NoError(t, recombine.Process(ctx, start))
require.NoError(t, recombine.Process(ctx, next))
require.Equal(t, 1, len(recombine.batchMap))
require.NoError(t, recombine.flushSource(ctx, "file1", true))
require.NoError(t, recombine.Process(ctx, next))
require.Equal(t, 0, len(recombine.batchMap))
fake.ExpectEntry(t, expect)
require.NoError(t, recombine.Stop())
Expand Down

0 comments on commit 23f23c4

Please sign in to comment.