Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry middleware in query-frontend #814

Merged
merged 2 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn)
* [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152)
* [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham)
* [ENHANCEMENT] Add retry middleware in query-frontend. [#814](https://github.com/grafana/tempo/pull/814) (@kvrhdn)

## v1.0.1

Expand Down
4 changes: 4 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ The Query Frontend is responsible for sharding incoming requests for faster proc
# Query Frontend configuration block
query_frontend:

# maximum number of times a request sent to a querier is attempted (the first try included)
# (default: 5)
[max_retries: <int>]

# number of shards to split the query into
# (default: 2)
[query_shards: <int>]
Expand Down
2 changes: 2 additions & 0 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

// Config is the query-frontend configuration block (`query_frontend` in the
// YAML configuration).
type Config struct {
// Config carries the combined frontend settings; its fields are inlined into
// this block when (un)marshalled as YAML.
Config frontend.CombinedFrontendConfig `yaml:",inline"`
// MaxRetries is handed to RetryWare when the tripperware is built
// (yaml: max_retries, documented default 5).
MaxRetries int `yaml:"max_retries,omitempty"`
// QueryShards is handed to ShardingWare when the tripperware is built; it is
// the number of shards a query is split into (yaml: query_shards, documented
// default 2).
QueryShards int `yaml:"query_shards,omitempty"`
}

Expand All @@ -17,6 +18,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Config.DownstreamURL = ""
cfg.Config.Handler.LogQueriesLongerThan = 0
cfg.Config.FrontendV1.MaxOutstandingPerTenant = 100
cfg.MaxRetries = 5
cfg.QueryShards = 2
}

Expand Down
5 changes: 4 additions & 1 deletion modules/frontend/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ type spanIDDeduper struct {
// Do implements Handler
func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) {
ctx := req.Context()
span, _ := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs")
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs")
defer span.Finish()

// context propagation
req = req.WithContext(ctx)

resp, err := s.next.Do(req)
if err != nil {
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe
}, []string{"tenant"})

return func(next http.RoundTripper) http.RoundTripper {
// We're constructing middleware in this statement. There are two at the moment -
// - the rightmost one (executed first) is ShardingWare which helps to shard queries by splitting the block ID space
// - the leftmost one (executed last) is Deduper which dedupe Span IDs for Zipkin support
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger))
// We're constructing middleware in this statement, each middleware wraps the next one from left-to-right
// - the Deduper dedupes Span IDs for Zipkin support
// - the ShardingWare shards queries by splitting the block ID space
// - the RetryWare retries requests that have failed (error or any HTTP 5xx status)
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger), RetryWare(cfg.MaxRetries, logger))

