Skip to content

Commit

Permalink
[processor/lsminterval]Optimize critical section (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Jan 15, 2025
1 parent 1492674 commit 25bd658
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions processor/lsmintervalprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,22 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
return errors.Join(append(errs, fmt.Errorf("failed to marshal value to proto binary: %w", err))...)
}

if err := p.mergeToBatch(vb); err != nil {
return fmt.Errorf("failed to merge the value to batch: %w", err)
}

// Call next for the metrics remaining in the input
if err := p.next.ConsumeMetrics(ctx, md); err != nil {
errs = append(errs, err)
}

if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

func (p *Processor) mergeToBatch(vb []byte) (err error) {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -302,8 +318,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
key := merger.NewKey(ivl.Duration, p.processingTime)
keys[i], err = key.Marshal()
if err != nil {
errs = append(errs, fmt.Errorf("failed to marshal key to binary for ivl %s: %w", ivl.Duration, err))
continue
return fmt.Errorf("failed to marshal key to binary for ivl %s: %w", ivl.Duration, err)
}
}

Expand All @@ -313,28 +328,19 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro

for _, k := range keys {
if err := p.batch.Merge(k, vb, nil); err != nil {
errs = append(errs, fmt.Errorf("failed to merge to db: %w", err))
return fmt.Errorf("failed to merge to db: %w", err)
}
}

if p.batch.Len() >= dbCommitThresholdBytes {
if err := p.batch.Commit(p.wOpts); err != nil {
return errors.Join(append(errs, fmt.Errorf("failed to commit a batch to db: %w", err))...)
return fmt.Errorf("failed to commit a batch to db: %w", err)
}
if err := p.batch.Close(); err != nil {
return errors.Join(append(errs, fmt.Errorf("failed to close a batch post commit: %w", err))...)
return fmt.Errorf("failed to close a batch post commit: %w", err)
}
p.batch = nil
}

// Call next for the metrics remaining in the input
if err := p.next.ConsumeMetrics(ctx, md); err != nil {
errs = append(errs, err)
}

if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

Expand Down

0 comments on commit 25bd658

Please sign in to comment.