Skip to content

Commit

Permalink
[query] Fix Graphite hitcount function by adding support for alignToI…
Browse files Browse the repository at this point in the history
…nterval (#3521)
  • Loading branch information
yyin-sc authored Jul 11, 2021
1 parent 6922816 commit 07ec4e4
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 30 deletions.
79 changes: 70 additions & 9 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,20 +1598,81 @@ func durationToSeconds(d time.Duration) int {

// hitcount estimates hit counts from a list of time series. This function assumes the values in each time
// series represent hits per second. It calculates hits per some larger interval such as per day or per hour.
// NB(xichen): it doesn't support the alignToInterval parameter because no one seems to be using that.
func hitcount(ctx *common.Context, seriesList singlePathSpec, intervalString string) (ts.SeriesList, error) {
func hitcount(
ctx *common.Context,
_ singlePathSpec,
intervalString string,
alignToInterval bool,
) (*unaryContextShifter, error) {
interval, err := common.ParseInterval(intervalString)
if err != nil {
return ts.NewSeriesList(), err
return nil, err
}

intervalInSeconds := durationToSeconds(interval)
if intervalInSeconds <= 0 {
return ts.NewSeriesList(), common.ErrInvalidIntervalFormat
return nil, common.ErrInvalidIntervalFormat
}

resultSeries := make([]*ts.Series, len(seriesList.Values))
for index, series := range seriesList.Values {
shiftDuration := time.Second * 0 // Default to no shift.
if alignToInterval {
// Follow graphite implementation: only handle minutes, hours and days.
origStartTime := ctx.StartTime
switch {
case interval.Hours() >= 24:
// Interval in days, truncate to days.
newStartTime := time.Date(
origStartTime.Year(),
origStartTime.Month(),
origStartTime.Day(), 0, 0, 0, 0, origStartTime.Location())
shiftDuration = newStartTime.Sub(origStartTime)
case interval.Hours() >= 1:
// Interval is in hrs.
newStartTime := time.Date(
origStartTime.Year(),
origStartTime.Month(),
origStartTime.Day(),
origStartTime.Hour(), 0, 0, 0, origStartTime.Location())
shiftDuration = newStartTime.Sub(origStartTime)
case interval.Minutes() >= 1:
// Interval is in minutes.
newStartTime := time.Date(
origStartTime.Year(),
origStartTime.Month(),
origStartTime.Day(),
origStartTime.Hour(),
origStartTime.Minute(), 0, 0, origStartTime.Location())
shiftDuration = newStartTime.Sub(origStartTime)
}
}

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(shiftDuration, 0, 0, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}

transformerFn := func(bootstrappedSeries ts.SeriesList) (ts.SeriesList, error) {
r := hitCountImpl(ctx, bootstrappedSeries, intervalString, interval, intervalInSeconds)
return r, nil
}

return &unaryContextShifter{
ContextShiftFunc: contextShiftingFn,
UnaryTransformer: transformerFn,
}, nil
}

func hitCountImpl(
ctx *common.Context,
seriesList ts.SeriesList,
intervalString string,
interval time.Duration,
intervalInSeconds int,
) ts.SeriesList {
resultSeries := make([]*ts.Series, 0, len(seriesList.Values))
for _, series := range seriesList.Values {
step := time.Duration(series.MillisPerStep()) * time.Millisecond
bucketCount := int(math.Ceil(float64(series.EndTime().Sub(series.StartTime())) / float64(interval)))
buckets := ts.NewValues(ctx, int(interval/time.Millisecond), bucketCount)
Expand Down Expand Up @@ -1645,14 +1706,14 @@ func hitcount(ctx *common.Context, seriesList singlePathSpec, intervalString str
}
}
}
newName := fmt.Sprintf("hitcount(%s, %q)", series.Name(), intervalString)
newName := fmt.Sprintf("hitcount(%s,%q)", series.Name(), intervalString)
newSeries := ts.NewSeries(ctx, newName, newStart, buckets)
resultSeries[index] = newSeries
resultSeries = append(resultSeries, newSeries)
}

r := ts.SeriesList(seriesList)
r.Values = resultSeries
return r, nil
return r
}

func safeIndex(len, index int) int {
Expand Down
66 changes: 45 additions & 21 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3231,10 +3231,7 @@ func TestLimitSortStable(t *testing.T) {

}

func TestHitCount(t *testing.T) {
ctx := common.NewTestContext()
defer func() { _ = ctx.Close() }()

func TestHitcount(t *testing.T) {
now := time.Now()
tests := []struct {
name string
Expand Down Expand Up @@ -3268,23 +3265,50 @@ func TestHitCount(t *testing.T) {
},
}

for _, input := range tests {
series := ts.NewSeries(
ctx,
input.name,
input.startTime,
common.NewTestSeriesValues(ctx, input.stepInMilli, input.values),
)
results, err := hitcount(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, input.intervalString)
expected := common.TestSeries{
Name: fmt.Sprintf(`hitcount(%s, %q)`, input.name, input.intervalString),
Data: input.output,
}
require.Nil(t, err)
common.CompareOutputsAndExpected(t, input.newStep, input.newStartTime,
[]common.TestSeries{expected}, results.Values)
for i, input := range tests {
input := input
t.Run(fmt.Sprintf("test_%d_%s", i, input.name), func(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
engine := NewEngine(store, CompileOptions{})

ctx := common.NewContext(common.ContextOptions{
Start: input.startTime,
End: input.startTime.Add(time.Second * 10),
Engine: engine,
})
defer func() { _ = ctx.Close() }()

series := ts.NewSeries(ctx, input.name, input.startTime,
common.NewTestSeriesValues(ctx, input.stepInMilli, input.values))

target := fmt.Sprintf("hitcount(%s,%q,false)", input.name, input.intervalString)
testSeriesFn := func(
*common.Context,
string,
storage.FetchOptions,
) (*storage.FetchResult, error) {
return &storage.FetchResult{SeriesList: []*ts.Series{series}}, nil
}

store.EXPECT().
FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(testSeriesFn).
AnyTimes()
expr, err := engine.Compile(target)
require.NoError(t, err)
res, err := expr.Execute(ctx)
require.NoError(t, err)
expected := common.TestSeries{
Name: fmt.Sprintf("hitcount(%s,%q)", input.name, input.intervalString),
Data: input.output,
}
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.newStep, input.newStartTime,
[]common.TestSeries{expected}, res.Values)
})
}
}

Expand Down

0 comments on commit 07ec4e4

Please sign in to comment.