diff --git a/CHANGELOG.md b/CHANGELOG.md index 496bbd5d39e..8099c8d5054 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn) * [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152) * [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham) +* [ENHANCEMENT] Add retry middleware in query-frontend. [#814](https://github.com/grafana/tempo/pull/814) (@kvrhdn) ## v1.0.1 diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 43ff6d070a1..d499bb2a14d 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -159,6 +159,10 @@ The Query Frontend is responsible for sharding incoming requests for faster proc # Query Frontend configuration block query_frontend: + # number of times to retry a request sent to a querier + # (default: 5) + [max_retries: ] + # number of shards to split the query into # (default: 2) [query_shards: ] diff --git a/modules/frontend/config.go b/modules/frontend/config.go index a0e38e70360..164f1c61cb6 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -9,6 +9,7 @@ import ( type Config struct { Config frontend.CombinedFrontendConfig `yaml:",inline"` + MaxRetries int `yaml:"max_retries,omitempty"` QueryShards int `yaml:"query_shards,omitempty"` } @@ -17,6 +18,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Config.DownstreamURL = "" cfg.Config.Handler.LogQueriesLongerThan = 0 cfg.Config.FrontendV1.MaxOutstandingPerTenant = 100 + cfg.MaxRetries = 5 cfg.QueryShards = 2 } diff --git a/modules/frontend/deduper.go b/modules/frontend/deduper.go index 9f6e1a8e685..a801631521e 100644 --- a/modules/frontend/deduper.go +++ b/modules/frontend/deduper.go @@ -46,9 +46,12 @@ type spanIDDeduper struct { // Do implements Handler func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) { ctx := req.Context() - span, _ := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs") + span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs") defer span.Finish() + // context propagation + req = req.WithContext(ctx) + resp, err := s.next.Do(req) if err != nil { return nil, err diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 0bfbbbef1f7..ef33948b203 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -35,10 +35,12 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe }, []string{"tenant"}) return func(next http.RoundTripper) http.RoundTripper { - // We're constructing middleware in this statement. There are two at the moment - - // - the rightmost one (executed first) is ShardingWare which helps to shard queries by splitting the block ID space - // - the leftmost one (executed last) is Deduper which dedupe Span IDs for Zipkin support - rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger)) + // We're constructing middleware in this statement, each middleware wraps the next one from left-to-right + // - the Deduper dedupes Span IDs for Zipkin support + // - the ShardingWare shards queries by splitting the block ID space + // - the RetryWare retries requests that have failed (error or http status 500) + rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger), RetryWare(cfg.MaxRetries, logger)) + return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { start := time.Now() // tracing instrumentation diff --git a/modules/frontend/querysharding.go b/modules/frontend/querysharding.go index fa613b1776a..7dcfce59281 100644 --- a/modules/frontend/querysharding.go +++ b/modules/frontend/querysharding.go @@ -48,9 +48,12 @@ type shardQuery struct { // Do implements Handler func (s shardQuery) Do(r *http.Request) (*http.Response, error) { ctx := r.Context() - span, _ := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery") + span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery") defer span.Finish() + // context propagation + r = r.WithContext(ctx) + userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err diff --git a/modules/frontend/retry.go b/modules/frontend/retry.go new file mode 100644 index 00000000000..dc04e199103 --- /dev/null +++ b/modules/frontend/retry.go @@ -0,0 +1,56 @@ +package frontend + +import ( + "net/http" + + "github.com/go-kit/kit/log" + "github.com/opentracing/opentracing-go" + ot_log "github.com/opentracing/opentracing-go/log" +) + +func RetryWare(maxRetries int, logger log.Logger) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + return retryWare{ + next: next, + logger: logger, + maxRetries: maxRetries, + } + }) +} + +type retryWare struct { + next Handler + logger log.Logger + maxRetries int +} + +// Do implements Handler +func (r retryWare) Do(req *http.Request) (*http.Response, error) { + ctx := req.Context() + span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.Retry") + defer span.Finish() + + // context propagation + req = req.WithContext(ctx) + + triesLeft := r.maxRetries + + for { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + resp, err := r.next.Do(req) + + triesLeft-- + if triesLeft == 0 { + return resp, err + } + + if err == nil && resp.StatusCode/100 != 5 { + return resp, nil + } + + span.LogFields(ot_log.String("msg", "error processing request"), ot_log.Int("try", triesLeft), ot_log.Error(err)) + } +} diff --git a/modules/frontend/retry_test.go b/modules/frontend/retry_test.go new file mode 100644 index 00000000000..01c5e9ae65a --- /dev/null +++ b/modules/frontend/retry_test.go @@ -0,0 +1,130 @@ +package frontend + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +type HandlerFunc func(req *http.Request) (*http.Response, error) + +// Wrap implements Handler. +func (q HandlerFunc) Do(req *http.Request) (*http.Response, error) { + return q(req) +} + +func TestRetry(t *testing.T) { + var try atomic.Int32 + + for _, tc := range []struct { + name string + handler Handler + expectedTries int32 + expectedRes *http.Response + expectedErr error + }{ + { + name: "retry until success", + handler: HandlerFunc(func(req *http.Request) (*http.Response, error) { + if try.Inc() == 5 { + return &http.Response{StatusCode: 200}, nil + } + return nil, errors.New("this request failed") + }), + expectedTries: 5, + expectedRes: &http.Response{StatusCode: 200}, + expectedErr: nil, + }, + { + name: "don't retry 400's", + handler: HandlerFunc(func(req *http.Request) (*http.Response, error) { + try.Inc() + return &http.Response{StatusCode: 400}, nil + }), + expectedTries: 1, + expectedRes: &http.Response{StatusCode: 400}, + expectedErr: nil, + }, + { + name: "retry 500s", + handler: HandlerFunc(func(req *http.Request) (*http.Response, error) { + try.Inc() + return &http.Response{StatusCode: 500}, nil + }), + expectedTries: 5, + expectedRes: &http.Response{StatusCode: 500}, + expectedErr: nil, + }, + { + name: "return last error", + handler: HandlerFunc(func(req *http.Request) (*http.Response, error) { + if try.Inc() == 5 { + return nil, errors.New("request failed") + } + return nil, errors.New("not the last request") + }), + expectedTries: 5, + expectedRes: nil, + expectedErr: errors.New("request failed"), + }, + } { + t.Run(tc.name, func(t *testing.T) { + try.Store(0) + + retryWare := RetryWare(5, log.NewNopLogger()) + handler := retryWare.Wrap(tc.handler) + + req := httptest.NewRequest("GET", "http://example.com", nil) + + res, err := handler.Do(req) + + require.Equal(t, tc.expectedTries, try.Load()) + require.Equal(t, tc.expectedErr, err) + require.Equal(t, tc.expectedRes, res) + }) + } +} + +func TestRetry_CancelledRequest(t *testing.T) { + var try atomic.Int32 + + // request is cancelled before first call + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) + require.NoError(t, err) + + _, err = RetryWare(5, log.NewNopLogger()). + Wrap(HandlerFunc(func(req *http.Request) (*http.Response, error) { + try.Inc() + return nil, ctx.Err() + })). + Do(req) + + require.Equal(t, int32(0), try.Load()) + require.Equal(t, ctx.Err(), err) + + // request is cancelled after first call + ctx, cancel = context.WithCancel(context.Background()) + + req, err = http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) + require.NoError(t, err) + + _, err = RetryWare(5, log.NewNopLogger()). + Wrap(HandlerFunc(func(req *http.Request) (*http.Response, error) { + try.Inc() + cancel() + return nil, errors.New("this request failed") + })). + Do(req) + + require.Equal(t, int32(1), try.Load()) + require.Equal(t, ctx.Err(), err) +}