Skip to content

Commit

Permalink
feat: ability to log stream selectors before service name detection (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Oct 2, 2024
1 parent 41fafd8 commit d7ff426
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 18 deletions.
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil)
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil, false)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
5 changes: 3 additions & 2 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
pushRequestParser = d.RequestParserWrapper(pushRequestParser)
}

req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker)
logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand All @@ -73,7 +74,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
return
}

if d.tenantConfigs.LogPushRequestStreams(tenantID) {
if logPushRequestStreams {
var sb strings.Builder
for _, s := range req.Streams {
sb.WriteString(s.Labels)
Expand Down
11 changes: 10 additions & 1 deletion pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/httptest"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"

"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -114,6 +115,14 @@ func Test_OtelErrorHeaderInterceptor(t *testing.T) {
}
}

func stubParser(_ string, _ *http.Request, _ push.TenantsRetention, _ push.Limits, _ push.UsageTracker) (*logproto.PushRequest, *push.Stats, error) {
func stubParser(
_ string,
_ *http.Request,
_ push.TenantsRetention,
_ push.Limits,
_ push.UsageTracker,
_ bool,
_ log.Logger,
) (*logproto.PushRequest, *push.Stats, error) {
return &logproto.PushRequest{}, &push.Stats{}, nil
}
33 changes: 30 additions & 3 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"io"
"net/http"
"sort"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -40,14 +43,14 @@ func newPushStats() *Stats {
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger)
return req, stats, nil
}

Expand Down Expand Up @@ -98,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -113,6 +116,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten

resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size
var pushedLabels model.LabelSet
if logPushRequestStreams {
pushedLabels = make(model.LabelSet, 30)
}

shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric
hasServiceName := false
Expand All @@ -129,6 +136,9 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
if logPushRequestStreams && pushedLabels != nil {
pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

if !hasServiceName && shouldDiscoverServiceName {
for _, labelName := range discoverServiceName {
Expand All @@ -151,6 +161,23 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
}

if logPushRequestStreams {
var sb strings.Builder
sb.WriteString("{")
labels := make([]string, 0, len(pushedLabels))
for name, value := range pushedLabels {
labels = append(labels, fmt.Sprintf(`%s="%s"`, name, value))
}
sb.WriteString(strings.Join(labels, ", "))
sb.WriteString("}")

level.Debug(logger).Log(
"msg", "OTLP push request stream before service name discovery",
"stream", sb.String(),
"service_name", streamLabels[model.LabelName(LabelServiceName)],
)
}

if err := streamLabels.Validate(); err != nil {
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
continue
Expand Down
14 changes: 13 additions & 1 deletion pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -508,7 +509,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
tracker := NewMockTracker()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats)
pushReq := otlpToLokiPushRequest(
context.Background(),
tc.generateLogs(),
"foo",
fakeRetention{},
tc.otlpConfig,
defaultServiceDetection,
tracker,
stats,
false,
log.NewNopLogger(),
)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

Expand Down
23 changes: 18 additions & 5 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
}

type (
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
)

Expand All @@ -106,8 +106,8 @@ type Stats struct {
IsAggregatedMetric bool
}

func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker)
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
return req, nil
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
Expand Down Expand Up @@ -247,8 +247,13 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.IsAggregatedMetric = true
}

var beforeServiceName string
if logPushRequestStreams {
beforeServiceName = lbs.String()
}

serviceName := ServiceUnknown
if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric {
serviceName := ServiceUnknown
for _, labelName := range discoverServiceName {
if labelVal := lbs.Get(labelName); labelVal != "" {
serviceName = labelVal
Expand All @@ -264,6 +269,14 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
lbs = lb.Del(LabelServiceName).Labels()
}

if logPushRequestStreams {
level.Debug(logger).Log(
"msg", "push request stream before service name discovery",
"labels", beforeServiceName,
"service_name", serviceName,
)
}

var retentionPeriod time.Duration
if tenantsRetention != nil {
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
Expand Down
19 changes: 14 additions & 5 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,16 @@ func TestParseRequest(t *testing.T) {
}

tracker := NewMockTracker()
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)
data, err := ParseRequest(
util_log.Logger,
"fake",
request,
nil,
&fakeLimits{enabled: test.enableServiceDiscovery},
ParseLokiRequest,
tracker,
false,
)

structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
Expand Down Expand Up @@ -355,7 +364,7 @@ func Test_ServiceDetection(t *testing.T) {
request := createRequest("/loki/api/v1/push", strings.NewReader(body))

limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker, false)

require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
Expand All @@ -366,7 +375,7 @@ func Test_ServiceDetection(t *testing.T) {
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{enabled: true}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})
Expand All @@ -380,7 +389,7 @@ func Test_ServiceDetection(t *testing.T) {
labels: []string{"special"},
indexAttributes: []string{"special"},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})
Expand All @@ -394,7 +403,7 @@ func Test_ServiceDetection(t *testing.T) {
labels: []string{"special"},
indexAttributes: []string{},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
})
Expand Down

0 comments on commit d7ff426

Please sign in to comment.