Skip to content

Commit

Permalink
Emit traces for ingester flush operations (#812)
Browse files Browse the repository at this point in the history
Create a new span for each flush, log the trace ID, and any error that
comes back.

Passing a context in to `handleFlush()` seems more natural than having
it assume it is only called in background context.
  • Loading branch information
bboreham authored Jul 14, 2021
1 parent 9794159 commit addfa70
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [ENHANCEMENT] Add `tempo_ingester_flush_size_bytes` metric. [#777](https://github.com/grafana/tempo/pull/777) (@bboreham)
* [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn)
* [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152)
* [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham)

## v1.0.1

Expand Down
31 changes: 27 additions & 4 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"strconv"
"time"

gklog "github.com/go-kit/kit/log"
"github.com/google/uuid"
ot "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/uber/jaeger-client-go"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -187,7 +192,7 @@ func (i *Ingester) flushLoop(j int) {
if op.kind == opKindComplete {
retry, err = i.handleComplete(op)
} else {
retry, err = i.handleFlush(op.userID, op.blockID)
retry, err = i.handleFlush(context.Background(), op.userID, op.blockID)
}

if err != nil {
Expand Down Expand Up @@ -261,8 +266,24 @@ func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
return false, nil
}

func (i *Ingester) handleFlush(userID string, blockID uuid.UUID) (retry bool, err error) {
level.Info(log.Logger).Log("msg", "flushing block", "userid", userID, "block", blockID.String())
// withSpan adds traceID to a logger, if span is sampled
// TODO: move into some central trace/log package
func withSpan(logger gklog.Logger, sp ot.Span) gklog.Logger {
if sp == nil {
return logger
}
sctx, ok := sp.Context().(jaeger.SpanContext)
if !ok || !sctx.IsSampled() {
return logger
}

return gklog.With(logger, "traceID", sctx.TraceID().String())
}

func (i *Ingester) handleFlush(ctx context.Context, userID string, blockID uuid.UUID) (retry bool, err error) {
sp, ctx := ot.StartSpanFromContext(ctx, "flush", ot.Tag{Key: "organization", Value: userID}, ot.Tag{Key: "blockID", Value: blockID.String()})
defer sp.Finish()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "userid", userID, "block", blockID.String())

instance, err := i.getOrCreateInstance(userID)
if err != nil {
Expand All @@ -274,7 +295,7 @@ func (i *Ingester) handleFlush(userID string, blockID uuid.UUID) (retry bool, er
}

if block := instance.GetBlockToBeFlushed(blockID); block != nil {
ctx := user.InjectOrgID(context.Background(), userID)
ctx := user.InjectOrgID(ctx, userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()

Expand All @@ -283,6 +304,8 @@ func (i *Ingester) handleFlush(userID string, blockID uuid.UUID) (retry bool, er
metricFlushDuration.Observe(time.Since(start).Seconds())
metricFlushSize.Observe(float64(block.BlockMeta().Size))
if err != nil {
ext.Error.Set(sp, true)
sp.LogFields(otlog.Error(err))
return true, err
}

Expand Down

0 comments on commit addfa70

Please sign in to comment.