Skip to content

Commit

Permalink
chore(otel): support ingesting offcpu (#3875)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Marc Sanmiquel <[email protected]>
  • Loading branch information
korniltsev and marcsanmi authored Feb 5, 2025
1 parent 1a57c65 commit 71187cc
Show file tree
Hide file tree
Showing 10 changed files with 339,922 additions and 94 deletions.
4 changes: 4 additions & 0 deletions examples/tracing/golang-push/go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06F
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
Expand All @@ -67,11 +69,13 @@ golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
Expand Down
112 changes: 90 additions & 22 deletions pkg/ingester/otlp/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ import (
"time"

googleProfile "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
otelProfile "github.com/grafana/pyroscope/api/otlp/profiles/v1development"
pyromodel "github.com/grafana/pyroscope/pkg/model"
)

const serviceNameKey = "service.name"

type convertedProfile struct {
profile *googleProfile.Profile
name *typesv1.LabelPair
}

// ConvertOtelToGoogle converts an OpenTelemetry profile to a Google profile.
func ConvertOtelToGoogle(src *otelProfile.Profile) map[string]*googleProfile.Profile {
func ConvertOtelToGoogle(src *otelProfile.Profile) (map[string]convertedProfile, error) {
svc2Profile := make(map[string]*profileBuilder)
for _, sample := range src.Sample {
svc := serviceNameFromSample(src, sample)
Expand All @@ -20,17 +27,27 @@ func ConvertOtelToGoogle(src *otelProfile.Profile) map[string]*googleProfile.Pro
p = newProfileBuilder(src)
svc2Profile[svc] = p
}
p.convertSampleBack(sample)
if _, err := p.convertSampleBack(sample); err != nil {
return nil, err
}
}

result := make(map[string]*googleProfile.Profile)
result := make(map[string]convertedProfile)
for svc, p := range svc2Profile {
result[svc] = p.dst
result[svc] = convertedProfile{p.dst, p.name}
}

return result
return result, nil
}

type sampleConversionType int

const (
sampleConversionTypeNone sampleConversionType = 0
sampleConversionTypeSamplesToNanos sampleConversionType = 1
sampleConversionTypeSumEvents sampleConversionType = 2
)

type profileBuilder struct {
src *otelProfile.Profile
dst *googleProfile.Profile
Expand All @@ -39,7 +56,9 @@ type profileBuilder struct {
unsymbolziedFuncNameMap map[string]uint64
locationMap map[*otelProfile.Location]uint64
mappingMap map[*otelProfile.Mapping]uint64
cpuConversion bool

sampleProcessingTypes []sampleConversionType
name *typesv1.LabelPair
}

func newProfileBuilder(src *otelProfile.Profile) *profileBuilder {
Expand All @@ -66,19 +85,36 @@ func newProfileBuilder(src *otelProfile.Profile) *profileBuilder {
Unit: res.addstr("ms"),
}}
res.dst.DefaultSampleType = res.addstr("samples")
} else if len(res.dst.SampleType) == 1 && res.dst.PeriodType != nil && res.dst.Period != 0 {
profileType := fmt.Sprintf("%s:%s:%s:%s",
res.dst.StringTable[res.dst.SampleType[0].Type],
res.dst.StringTable[res.dst.SampleType[0].Unit],
res.dst.StringTable[res.dst.PeriodType.Type],
res.dst.StringTable[res.dst.PeriodType.Unit],
)
}
res.sampleProcessingTypes = make([]sampleConversionType, len(res.dst.SampleType))
for i := 0; i < len(res.dst.SampleType); i++ {
profileType := res.profileType(i)
if profileType == "samples:count:cpu:nanoseconds" {
res.dst.SampleType = []*googleProfile.ValueType{{
res.dst.SampleType[i] = &googleProfile.ValueType{
Type: res.addstr("cpu"),
Unit: res.addstr("nanoseconds"),
}}
res.cpuConversion = true
}
if len(res.dst.SampleType) == 1 {
res.name = &typesv1.LabelPair{
Name: pyromodel.LabelNameProfileName,
Value: "process_cpu",
}
}
res.sampleProcessingTypes[i] = sampleConversionTypeSamplesToNanos
}
// Identify off cpu profiles
if profileType == "events:nanoseconds::" && len(res.dst.SampleType) == 1 {
res.sampleProcessingTypes[i] = sampleConversionTypeSumEvents
res.name = &typesv1.LabelPair{
Name: pyromodel.LabelNameProfileName,
Value: pyromodel.ProfileNameOffCpu,
}
}
}
if res.name == nil {
res.name = &typesv1.LabelPair{
Name: pyromodel.LabelNameProfileName,
Value: "process_cpu", // guess
}
}

Expand All @@ -91,6 +127,22 @@ func newProfileBuilder(src *otelProfile.Profile) *profileBuilder {
return res
}

func (p *profileBuilder) profileType(idx int) string {
var (
periodType, periodUnit string
)
if p.dst.PeriodType != nil && p.dst.Period != 0 {
periodType = p.dst.StringTable[p.dst.PeriodType.Type]
periodUnit = p.dst.StringTable[p.dst.PeriodType.Unit]
}
return fmt.Sprintf("%s:%s:%s:%s",
p.dst.StringTable[p.dst.SampleType[idx].Type],
p.dst.StringTable[p.dst.SampleType[idx].Unit],
periodType,
periodUnit,
)
}

func (p *profileBuilder) addstr(s string) int64 {
if i, ok := p.stringMap[s]; ok {
return i
Expand Down Expand Up @@ -198,16 +250,32 @@ func (p *profileBuilder) convertFunctionBack(of *otelProfile.Function) uint64 {
return gf.Id
}

func (p *profileBuilder) convertSampleBack(os *otelProfile.Sample) *googleProfile.Sample {
func (p *profileBuilder) convertSampleBack(os *otelProfile.Sample) (*googleProfile.Sample, error) {
gs := &googleProfile.Sample{
Value: os.Value,
}

if len(gs.Value) == 0 {
gs.Value = []int64{int64(len(os.TimestampsUnixNano))}
} else if len(gs.Value) == 1 && p.cpuConversion {
gs.Value[0] *= p.src.Period
return nil, fmt.Errorf("sample value is required")
}

for i, typ := range p.sampleProcessingTypes {
switch typ {
case sampleConversionTypeSamplesToNanos:
gs.Value[i] *= p.src.Period
case sampleConversionTypeSumEvents:
// For off-CPU profiles, aggregate all sample values into a single sum
// since pprof cannot represent variable-length sample values
sum := int64(0)
for _, v := range gs.Value {
sum += v
}
gs.Value = []int64{sum}
}
}
if p.dst.Period != 0 && p.dst.PeriodType != nil && len(gs.Value) != len(p.dst.SampleType) {
return nil, fmt.Errorf("sample values length mismatch %d %d", len(gs.Value), len(p.dst.SampleType))
}

p.convertSampleAttributesToLabelsBack(os, gs)

for i := os.LocationsStartIndex; i < os.LocationsStartIndex+os.LocationsLength; i++ {
Expand All @@ -216,7 +284,7 @@ func (p *profileBuilder) convertSampleBack(os *otelProfile.Sample) *googleProfil

p.dst.Sample = append(p.dst.Sample, gs)

return gs
return gs, nil
}

func (p *profileBuilder) convertSampleAttributesToLabelsBack(os *otelProfile.Sample, gs *googleProfile.Sample) {
Expand Down
29 changes: 12 additions & 17 deletions pkg/ingester/otlp/ingest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
"net/http"
"strings"

distirbutormodel "github.com/grafana/pyroscope/pkg/distributor/model"
pyromodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/pprof"

"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -21,6 +17,9 @@ import (
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
pprofileotlp "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1development"
v1 "github.com/grafana/pyroscope/api/otlp/common/v1"
distirbutormodel "github.com/grafana/pyroscope/pkg/distributor/model"
pyromodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/tenant"
)

Expand Down Expand Up @@ -97,7 +96,10 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
for k := 0; k < len(sp.Profiles); k++ {
p := sp.Profiles[k]

pprofProfiles := ConvertOtelToGoogle(p)
pprofProfiles, err := ConvertOtelToGoogle(p)
if err != nil {
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to convert otel profile: %w", err)
}

req := &distirbutormodel.PushRequest{
RawProfileSize: p.Size(),
Expand All @@ -106,15 +108,16 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi

for samplesServiceName, pprofProfile := range pprofProfiles {
labels := getDefaultLabels()
processedKeys := make(map[string]bool)
labels = append(labels, pprofProfile.name)
processedKeys := map[string]bool{pyromodel.LabelNameProfileName: true}
labels = appendAttributesUnique(labels, rp.Resource.GetAttributes(), processedKeys)
labels = appendAttributesUnique(labels, sp.Scope.GetAttributes(), processedKeys)
svc := samplesServiceName
if svc == "" {
svc = serviceName
}
labels = append(labels, &typesv1.LabelPair{
Name: "service_name",
Name: pyromodel.LabelNameServiceName,
Value: svc,
})

Expand All @@ -123,7 +126,7 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
Samples: []*distirbutormodel.ProfileSample{
{
RawProfile: nil,
Profile: pprof.RawFromProto(pprofProfile),
Profile: pprof.RawFromProto(pprofProfile.profile),
ID: uuid.New().String(),
},
},
Expand All @@ -133,7 +136,7 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
if len(req.Series) == 0 {
continue
}
_, err := h.svc.PushParsed(ctx, req)
_, err = h.svc.PushParsed(ctx, req)
if err != nil {
h.log.Log("msg", "failed to push profile", "err", err)
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to make a GRPC request: %w", err)
Expand Down Expand Up @@ -163,10 +166,6 @@ func getServiceNameFromAttributes(attrs []v1.KeyValue) string {
// getDefaultLabels returns the required base labels for Pyroscope profiles
func getDefaultLabels() []*typesv1.LabelPair {
return []*typesv1.LabelPair{
{
Name: pyromodel.LabelNameProfileName,
Value: "process_cpu",
},
{
Name: pyromodel.LabelNameDelta,
Value: "false",
Expand All @@ -175,10 +174,6 @@ func getDefaultLabels() []*typesv1.LabelPair {
Name: pyromodel.LabelNameOTEL,
Value: "true",
},
{
Name: "pyroscope_spy",
Value: "unknown",
},
}
}

Expand Down
Loading

0 comments on commit 71187cc

Please sign in to comment.