Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Choose lowest downsampling resolution with granular control. #1465

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,27 +182,27 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
return enableDeduplication, nil
}

func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) {
func (api *API) parseDownsamplingParamMillis(r *http.Request) (query.MaxResolutionMillisFn, *ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second

if api.enableAutodownsampling {
// If no max_source_resolution is specified fit at least 5 samples between steps.
maxSourceResolution = step / 5
}
if val := r.FormValue(maxSourceResolutionParam); val != "" {
var err error
maxSourceResolution, err = parseDuration(val)
if err != nil {
return 0, &ApiError{errorBadData, errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)}
val := r.FormValue(maxSourceResolutionParam)
if val == "" {
if api.enableAutodownsampling {
return query.StepBasedMaxResolution, nil
}
return func(_ *storage.SelectParams) int64 { return 0 }, nil
}

maxSourceResolution, err := parseDuration(val)
if err != nil {
return nil, &ApiError{errorBadData, errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)}
}

if maxSourceResolution < 0 {
return 0, &ApiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
return nil, &ApiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
}

return int64(maxSourceResolution / time.Millisecond), nil
return func(_ *storage.SelectParams) int64 { return int64(maxSourceResolution / time.Millisecond) }, nil
}

func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialResponse bool, _ *ApiError) {
Expand Down Expand Up @@ -261,7 +261,12 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse), r.FormValue("query"), ts)
maxResolutionMillisFn := func(_ *storage.SelectParams) int64 { return 0 }
if api.enableAutodownsampling {
maxResolutionMillisFn = query.StepBasedMaxResolution
}

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, maxResolutionMillisFn, enablePartialResponse), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -333,7 +338,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step)
maxSourceResMillisFn, apiErr := api.parseDownsamplingParamMillis(r)
if apiErr != nil {
return nil, nil, apiErr
}
Expand All @@ -348,7 +353,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse),
api.queryableCreate(enableDedup, maxSourceResMillisFn, enablePartialResponse),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -388,7 +393,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -460,7 +465,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, func(_ *storage.SelectParams) int64 { return 0 }, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -563,7 +568,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(_ bool, _ int64, _ bool) storage.Queryable {
return func(_ bool, _ query.MaxResolutionMillisFn, _ bool) storage.Queryable {
return queryable
}
}
Expand Down Expand Up @@ -898,11 +898,15 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
v.Set("max_source_resolution", test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

maxResMillis, _ := api.parseDownsamplingParamMillis(&r, test.step)
maxResMillisFn, err := api.parseDownsamplingParamMillis(&r)
testutil.Equals(t, (*ApiError)(nil), err)

result := maxResMillisFn(&storage.SelectParams{Step: int64(test.step / time.Millisecond)})

if test.fail == false {
testutil.Assert(t, maxResMillis == test.result, "case %v: expected %v to be equal to %v", i, maxResMillis, test.result)
testutil.Assert(t, result == test.result, "case %v: expected %v to be equal to %v", i, result, test.result)
} else {
testutil.Assert(t, maxResMillis != test.result, "case %v: expected %v not to be equal to %v", i, maxResMillis, test.result)
testutil.Assert(t, result != test.result, "case %v: expected %v not to be equal to %v", i, result, test.result)
}

}
Expand Down
82 changes: 45 additions & 37 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,56 @@ import (

// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// maxResolutionMillisFn controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable
type QueryableCreator func(deduplicate bool, maxResolutionMillisFn MaxResolutionMillisFn, partialResponse bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
return func(deduplicate bool, maxResolutionMillisFn MaxResolutionMillisFn, partialResponse bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
logger: logger,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillisFn: maxResolutionMillisFn,
partialResponse: partialResponse,
}
}
}

type queryable struct {
logger log.Logger
replicaLabel string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
logger log.Logger
replicaLabel string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillisFn MaxResolutionMillisFn
partialResponse bool
}

type MaxResolutionMillisFn func(params *storage.SelectParams) int64

// StepBasedMaxResolution returns max data resolution to be at least 1/5 requested step.
// 5 is to ensure that we choose downsampling/raw resolution that have at least 5 samples per evaluation.
func StepBasedMaxResolution(params *storage.SelectParams) int64 {
return params.Step / 5
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, q.maxResolutionMillisFn, q.partialResponse), nil
}

type querier struct {
ctx context.Context
logger log.Logger
cancel func()
mint, maxt int64
replicaLabel string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
ctx context.Context
logger log.Logger
cancel func()
mint, maxt int64
replicaLabel string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillisFn MaxResolutionMillisFn
partialResponse bool
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
Expand All @@ -68,24 +76,24 @@ func newQuerier(
replicaLabel string,
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
maxResolutionMillisFn MaxResolutionMillisFn,
partialResponse bool,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
}
ctx, cancel := context.WithCancel(ctx)
return &querier{
ctx: ctx,
logger: logger,
cancel: cancel,
mint: mint,
maxt: maxt,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
ctx: ctx,
logger: logger,
cancel: cancel,
mint: mint,
maxt: maxt,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillisFn: maxResolutionMillisFn,
partialResponse: partialResponse,
}
}

Expand Down Expand Up @@ -169,7 +177,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
MinTime: q.mint,
MaxTime: q.maxt,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
MaxResolutionWindow: q.maxResolutionMillisFn(params),
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
}, resp); err != nil {
Expand Down
Loading