Skip to content

Commit

Permalink
chore: add service_name label earlier in the ingestion pipeline (#13702)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle authored Jul 30, 2024
1 parent e9a9c60 commit 573a184
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 147 deletions.
16 changes: 0 additions & 16 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ const (

ringAutoForgetUnhealthyPeriods = 2

labelServiceName = "service_name"
serviceUnknown = "unknown_service"
levelLabel = "detected_level"
logLevelDebug = "debug"
logLevelInfo = "info"
Expand Down Expand Up @@ -789,20 +787,6 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
return nil, "", 0, err
}

// We do not want to count service_name added by us in the stream limit so adding it after validating original labels.
if !ls.Has(labelServiceName) && len(vContext.discoverServiceName) > 0 {
serviceName := serviceUnknown
for _, labelName := range vContext.discoverServiceName {
if labelVal := ls.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

ls = labels.NewBuilder(ls).Set(labelServiceName, serviceName).Labels()
stream.Labels = ls.String()
}

lsHash := ls.Hash()

d.labelCache.Add(key, labelData{ls, lsHash})
Expand Down
125 changes: 4 additions & 121 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func TestDistributor(t *testing.T) {
t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)
Expand Down Expand Up @@ -140,20 +139,17 @@ func TestDistributor(t *testing.T) {
func Test_IncrementTimestamp(t *testing.T) {
incrementingDisabled := &validation.Limits{}
flagext.DefaultValues(incrementingDisabled)
incrementingDisabled.DiscoverServiceName = nil
incrementingDisabled.RejectOldSamples = false
incrementingDisabled.DiscoverLogLevels = false

incrementingEnabled := &validation.Limits{}
flagext.DefaultValues(incrementingEnabled)
incrementingEnabled.DiscoverServiceName = nil
incrementingEnabled.RejectOldSamples = false
incrementingEnabled.IncrementDuplicateTimestamp = true
incrementingEnabled.DiscoverLogLevels = false

defaultLimits := &validation.Limits{}
flagext.DefaultValues(defaultLimits)
now := time.Now()
defaultLimits.DiscoverLogLevels = false

tests := map[string]struct {
Expand Down Expand Up @@ -401,34 +397,6 @@ func Test_IncrementTimestamp(t *testing.T) {
},
},
},
"default limit adding service_name label": {
limits: defaultLimits,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\"}",
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
expectedPush: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\", service_name=\"foo\"}",
Hash: 0x86ca305b6d86e8b0,
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
},
}

for testName, testData := range tests {
Expand All @@ -448,7 +416,6 @@ func Test_IncrementTimestamp(t *testing.T) {
func TestDistributorPushConcurrently(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil

distributors, ingesters := prepare(t, 1, 5, limits, nil)

Expand Down Expand Up @@ -552,20 +519,6 @@ func Test_SortLabelsOnPush(t *testing.T) {
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels)
})

t.Run("with service_name added during ingestion", func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
ingester := &mockIngester{}
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

request := makeWriteRequest(10, 10)
request.Streams[0].Labels = `{buzz="f", x="y", a="b"}`
_, err := distributors[0].Push(ctx, request)
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels)
})
}

func Test_TruncateLogLines(t *testing.T) {
Expand Down Expand Up @@ -603,7 +556,7 @@ func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\", service_name=\"unknown_service\"}", 10)))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10)))
topVal := ingester.Peek()
require.Nil(t, topVal)
})
Expand Down Expand Up @@ -885,53 +838,9 @@ func TestParseStreamLabels(t *testing.T) {
expectedErr error
generateLimits func() *validation.Limits
}{
{
name: "service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{foo="bar"}`,
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
},
},
{
name: "no labels defined - service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{}`,
expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg),
},
{
name: "service name label enabled",
origLabels: `{foo="bar"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
{
Name: labelServiceName,
Value: serviceUnknown,
},
},
},
{
name: "service name label should not get counted against max labels count",
origLabels: `{foo="bar"}`,
origLabels: `{foo="bar", service_name="unknown_service"}`,
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
Expand All @@ -944,33 +853,8 @@ func TestParseStreamLabels(t *testing.T) {
Value: "bar",
},
{
Name: labelServiceName,
Value: serviceUnknown,
},
},
},
{
name: "use label service as service name",
origLabels: `{container="nginx", foo="bar", service="auth"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "container",
Value: "nginx",
},
{
Name: "foo",
Value: "bar",
},
{
Name: "service",
Value: "auth",
},
{
Name: labelServiceName,
Value: "auth",
Name: loghttp_push.LabelServiceName,
Value: loghttp_push.ServiceUnknown,
},
},
},
Expand Down Expand Up @@ -1562,7 +1446,6 @@ func Test_DetectLogLevels(t *testing.T) {
flagext.DefaultValues(limits)

limits.DiscoverLogLevels = discoverLogLevels
limits.DiscoverServiceName = nil
limits.AllowStructuredMetadata = true
return limits, &mockIngester{}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
return fmt.Errorf(validation.MissingLabelsErrorMsg)
}

numLabelNames := len(ls)
// This is a special case that's often added by the Loki infrastructure. It may result in allowing one extra label
// if incoming requests already have a service_name
if ls.Has(push.LabelServiceName) {
numLabelNames--
}

if numLabelNames > ctx.maxLabelNamesPerSeries {
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
Expand Down
47 changes: 38 additions & 9 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
"time"

"github.com/grafana/loki/v3/pkg/logql/syntax"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/push"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/unmarshal"
Expand Down Expand Up @@ -57,14 +58,19 @@ var (
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
)

const applicationJSON = "application/json"
const (
applicationJSON = "application/json"
LabelServiceName = "service_name"
ServiceUnknown = "unknown_service"
)

type TenantsRetention interface {
RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
}

type Limits interface {
OTLPConfig(userID string) OTLPConfig
DiscoverServiceName(userID string) []string
}

type EmptyLimits struct{}
Expand All @@ -73,6 +79,10 @@ func (EmptyLimits) OTLPConfig(string) OTLPConfig {
return DefaultOTLPConfig(GlobalOTLPConfig{})
}

func (EmptyLimits) DiscoverServiceName(string) []string {
return nil
}

type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
type RequestParserWrapper func(inner RequestParser) RequestParser

Expand Down Expand Up @@ -148,7 +158,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
return req, nil
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
Expand Down Expand Up @@ -217,16 +227,33 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.ContentType = contentType
pushStats.ContentEncoding = contentEncoding

for _, s := range req.Streams {
discoverServiceName := limits.DiscoverServiceName(userID)
for i := range req.Streams {
s := req.Streams[i]
pushStats.StreamLabelsSize += int64(len(s.Labels))

var lbs labels.Labels
if tenantsRetention != nil || tracker != nil {
lbs, err = syntax.ParseLabels(s.Labels)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse labels: %w", err)
lbs, err := syntax.ParseLabels(s.Labels)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse labels: %w", err)
}

if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 {
serviceName := ServiceUnknown
for _, labelName := range discoverServiceName {
if labelVal := lbs.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

lb := labels.NewBuilder(lbs)
lbs = lb.Set(LabelServiceName, serviceName).Labels()
s.Labels = lbs.String()

// Remove the added label after it's added to the stream so it's not consumed by subsequent steps
lbs = lb.Del(LabelServiceName).Labels()
}

var retentionPeriod time.Duration
if tenantsRetention != nil {
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
Expand All @@ -249,6 +276,8 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.MostRecentEntryTimestamp = e.Timestamp
}
}

req.Streams[i] = s
}

return &req, pushStats, nil
Expand Down
Loading

0 comments on commit 573a184

Please sign in to comment.