return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
start := time.Now()
// tracing instrumentation
Expand Down
5 changes: 4 additions & 1 deletion modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ type shardQuery struct {
// Do implements Handler
func (s shardQuery) Do(r *http.Request) (*http.Response, error) {
ctx := r.Context()
span, _ := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery")
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery")
defer span.Finish()

// context propagation
r = r.WithContext(ctx)

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
Expand Down
56 changes: 56 additions & 0 deletions modules/frontend/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package frontend

import (
"net/http"

"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
)

// RetryWare returns a Middleware that wraps a Handler in a retryWare. The
// resulting Handler re-issues a request when the wrapped Handler returns an
// error or an HTTP 5xx response; maxRetries is the value its attempt counter
// starts from.
func RetryWare(maxRetries int, logger log.Logger) Middleware {
	wrap := func(next Handler) Handler {
		return retryWare{
			maxRetries: maxRetries,
			logger:     logger,
			next:       next,
		}
	}

	return MiddlewareFunc(wrap)
}

// retryWare is the Handler built by RetryWare. Its Do method delegates to next
// and repeats the call when an attempt fails.
type retryWare struct {
// next is the wrapped Handler every attempt is delegated to.
next Handler
// logger is the logger that was passed to RetryWare.
logger log.Logger
// maxRetries is the value Do initialises its attempt counter with.
maxRetries int
}

// Do implements Handler.
//
// It forwards req to the next Handler and repeats the call while the attempt
// fails, where a failure is a non-nil error or a response whose status is in
// the HTTP 5xx range. At most maxRetries attempts are made in total; a
// maxRetries of zero or less results in exactly one attempt.
//
// The result of the final attempt is returned as-is (response and error). If
// the request context is already done before an attempt starts, no call is
// made and ctx.Err() is returned instead.
func (r retryWare) Do(req *http.Request) (*http.Response, error) {
	ctx := req.Context()
	span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.Retry")
	defer span.Finish()

	// context propagation
	req = req.WithContext(ctx)

	triesLeft := r.maxRetries

	for {
		// stop early when the caller has gone away or the deadline has passed
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}

		resp, err := r.next.Do(req)

		triesLeft--
		// <= instead of == so that a zero or negative maxRetries still
		// terminates after the first attempt instead of looping on every failure
		if triesLeft <= 0 {
			return resp, err
		}

		// success, or a failure that is not worth retrying (anything but 5xx)
		if err == nil && resp.StatusCode/100 != 5 {
			return resp, nil
		}

		// note: the "try" field carries the number of tries remaining
		span.LogFields(ot_log.String("msg", "error processing request"), ot_log.Int("try", triesLeft), ot_log.Error(err))

		// the failed response is discarded; release its body before retrying
		if resp != nil && resp.Body != nil {
			_ = resp.Body.Close()
		}
	}
}
130 changes: 130 additions & 0 deletions modules/frontend/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package frontend

import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

// HandlerFunc adapts an ordinary function to the Handler interface so that
// tests can define handlers inline.
type HandlerFunc func(req *http.Request) (*http.Response, error)

// Do implements Handler by calling the underlying function with req.
func (fn HandlerFunc) Do(req *http.Request) (*http.Response, error) {
	resp, err := fn(req)
	return resp, err
}

// TestRetry is a table-driven test of the handler produced by RetryWare(5, ...).
// For each case it checks how many times the wrapped handler was called and
// which response and error were finally returned.
func TestRetry(t *testing.T) {
// try counts handler invocations; it is shared by all cases and reset at the
// start of every subtest.
var try atomic.Int32

for _, tc := range []struct {
name string
handler Handler
expectedTries int32
expectedRes *http.Response
expectedErr error
}{
{
// the handler errors four times and succeeds on the fifth call
name: "retry until success",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
if try.Inc() == 5 {
return &http.Response{StatusCode: 200}, nil
}
return nil, errors.New("this request failed")
}),
expectedTries: 5,
expectedRes: &http.Response{StatusCode: 200},
expectedErr: nil,
},
{
// a 400 response is returned after a single call
name: "don't retry 400's",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
return &http.Response{StatusCode: 400}, nil
}),
expectedTries: 1,
expectedRes: &http.Response{StatusCode: 400},
expectedErr: nil,
},
{
// a handler that always answers 500 is called five times and the last
// 500 response is returned without an error
name: "retry 500s",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
return &http.Response{StatusCode: 500}, nil
}),
expectedTries: 5,
expectedRes: &http.Response{StatusCode: 500},
expectedErr: nil,
},
{
// when every call errors, the error of the fifth (final) call is the
// one that is returned
name: "return last error",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
if try.Inc() == 5 {
return nil, errors.New("request failed")
}
return nil, errors.New("not the last request")
}),
expectedTries: 5,
expectedRes: nil,
expectedErr: errors.New("request failed"),
},
} {
t.Run(tc.name, func(t *testing.T) {
// reset the shared counter so the previous case does not leak into this one
try.Store(0)

retryWare := RetryWare(5, log.NewNopLogger())
handler := retryWare.Wrap(tc.handler)

req := httptest.NewRequest("GET", "http://example.com", nil)

res, err := handler.Do(req)

require.Equal(t, tc.expectedTries, try.Load())
require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedRes, res)
})
}
}

// TestRetry_CancelledRequest checks that the retry handler stops calling the
// wrapped handler once the request context is cancelled and returns the
// context's error.
func TestRetry_CancelledRequest(t *testing.T) {
// try counts handler invocations across both scenarios below; the second
// scenario's expectation of 1 relies on the first one leaving it at 0.
var try atomic.Int32

// request is cancelled before first call
ctx, cancel := context.WithCancel(context.Background())
cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil)
require.NoError(t, err)

_, err = RetryWare(5, log.NewNopLogger()).
Wrap(HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
return nil, ctx.Err()
})).
Do(req)

// the wrapped handler must never have been invoked
require.Equal(t, int32(0), try.Load())
require.Equal(t, ctx.Err(), err)

// request is cancelled after first call
ctx, cancel = context.WithCancel(context.Background())

req, err = http.NewRequestWithContext(ctx, "GET", "http://example.com", nil)
require.NoError(t, err)

_, err = RetryWare(5, log.NewNopLogger()).
Wrap(HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
// cancel from inside the handler so the context is done before a retry
cancel()
return nil, errors.New("this request failed")
})).
Do(req)

// exactly one call was made and the context error replaces the handler's error
require.Equal(t, int32(1), try.Load())
require.Equal(t, ctx.Err(), err)
}