Skip to content

Commit

Permalink
feat: use aggregated metrics for volume requests
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 4ea549a
Author: Trevor Whitney <[email protected]>
Date:   Tue Oct 8 13:18:35 2024 -0600

    chore: fix tests, add more requestion validation

commit 55b0aa1
Author: Trevor Whitney <[email protected]>
Date:   Mon Oct 7 14:28:21 2024 -0600

    chore: lint and format

commit c78c5fa
Author: Trevor Whitney <[email protected]>
Date:   Mon Oct 7 14:03:26 2024 -0600

    feat: add range support to agg metric volume queries

commit 93cc007
Author: Trevor Whitney <[email protected]>
Date:   Wed Oct 2 16:29:08 2024 -0600

    feat: aggregated metric volume queries

commit 1f30892
Author: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Date:   Mon Oct 7 13:43:34 2024 -0400

    chore(deps): update dependency fluent-plugin-multi-format-parser to '~>1.1.0' (#14396)

    Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

commit a05431f
Author: Callum Styan <[email protected]>
Date:   Mon Oct 7 10:01:28 2024 -0700

    fix: promtail config unmarshalling (#14408)

    Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
trevorwhitney committed Oct 10, 2024
1 parent ff6f3a1 commit 764f731
Show file tree
Hide file tree
Showing 14 changed files with 1,044 additions and 268 deletions.
2 changes: 1 addition & 1 deletion clients/cmd/fluentd/docker/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
source 'https://rubygems.org'

gem 'fluentd', '1.15.3'
gem 'fluent-plugin-multi-format-parser', '~>1.0.0'
gem 'fluent-plugin-multi-format-parser', '~>1.1.0'
1 change: 0 additions & 1 deletion clients/pkg/promtail/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Config struct {

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = Config{}
// We want to set c to the defaults and then overwrite it with the input.
// To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
// again, we have to hide it using a type indirection.
Expand Down
72 changes: 42 additions & 30 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,12 +566,13 @@ func NewVolumeInstantQueryWithDefaults(matchers string) *logproto.VolumeRequest
}

type VolumeInstantQuery struct {
Start time.Time
End time.Time
Query string
Limit uint32
TargetLabels []string
AggregateBy string
Start time.Time
End time.Time
Query string
Limit uint32
TargetLabels []string
AggregateBy string
AggregatedMetrics bool
}

func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
Expand All @@ -585,16 +586,18 @@ func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
return nil, err
}

aggregateBy, err := volumeAggregateBy(r)
aggregatedMetrics := volumeAggregatedMetrics(r)
aggregateBy, err := volumeAggregateBy(r, aggregatedMetrics)
if err != nil {
return nil, err
}

svInstantQuery := VolumeInstantQuery{
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
AggregatedMetrics: aggregatedMetrics,
}

svInstantQuery.Start, svInstantQuery.End, err = bounds(r)
Expand All @@ -610,13 +613,14 @@ func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
}

type VolumeRangeQuery struct {
Start time.Time
End time.Time
Step time.Duration
Query string
Limit uint32
TargetLabels []string
AggregateBy string
Start time.Time
End time.Time
Step time.Duration
Query string
Limit uint32
TargetLabels []string
AggregateBy string
AggregatedMetrics bool
}

func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
Expand All @@ -630,19 +634,21 @@ func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
return nil, err
}

aggregateBy, err := volumeAggregateBy(r)
aggregatedMetrics := volumeAggregatedMetrics(r)
aggregateBy, err := volumeAggregateBy(r, aggregatedMetrics)
if err != nil {
return nil, err
}

return &VolumeRangeQuery{
Start: result.Start,
End: result.End,
Step: result.Step,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
Start: result.Start,
End: result.End,
Step: result.Step,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
AggregatedMetrics: aggregatedMetrics,
}, nil
}

Expand Down Expand Up @@ -722,15 +728,21 @@ func volumeLimit(r *http.Request) error {
return nil
}

func volumeAggregateBy(r *http.Request) (string, error) {
func volumeAggregateBy(r *http.Request, aggregatedMetrics bool) (string, error) {
l := r.Form.Get("aggregateBy")
if l == "" {
return seriesvolume.DefaultAggregateBy, nil
}

if seriesvolume.ValidateAggregateBy(l) {
return l, nil
err := seriesvolume.ValidateAggregateBy(l, aggregatedMetrics)
if err != nil {
return "", err
}

return "", errors.New("invalid aggregation option")
return l, nil
}

func volumeAggregatedMetrics(r *http.Request) bool {
l := r.Form.Get("aggregatedMetrics")
return l == "true"
}
95 changes: 74 additions & 21 deletions pkg/loghttp/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
}

func Test_ParseVolumeInstantQuery(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&targetLabels=foo,bar`
req := &http.Request{
URL: mustParseURL(`?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&targetLabels=foo,bar`,
),
URL: mustParseURL(url),
}

err := req.ParseForm()
Expand All @@ -318,7 +318,7 @@ func Test_ParseVolumeInstantQuery(t *testing.T) {
require.Equal(t, expected, actual)

t.Run("aggregate by", func(t *testing.T) {
url := `?query={foo="bar"}` +
url = `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
Expand Down Expand Up @@ -347,17 +347,47 @@ func Test_ParseVolumeInstantQuery(t *testing.T) {
require.EqualError(t, err, "invalid aggregation option")
})
})

t.Run("aggregated metrics", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url)}

err := req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=true`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.True(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=false`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)
})
}

func Test_ParseVolumeRangeQuery(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`
req := &http.Request{
URL: mustParseURL(`?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`,
),
URL: mustParseURL(url),
}

err := req.ParseForm()
Expand All @@ -378,13 +408,6 @@ func Test_ParseVolumeRangeQuery(t *testing.T) {
require.Equal(t, expected, actual)

t.Run("aggregate by", func(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`

t.Run("labels", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url + `&aggregateBy=labels`)}

Expand All @@ -406,5 +429,35 @@ func Test_ParseVolumeRangeQuery(t *testing.T) {
_, err = ParseVolumeRangeQuery(req)
require.EqualError(t, err, "invalid aggregation option")
})

t.Run("aggregated metrics", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url)}

err := req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=true`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.True(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=false`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)
})
})
}
Loading

0 comments on commit 764f731

Please sign in to comment.