Skip to content

Commit

Permalink
feat: move metric aggregation to a per-tenant config (#14709)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Nov 1, 2024
1 parent 2f41584 commit c1fde26
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 43 deletions.
11 changes: 7 additions & 4 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,6 @@ pattern_ingester:
# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.metric-aggregation.downsample-period
[downsample_period: <duration> | default = 10s]
Expand Down Expand Up @@ -3845,6 +3841,13 @@ otlp_config:
# CLI flag: -limits.ingestion-partition-tenant-shard-size
[ingestion_partitions_tenant_shard_size: <int> | default = 0]

# Enable metric aggregation. When enabled, pushed streams will be sampled for
# bytes and count, and these metrics will be written back into Loki as a special
# __aggregated_metric__ stream, which can be queried for faster histogram
# queries.
# CLI flag: -limits.metric-aggregation-enabled
[metric_aggregation_enabled: <boolean> | default = false]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) {

svc, err := pattern.NewTeeService(
t.Cfg.Pattern,
t.Overrides,
t.PatternRingClient,
t.Cfg.MetricsNamespace,
prometheus.DefaultRegisterer,
Expand Down
12 changes: 4 additions & 8 deletions pkg/pattern/aggregation/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
)

type Config struct {
// TODO(twhitney): This needs to be a per-tenant config
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
DownsamplePeriod time.Duration `yaml:"downsample_period"`
LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."`
WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."`
Expand All @@ -27,12 +25,6 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
}

func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) {
fs.BoolVar(
&cfg.Enabled,
prefix+"metric-aggregation.enabled",
false,
"Flag to enable or disable metric aggregation.",
)
fs.DurationVar(
&cfg.DownsamplePeriod,
prefix+"metric-aggregation.downsample-period",
Expand Down Expand Up @@ -105,3 +97,7 @@ func (s *secretValue) Set(val string) error {
// Get returns the underlying secret as a string.
// NOTE(review): presumably implements flag.Getter — confirm against the
// flag.Value implementation above.
func (s *secretValue) Get() any { return string(*s) }

// String returns the secret verbatim.
// NOTE(review): this exposes the secret anywhere the flag value is
// stringified (e.g. -help output) — confirm that is intended.
func (s *secretValue) String() string { return string(*s) }

// Limits is the subset of per-tenant limits consumed by the aggregation
// package. It is defined here, at the consumer, so callers can supply any
// limits implementation (e.g. validation.Overrides).
type Limits interface {
	// MetricAggregationEnabled reports whether metric aggregation is
	// enabled for the tenant identified by userID.
	MetricAggregationEnabled(userID string) bool
}
42 changes: 17 additions & 25 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (cfg *Config) Validate() error {

// Limits is the full set of per-tenant limits the pattern ingester needs,
// composed of the drain (pattern detection) limits and the aggregation
// (metric sampling) limits.
type Limits interface {
	drain.Limits
	aggregation.Limits
}

type Ingester struct {
Expand Down Expand Up @@ -294,29 +295,18 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case <-i.loopQuit:
return
}
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
}
Expand Down Expand Up @@ -401,7 +391,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
var writer aggregation.EntryWriter

aggCfg := i.cfg.MetricAggregation
if aggCfg.Enabled {
if i.limits.MetricAggregationEnabled(instanceID) {
writer, err = aggregation.NewPush(
aggCfg.LokiAddr,
instanceID,
Expand Down Expand Up @@ -469,6 +459,8 @@ func (i *Ingester) downsampleMetrics(ts model.Time) {
instances := i.getInstances()

for _, instance := range instances {
instance.Downsample(ts)
if i.limits.MetricAggregationEnabled(instance.instanceID) {
instance.Downsample(ts)
}
}
}
5 changes: 5 additions & 0 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,13 @@ func (m *mockEntryWriter) Stop() {

// fakeLimits is a test double for Limits. The embedded interface satisfies
// the full contract; only the methods stubbed below have real behavior, so
// calling any other method on the zero value will panic (nil embedded
// interface).
type fakeLimits struct {
	Limits
	// metricAggregationEnabled is returned for every tenant by
	// MetricAggregationEnabled.
	metricAggregationEnabled bool
}

// PatternIngesterTokenizableJSONFields returns a fixed list of tokenizable
// JSON fields for tests, ignoring the tenant ID.
func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
	return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}

// MetricAggregationEnabled returns the configured test value regardless of
// the tenant ID.
func (f *fakeLimits) MetricAggregationEnabled(_ string) bool {
	return f.metricAggregationEnabled
}
12 changes: 8 additions & 4 deletions pkg/pattern/tee_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

type TeeService struct {
cfg Config
limits Limits
logger log.Logger
ringClient RingClient
wg *sync.WaitGroup
Expand All @@ -48,6 +49,7 @@ type TeeService struct {

func NewTeeService(
cfg Config,
limits Limits,
ringClient RingClient,
metricsNamespace string,
registerer prometheus.Registerer,
Expand Down Expand Up @@ -83,6 +85,7 @@ func NewTeeService(
),
),
cfg: cfg,
limits: limits,
ringClient: ringClient,

wg: &sync.WaitGroup{},
Expand Down Expand Up @@ -317,14 +320,15 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc()
level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err)

if !ts.cfg.MetricAggregation.Enabled {
return err
}

// Pattern ingesters serve 2 functions, processing patterns and aggregating metrics.
// Only owned streams are processed for patterns, however any pattern ingester can
// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
// try to forward request to any pattern ingester so we at least capture the metrics.

if !ts.limits.MetricAggregationEnabled(clientRequest.tenant) {
return err
}

replicationSet, err := ts.ringClient.Ring().
GetReplicationSetForOperation(ring.WriteNoExtend)
if err != nil || len(replicationSet.Instances) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions pkg/pattern/tee_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) {

logsTee, err := NewTeeService(
cfg,
&fakeLimits{
metricAggregationEnabled: true,
},
ringClient,
"test",
nil,
Expand Down
16 changes: 14 additions & 2 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ type Limits struct {
IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

PatternIngesterTokenizableJSONFieldsDefault dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_default" json:"pattern_ingester_tokenizable_json_fields_default" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"`
MetricAggregationEnabled bool `yaml:"metric_aggregation_enabled" json:"metric_aggregation_enabled"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
Expand Down Expand Up @@ -438,6 +439,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.PatternIngesterTokenizableJSONFieldsDefault, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the pattern ingester.")
f.Var(&l.PatternIngesterTokenizableJSONFieldsAppend, "limits.pattern-ingester-tokenizable-json-fields-append", "List of JSON fields that should be appended to the default list of tokenizable fields in the pattern ingester.")
f.Var(&l.PatternIngesterTokenizableJSONFieldsDelete, "limits.pattern-ingester-tokenizable-json-fields-delete", "List of JSON fields that should be deleted from the (default U append) list of tokenizable fields in the pattern ingester.")

f.BoolVar(
&l.MetricAggregationEnabled,
"limits.metric-aggregation-enabled",
false,
"Enable metric aggregation. When enabled, pushed streams will be sampled for bytes and count, and these metric will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.",
)
}

// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
Expand Down Expand Up @@ -1113,6 +1121,10 @@ func (o *Overrides) PatternIngesterTokenizableJSONFieldsDelete(userID string) []
return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete
}

// MetricAggregationEnabled returns whether metric aggregation is enabled for
// the given tenant, falling back to the default limits when no per-tenant
// override is set.
func (o *Overrides) MetricAggregationEnabled(userID string) bool {
	return o.getOverridesForUser(userID).MetricAggregationEnabled
}

// S3SSEType returns the per-tenant S3 SSE type.
func (o *Overrides) S3SSEType(user string) string {
return o.getOverridesForUser(user).S3SSEType
Expand Down
33 changes: 33 additions & 0 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,36 @@ pattern_ingester_tokenizable_json_fields_delete: body
})
}
}

// Test_MetricAggregationEnabled verifies that the per-tenant
// metric_aggregation_enabled limit is honored when set via YAML in the
// default limits.
func Test_MetricAggregationEnabled(t *testing.T) {
	for _, tc := range []struct {
		name     string
		yaml     string
		expected bool
	}{
		{
			name: "when true",
			yaml: `
metric_aggregation_enabled: true
`,
			expected: true,
		},
		{
			name: "when false",
			yaml: `
metric_aggregation_enabled: false
`,
			expected: false,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// Unmarshal into the default limits so the setting applies to
			// every tenant, including the "fake" tenant queried below.
			overrides := Overrides{
				defaultLimits: &Limits{},
			}
			require.NoError(t, yaml.Unmarshal([]byte(tc.yaml), overrides.defaultLimits))

			actual := overrides.MetricAggregationEnabled("fake")
			require.Equal(t, tc.expected, actual)
		})
	}
}

0 comments on commit c1fde26

Please sign in to comment.