Skip to content

Commit

Permalink
feat: downsample aggregated metrics (#13449)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Jul 11, 2024
1 parent 583f7f3 commit 2c053ee
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 44 deletions.
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ pattern_ingester:
# CLI flag: -pattern-ingester.metric-aggregation.log-push-observations
[log_push_observations: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.downsample-period
[downsample_period: <duration> | default = 10s]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
18 changes: 18 additions & 0 deletions pkg/pattern/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (i *Ingester) Flush() {

func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)
i.downsampleMetrics(model.Now())

// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
Expand Down Expand Up @@ -73,3 +74,20 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
return true, nil
})
}

// downsampleMetrics runs one downsampling pass, stamped with ts, over every
// instance returned by getInstances.
func (i *Ingester) downsampleMetrics(ts model.Time) {
	for _, inst := range i.getInstances() {
		i.downsampleInstance(inst, ts)
	}
}

// downsampleInstance calls Downsample(ts) on every stream of the given
// instance. Each call is made inside the stream map's WithLock.
func (i *Ingester) downsampleInstance(instance *instance, ts model.Time) {
	visit := func(s *stream) (bool, error) {
		instance.streams.WithLock(func() { s.Downsample(ts) })
		// Always continue to the next stream; this visitor never reports an error.
		return true, nil
	}

	// The visitor always returns a nil error, so the result of ForEach is
	// deliberately discarded.
	_ = instance.streams.ForEach(visit)
}
33 changes: 27 additions & 6 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"google.golang.org/grpc/health/grpc_health_v1"

ring_client "github.com/grafana/dskit/ring/client"
Expand Down Expand Up @@ -206,13 +207,33 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()

case <-i.loopQuit:
return
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)

case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case <-i.loopQuit:
return
}
}
}
}
Expand Down
129 changes: 124 additions & 5 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -39,6 +40,16 @@ func setup(t *testing.T) *instance {
return inst
}

// downsampleInstance is a test helper that downsamples every stream of inst at
// the timestamp tts, given in whole seconds since the Unix epoch.
func downsampleInstance(inst *instance, tts int64) {
	at := model.TimeFromUnixNano(time.Unix(tts, 0).UnixNano())

	visit := func(s *stream) (bool, error) {
		inst.streams.WithLock(func() { s.Downsample(at) })
		return true, nil
	}

	// The visitor always returns a nil error, so ForEach's result is ignored.
	_ = inst.streams.ForEach(visit)
}

func TestInstancePushQuery(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -55,6 +66,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 20)

err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -70,6 +82,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 30)

for i := 0; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -87,6 +100,7 @@ func TestInstancePushQuery(t *testing.T) {
})
require.NoError(t, err)
}
downsampleInstance(inst, 30)

it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
Query: "{test=\"test\"}",
Expand Down Expand Up @@ -115,6 +129,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -130,8 +147,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)
Expand All @@ -149,10 +166,11 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints := ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(1), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)
Expand All @@ -170,7 +188,7 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints = ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

Expand All @@ -187,6 +205,101 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, float64(4), res.Series[0].Samples[expectedDataPoints-1].Value)
})

// Pushes one entry at t=0s and then one entry every 10s up to t=300s,
// downsampling after every second push, and checks the count_over_time
// results over the range [0s, 300s].
t.Run("test count_over_time samples with downsampling", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(0, 0),
Line: "ts=1 msg=hello",
},
},
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(int64(10*i), 0),
Line: "foo bar foo bar",
},
},
},
},
})
require.NoError(t, err)

// pushes are 10s apart, so downsampling on every second push means
// downsampling every 20s
if i%2 == 0 {
downsampleInstance(inst, int64(10*i))
}
}

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)

it, err := inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err := iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// (end - start) / step -- start is 0s, end is 300s, and the divisor treats
// the step as 20s (Step: 20000 above, presumably milliseconds), which
// matches the 20s downsampling interval
expectedDataPoints := ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)

// after the first push there's 2 pushes per sample due to downsampling
require.Equal(t, float64(2), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)

it, err = inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err = iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// (end - start) / step -- start is 0s, end is 300s, and the divisor treats
// the step as 20s (Step: 20000 above, presumably milliseconds), which
// matches the 20s downsampling interval
expectedDataPoints = ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

// with a larger selection range of 80s, we expect to eventually get up to 8 per datapoint.
// our pushes are spaced 10s apart, downsampled every 20s, and the step is 20s,
// so we expect the value to increase by 2 with each sample, reaching 8 at the 5th sample and staying there
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(3), res.Series[0].Samples[1].Value)
require.Equal(t, float64(5), res.Series[0].Samples[2].Value)
require.Equal(t, float64(7), res.Series[0].Samples[3].Value)
require.Equal(t, float64(8), res.Series[0].Samples[4].Value)
require.Equal(t, float64(8), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test bytes_over_time samples", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -202,6 +315,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)

downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -217,8 +333,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`)
require.NoError(t, err)
Expand Down Expand Up @@ -343,6 +459,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand Down Expand Up @@ -397,8 +516,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

for _, tt := range []struct {
name string
Expand Down
12 changes: 12 additions & 0 deletions pkg/pattern/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ func TestInstance_QuerySample(t *testing.T) {
return instance
}

// downsampleInstance downsamples every stream of inst at ts, calling
// Downsample inside the stream map's WithLock.
downsampleInstance := func(inst *instance, ts model.Time) {
	visit := func(s *stream) (bool, error) {
		inst.streams.WithLock(func() { s.Downsample(ts) })
		return true, nil
	}

	// The visitor always returns a nil error, so ForEach's result is ignored.
	_ = inst.streams.ForEach(visit)
}

ctx := context.Background()

thirtySeconds := int64(30000)
Expand Down Expand Up @@ -85,6 +94,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(lastTsMilli))

// 5 min query range
// 1 min step
Expand Down Expand Up @@ -203,6 +213,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+thirtySeconds))

err = instance.Push(ctx, &logproto.PushRequest{
Streams: []push.Stream{
Expand Down Expand Up @@ -245,6 +256,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+oneMin+oneMin+thirtySeconds))

// steps
start := then
Expand Down
Loading

0 comments on commit 2c053ee

Please sign in to comment.