Skip to content

Commit

Permalink
fix: reduce memory pressure of including structured metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 13, 2024
1 parent ca058b7 commit 64326ca
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 110 deletions.
52 changes: 26 additions & 26 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ const (

LevelLabel = "detected_level"
LogLevelUnknown = "unknown"
logLevelDebug = "debug"
logLevelInfo = "info"
logLevelWarn = "warn"
logLevelError = "error"
logLevelFatal = "fatal"
logLevelCritical = "critical"
logLevelTrace = "trace"
LogLevelDebug = "debug"
LogLevelInfo = "info"
LogLevelWarn = "warn"
LogLevelError = "error"
LogLevelFatal = "fatal"
LogLevelCritical = "critical"
LogLevelTrace = "trace"
)

var (
Expand Down Expand Up @@ -887,22 +887,22 @@ func DetectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt)
if err != nil {
return logLevelInfo
return LogLevelInfo
}
if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) {
return LogLevelUnknown
} else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) {
return logLevelTrace
return LogLevelTrace
} else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) {
return logLevelDebug
return LogLevelDebug
} else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) {
return logLevelInfo
return LogLevelInfo
} else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) {
return logLevelWarn
return LogLevelWarn
} else if otlpSeverityNumber <= int(plog.SeverityNumberError4) {
return logLevelError
return LogLevelError
} else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) {
return logLevelFatal
return LogLevelFatal
}
return LogLevelUnknown
}
Expand All @@ -921,19 +921,19 @@ func extractLogLevelFromLogLine(log string) string {

switch {
case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")):
return logLevelTrace
return LogLevelTrace
case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")):
return logLevelDebug
return LogLevelDebug
case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")):
return logLevelInfo
return LogLevelInfo
case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")):
return logLevelWarn
return LogLevelWarn
case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")):
return logLevelError
return LogLevelError
case bytes.EqualFold(v, []byte("critical")):
return logLevelCritical
return LogLevelCritical
case bytes.EqualFold(v, []byte("fatal")):
return logLevelFatal
return LogLevelFatal
default:
return detectLevelFromLogLine(log)
}
Expand Down Expand Up @@ -988,21 +988,21 @@ func isJSON(line string) bool {
func detectLevelFromLogLine(log string) string {
if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") ||
strings.Contains(log, "info") || strings.Contains(log, "INFO") {
return logLevelInfo
return LogLevelInfo
}
if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") ||
strings.Contains(log, "error") || strings.Contains(log, "ERROR") {
return logLevelError
return LogLevelError
}
if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") ||
strings.Contains(log, "warning") || strings.Contains(log, "WARNING") {
return logLevelWarn
return LogLevelWarn
}
if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") {
return logLevelCritical
return LogLevelCritical
}
if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") {
return logLevelDebug
return LogLevelDebug
}
return LogLevelUnknown
}
23 changes: 13 additions & 10 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/pkg/push"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util/build"
Expand All @@ -41,7 +42,7 @@ var defaultUserAgent = fmt.Sprintf("pattern-ingester-push/%s", build.GetVersion(
type EntryWriter interface {
// WriteEntry handles sending the log to the output
// To maintain consistent log timing, Write is expected to be non-blocking
WriteEntry(ts time.Time, entry string, lbls labels.Labels)
WriteEntry(ts time.Time, entry string, structuredMetadata, lbls labels.Labels)
Stop()
}

Expand Down Expand Up @@ -72,9 +73,10 @@ type Push struct {
}

type entry struct {
ts time.Time
entry string
labels labels.Labels
ts time.Time
entry string
structuredMetadata push.LabelsAdapter
labels labels.Labels
}

type entries struct {
Expand Down Expand Up @@ -145,8 +147,8 @@ func NewPush(
}

// WriteEntry implements EntryWriter
func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) {
p.entries.add(entry{ts: ts, entry: e, labels: lbls})
func (p *Push) WriteEntry(ts time.Time, e string, structuredMetadata, lbls labels.Labels) {
p.entries.add(entry{ts: ts, entry: e, structuredMetadata: logproto.FromLabelsToLabelAdapters(structuredMetadata), labels: lbls})
}

// Stop will cancel any ongoing requests and stop the goroutine listening for requests
Expand All @@ -171,8 +173,9 @@ func (p *Push) buildPayload() ([]byte, error) {
}

entries = append(entries, logproto.Entry{
Timestamp: e.ts,
Line: e.entry,
Timestamp: e.ts,
Line: e.entry,
StructuredMetadata: e.structuredMetadata,
})
entriesByStream[stream] = entries
}
Expand Down Expand Up @@ -319,7 +322,7 @@ func AggregatedMetricEntry(
ts.UnixNano(),
byteString,
totalCount,
push.LabelServiceName, service,
loghttp_push.LabelServiceName, service,
)

for _, l := range lbls {
Expand Down
56 changes: 28 additions & 28 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
labels.Label{Name: "level", Value: "error"},
labels.Label{Name: "service_name", Value: "baz_service"},
)
lbs3String := lbs3WithLevel.String()

setup := func() (*instance, *mockEntryWriter) {
ingesterID := "foo"
Expand All @@ -148,7 +147,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
}

mockWriter := &mockEntryWriter{}
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything)
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

inst, err := newInstance(
"foo",
Expand Down Expand Up @@ -190,10 +189,6 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
Name: distributor.LevelLabel,
Value: "debug",
},
{
Name: "level",
Value: "debug",
},
},
},
},
Expand All @@ -203,7 +198,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
Entries: []push.Entry{
{
Timestamp: time.Unix(20, 0),
Line: "error error error",
Line: "level=error msg=error",
},
},
},
Expand Down Expand Up @@ -239,10 +234,6 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
Name: distributor.LevelLabel,
Value: "debug",
},
{
Name: "level",
Value: "debug",
},
},
},
},
Expand All @@ -259,27 +250,27 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
t.Run("correctly detects level", func(t *testing.T) {
inst, _ := setup()

require.Len(t, inst.aggMetricsByStream, 3)
require.Len(t, inst.aggByStream, 3)

for stream := range inst.aggMetricsByStream {
if stream != lbsWithPod.String() && stream != lbs2.String() && stream != lbs3String {
require.Fail(t, fmt.Sprintf("stream %s should not be present", stream))
for stream := range inst.aggByStream {
if stream != lbsWithPod.Hash() && stream != lbs2.Hash() && stream != lbs3WithLevel.Hash() {
require.Fail(t, fmt.Sprintf("stream hash %d should not be present", stream))
}
}
})

t.Run("accumulates bytes and count on every push", func(t *testing.T) {
inst, _ := setup()

require.Len(t, inst.aggMetricsByStream, 3)
require.Len(t, inst.aggByStream, 3)

require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStream[lbsWithPod.String()].bytes)
require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStream[lbs2.String()].bytes)
require.Equal(t, uint64(17), inst.aggMetricsByStream[lbs3String].bytes)
require.Equal(t, uint64(14+(15*30)), inst.aggByStream[lbsWithPod.Hash()].bytes)
require.Equal(t, uint64(14+(15*30)), inst.aggByStream[lbs2.Hash()].bytes)
require.Equal(t, uint64(21), inst.aggByStream[lbs3WithLevel.Hash()].bytes)

require.Equal(t, uint64(31), inst.aggMetricsByStream[lbsWithPod.String()].count)
require.Equal(t, uint64(31), inst.aggMetricsByStream[lbs2.String()].count)
require.Equal(t, uint64(1), inst.aggMetricsByStream[lbs3String].count)
require.Equal(t, uint64(31), inst.aggByStream[lbsWithPod.Hash()].count)
require.Equal(t, uint64(31), inst.aggByStream[lbs2.Hash()].count)
require.Equal(t, uint64(1), inst.aggByStream[lbs3WithLevel.Hash()].count)
})

t.Run("downsamples aggregated metrics", func(t *testing.T) {
Expand All @@ -296,7 +287,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
uint64(14+(15*30)),
uint64(31),
"test_service",
lbsWithPod,
lbs,
),
labels.New(
labels.Label{Name: "pod", Value: "pea"},
),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "test_service"},
Expand All @@ -315,6 +309,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
"foo_service",
lbs2,
),
labels.New(),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "foo_service"},
labels.Label{Name: "level", Value: "error"},
Expand All @@ -327,27 +322,32 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
now.Time(),
aggregation.AggregatedMetricEntry(
now,
uint64(17),
uint64(21),
uint64(1),
"baz_service",
lbs3WithLevel,
lbs3,
),
labels.New(),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "baz_service"},
labels.Label{Name: "level", Value: "error"},
),
)

require.Equal(t, 0, len(inst.aggMetricsByStream))
require.Equal(t, 0, len(inst.aggByStream))
})
}

type mockEntryWriter struct {
mock.Mock
}

func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels) {
_ = m.Called(ts, entry, lbls)
func (m *mockEntryWriter) WriteEntry(
ts time.Time,
entry string,
structuredMetadata, lbls labels.Labels,
) {
_ = m.Called(ts, entry, structuredMetadata, lbls)
}

func (m *mockEntryWriter) Stop() {
Expand Down
Loading

0 comments on commit 64326ca

Please sign in to comment.