diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index eb33d08351..c7da8386e2 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -12,7 +12,6 @@ import (
 	"sort"
 	"sync"
 	"time"
-	"unsafe"
 
 	"connectrpc.com/connect"
 	"github.com/dustin/go-humanize"
@@ -41,6 +40,7 @@ import (
 	distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
 	writepath "github.com/grafana/pyroscope/pkg/distributor/write_path"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
+	pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split"
 	"github.com/grafana/pyroscope/pkg/model/relabel"
 	"github.com/grafana/pyroscope/pkg/pprof"
 	"github.com/grafana/pyroscope/pkg/slices"
@@ -567,18 +567,6 @@ func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distribut
 	}
 }
 
-// sampleSize returns the size of a samples in bytes.
-func sampleSize(stringTable []string, samplesSlice []*profilev1.Sample) int64 {
-	var size int64
-	for _, s := range samplesSlice {
-		size += int64(s.SizeVT())
-		for _, l := range s.Label {
-			size += int64(len(stringTable[l.Key]) + len(stringTable[l.Str]) + len(stringTable[l.NumUnit]))
-		}
-	}
-	return size
-}
-
 // profileSizeBytes returns the size of symbols and samples in bytes.
 func profileSizeBytes(p *profilev1.Profile) (symbols, samples int64) {
 	fullSize := p.SizeVT()
@@ -703,190 +691,6 @@ func (d *Distributor) HealthyInstancesCount() int {
 	return int(d.healthyInstancesCount.Load())
 }
 
-type sampleKey struct {
-	stacktrace string
-	// note this is an index into the string table, rather than span ID
-	spanIDIdx int64
-}
-
-func sampleKeyFromSample(stringTable []string, s *profilev1.Sample) sampleKey {
-	var k sampleKey
-
-	// populate spanID if present
-	for _, l := range s.Label {
-		if stringTable[int(l.Key)] == pprof.SpanIDLabelName {
-			k.spanIDIdx = l.Str
-		}
-	}
-	if len(s.LocationId) > 0 {
-		k.stacktrace = unsafe.String(
-			(*byte)(unsafe.Pointer(&s.LocationId[0])),
-			len(s.LocationId)*8,
-		)
-	}
-	return k
-}
-
-type lazyGroup struct {
-	sampleGroup pprof.SampleGroup
-	// The map is only initialized when the group is being modified. Key is the
-	// string representation (unsafe) of the sample stack trace and its potential
-	// span ID.
-	sampleMap map[sampleKey]*profilev1.Sample
-	labels    phlaremodel.Labels
-}
-
-func (g *lazyGroup) addSampleGroup(stringTable []string, sg pprof.SampleGroup) {
-	if len(g.sampleGroup.Samples) == 0 {
-		g.sampleGroup = sg
-		return
-	}
-
-	// If the group is already initialized, we need to merge the samples.
-	if g.sampleMap == nil {
-		g.sampleMap = make(map[sampleKey]*profilev1.Sample)
-		for _, s := range g.sampleGroup.Samples {
-			g.sampleMap[sampleKeyFromSample(stringTable, s)] = s
-		}
-	}
-
-	for _, s := range sg.Samples {
-		k := sampleKeyFromSample(stringTable, s)
-		if _, ok := g.sampleMap[k]; !ok {
-			g.sampleGroup.Samples = append(g.sampleGroup.Samples, s)
-			g.sampleMap[k] = s
-		} else {
-			// merge the samples
-			for idx := range s.Value {
-				g.sampleMap[k].Value[idx] += s.Value[idx]
-			}
-		}
-	}
-}
-
-type groupsWithFingerprints struct {
-	m     map[uint64][]lazyGroup
-	order []uint64
-}
-
-func newGroupsWithFingerprints() *groupsWithFingerprints {
-	return &groupsWithFingerprints{
-		m: make(map[uint64][]lazyGroup),
-	}
-}
-
-func (g *groupsWithFingerprints) add(stringTable []string, lbls phlaremodel.Labels, group pprof.SampleGroup) {
-	fp := lbls.Hash()
-	idxs, ok := g.m[fp]
-	if ok {
-		// fingerprint matches, check if the labels are the same
-		for _, idx := range idxs {
-			if phlaremodel.CompareLabelPairs(idx.labels, lbls) == 0 {
-				// append samples to the group
-				idx.addSampleGroup(stringTable, group)
-				return
-			}
-		}
-	} else {
-		g.order = append(g.order, fp)
-	}
-
-	// add the labels to the list
-	g.m[fp] = append(g.m[fp], lazyGroup{
-		sampleGroup: group,
-		labels:      lbls,
-	})
-}
-
-func extractSampleSeries(req *distributormodel.PushRequest, tenantID string, usageGroups *validation.UsageGroupConfig, relabelRules []*relabel.Config) (result []*distributormodel.ProfileSeries, bytesRelabelDropped, profilesRelabelDropped float64) {
-	var (
-		lblbuilder = phlaremodel.NewLabelsBuilder(phlaremodel.EmptyLabels())
-	)
-
-	profileSeries := make([]*distributormodel.ProfileSeries, 0, len(req.Series))
-	for _, series := range req.Series {
-		s := &distributormodel.ProfileSeries{
-			Labels:  series.Labels,
-			Samples: make([]*distributormodel.ProfileSample, 0, len(series.Samples)),
-		}
-		usageGroups := usageGroups.GetUsageGroups(tenantID, series.Labels)
-
-		for _, raw := range series.Samples {
-			pprof.RenameLabel(raw.Profile.Profile, pprof.ProfileIDLabelName, pprof.SpanIDLabelName)
-			groups := pprof.GroupSamplesWithoutLabels(raw.Profile.Profile, pprof.SpanIDLabelName)
-
-			if len(groups) == 0 || (len(groups) == 1 && len(groups[0].Labels) == 0) {
-				// No sample labels in the profile.
-
-				// relabel the labels of the series
-				lblbuilder.Reset(series.Labels)
-				if len(relabelRules) > 0 {
-					keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
-					if !keep {
-						bytesRelabelDropped += float64(raw.Profile.SizeVT())
-						profilesRelabelDropped++ // in this case we dropped a whole profile
-						usageGroups.CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(raw.Profile.SizeVT()))
-						continue
-					}
-				}
-
-				// Copy over the labels from the builder
-				s.Labels = lblbuilder.Labels()
-
-				// We do not modify the request.
-				s.Samples = append(s.Samples, raw)
-
-				continue
-			}
-
-			// iterate through groups relabel them and find relevant overlapping labelsets
-			groupsKept := newGroupsWithFingerprints()
-			for _, group := range groups {
-				lblbuilder.Reset(series.Labels)
-				addSampleLabelsToLabelsBuilder(lblbuilder, raw.Profile.Profile, group.Labels)
-				if len(relabelRules) > 0 {
-					keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
-					if !keep {
-						droppedBytes := sampleSize(raw.Profile.Profile.StringTable, group.Samples)
-						bytesRelabelDropped += float64(droppedBytes)
-						usageGroups.CountDiscardedBytes(string(validation.DroppedByRelabelRules), droppedBytes)
-						continue
-					}
-				}
-
-				// add the group to the list
-				groupsKept.add(raw.Profile.StringTable, lblbuilder.Labels(), group)
-			}
-
-			if len(groupsKept.m) == 0 {
-				// no groups kept, count the whole profile as dropped
-				profilesRelabelDropped++
-				continue
-			}
-
-			e := pprof.NewSampleExporter(raw.Profile.Profile)
-			for _, idx := range groupsKept.order {
-				for _, group := range groupsKept.m[idx] {
-					// exportSamples creates a new profile with the samples provided.
-					// The samples are obtained via GroupSamples call, which means
-					// the underlying capacity is referenced by the source profile.
-					// Therefore, the slice has to be copied and samples zeroed to
-					// avoid ownership issues.
-					profile := exportSamples(e, group.sampleGroup.Samples)
-					profileSeries = append(profileSeries, &distributormodel.ProfileSeries{
-						Labels:  group.labels,
-						Samples: []*distributormodel.ProfileSample{{Profile: profile}},
-					})
-				}
-			}
-		}
-		if len(s.Samples) > 0 {
-			profileSeries = append(profileSeries, s)
-		}
-	}
-	return profileSeries, bytesRelabelDropped, profilesRelabelDropped
-}
-
 func (d *Distributor) limitMaxSessionsPerSeries(maxSessionsPerSeries int, labels phlaremodel.Labels) phlaremodel.Labels {
 	if maxSessionsPerSeries == 0 {
 		return labels.Delete(phlaremodel.LabelNameSessionID)
@@ -927,32 +731,6 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.PushReque
 	return nil
 }
 
-// addSampleLabelsToLabelsBuilder: adds sample label that don't exists yet on the profile builder. So the existing labels take precedence.
-func addSampleLabelsToLabelsBuilder(b *phlaremodel.LabelsBuilder, p *profilev1.Profile, pl []*profilev1.Label) {
-	var name string
-	for _, l := range pl {
-		name = p.StringTable[l.Key]
-		if l.Str <= 0 {
-			// skip if label value is not a string
-			continue
-		}
-		if b.Get(name) != "" {
-			// do nothing if label name already exists
-			continue
-		}
-		b.Set(name, p.StringTable[l.Str])
-	}
-}
-
-func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof.Profile {
-	samplesCopy := make([]*profilev1.Sample, len(samples))
-	copy(samplesCopy, samples)
-	slices.Clear(samples)
-	n := pprof.NewProfile()
-	e.ExportSamples(n.Profile, samplesCopy)
-	return n
-}
-
 type profileTracker struct {
 	profile    *distributormodel.ProfileSeries
 	minSuccess int
@@ -1029,3 +807,66 @@ func injectMappingVersions(series []*distributormodel.ProfileSeries) error {
 	}
 	return nil
 }
+
+func extractSampleSeries(
+	req *distributormodel.PushRequest,
+	tenantID string,
+	usage *validation.UsageGroupConfig,
+	rules []*relabel.Config,
+) (
+	result []*distributormodel.ProfileSeries,
+	bytesRelabelDropped float64,
+	profilesRelabelDropped float64,
+) {
+	for _, series := range req.Series {
+		for _, p := range series.Samples {
+			v := &sampleSeriesVisitor{profile: p.Profile.Profile}
+			pprofsplit.VisitSampleSeries(
+				p.Profile.Profile,
+				series.Labels,
+				rules,
+				v,
+			)
+			result = append(result, v.series...)
+			bytesRelabelDropped += float64(v.discardedBytes)
+			profilesRelabelDropped += float64(v.discardedProfiles)
+			usage.GetUsageGroups(tenantID, series.Labels).
+				CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(v.discardedBytes))
+		}
+	}
+
+	return result, bytesRelabelDropped, profilesRelabelDropped
+}
+
+type sampleSeriesVisitor struct {
+	profile *profilev1.Profile
+	exp     *pprof.SampleExporter
+	series  []*distributormodel.ProfileSeries
+
+	discardedBytes    int
+	discardedProfiles int
+}
+
+func (v *sampleSeriesVisitor) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
+	if v.exp == nil {
+		v.exp = pprof.NewSampleExporter(v.profile)
+	}
+	v.series = append(v.series, &distributormodel.ProfileSeries{
+		Samples: []*distributormodel.ProfileSample{{Profile: exportSamples(v.exp, samples)}},
+		Labels:  labels,
+	})
+}
+
+func (v *sampleSeriesVisitor) Discarded(profiles, bytes int) {
+	v.discardedProfiles += profiles
+	v.discardedBytes += bytes
+}
+
+func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof.Profile {
+	samplesCopy := make([]*profilev1.Sample, len(samples))
+	copy(samplesCopy, samples)
+	clear(samples)
+	n := pprof.NewProfile()
+	e.ExportSamples(n.Profile, samplesCopy)
+	return n
+}
diff --git a/pkg/experiment/ingester/memdb/head.go b/pkg/experiment/ingester/memdb/head.go
index 84064fce2b..76e7f97e1d 100644
--- a/pkg/experiment/ingester/memdb/head.go
+++ b/pkg/experiment/ingester/memdb/head.go
@@ -4,18 +4,20 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"math"
+	"sync"
+
 	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
+	"go.uber.org/atomic"
+
 	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
-	phlarelabels "github.com/grafana/pyroscope/pkg/phlaredb/labels"
+	"github.com/grafana/pyroscope/pkg/phlaredb/labels"
 	schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
 	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/common/model"
-	"go.uber.org/atomic"
-	"math"
-	"sync"
 )
 
 type FlushedHead struct {
@@ -65,14 +67,14 @@ func (h *Head) Ingest(p *profilev1.Profile, id uuid.UUID, externalLabels []*type
 		return
 	}
 
-	// delta not supported
+	// Delta not supported.
 	externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameDelta)
-
-	enforceLabelOrder := phlaremodel.Labels(externalLabels).Get(phlaremodel.LabelNameOrder) == phlaremodel.LabelOrderEnforced
+	// Label order is enforced to ensure that __profile_type__ and __service_name__ always
+	// come first in the label set. This is important for spatial locality: profiles are
+	// stored in the label series order.
 	externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameOrder)
-	lbls, seriesFingerprints := phlarelabels.CreateProfileLabels(enforceLabelOrder, p, externalLabels...)
-
+	lbls, seriesFingerprints := labels.CreateProfileLabels(true, p, externalLabels...)
 	metricName := phlaremodel.Labels(externalLabels).Get(model.MetricNameLabel)
 
 	var profileIngested bool
diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go
index 24e187583c..4f32dc1a42 100644
--- a/pkg/experiment/ingester/segment.go
+++ b/pkg/experiment/ingester/segment.go
@@ -22,9 +22,12 @@ import (
 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
 	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
-	phlaremodel "github.com/grafana/pyroscope/pkg/model"
+	"github.com/grafana/pyroscope/pkg/model"
+	pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split"
+	pprofmodel "github.com/grafana/pyroscope/pkg/pprof"
 	"github.com/grafana/pyroscope/pkg/tenant"
 	"github.com/grafana/pyroscope/pkg/util/math"
+	"github.com/grafana/pyroscope/pkg/validation"
 )
 
 const pathSegments = "segments"
@@ -42,6 +45,7 @@ type segmentWriterConfig struct {
 
 type segmentsWriter struct {
 	segmentDuration time.Duration
+	limits          Limits
 
 	l      log.Logger
 	bucket objstore.Bucket
@@ -119,9 +123,10 @@ func (sh *shard) flushSegment(ctx context.Context) {
 	}()
 }
 
-func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, cfg segmentWriterConfig, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter {
+func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, cfg segmentWriterConfig, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	sw := &segmentsWriter{
+		limits:          limits,
 		metrics:         metrics,
 		headMetrics:     hm,
 		segmentDuration: cfg.segmentDuration,
@@ -442,12 +447,41 @@ func (s *segment) waitFlushed(ctx context.Context) error {
 func (s *segment) ingest(ctx context.Context, tenantID string, p *profilev1.Profile, id uuid.UUID, labels []*typesv1.LabelPair) {
 	k := serviceKey{
 		tenant:  tenantID,
-		service: phlaremodel.Labels(labels).Get(phlaremodel.LabelNameServiceName),
+		service: model.Labels(labels).Get(model.LabelNameServiceName),
 	}
-	s.sw.metrics.segmentIngestBytes.WithLabelValues(s.sshard, tenantID).Observe(float64(p.SizeVT()))
-	h := s.headForIngest(k)
+	size := p.SizeVT()
+	rules := s.sw.limits.IngestionRelabelingRules(tenantID)
+	usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels)
+	appender := &sampleAppender{
+		id:       id,
+		head:     s.headForIngest(k),
+		exporter: pprofmodel.NewSampleExporter(p),
+	}
+	pprofsplit.VisitSampleSeries(p, labels, rules, appender)
+	size -= appender.discardedBytes
+	s.sw.metrics.segmentIngestBytes.WithLabelValues(s.sshard, tenantID).Observe(float64(size))
+	usage.CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(appender.discardedBytes))
+	// CountReceivedBytes is tracked in distributors.
+}
+
+type sampleAppender struct {
+	id       uuid.UUID
+	head     *memdb.Head
+	exporter *pprofmodel.SampleExporter
+
+	discardedProfiles int
+	discardedBytes    int
+}
+
+func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
+	var n profilev1.Profile
+	v.exporter.ExportSamples(&n, samples)
+	v.head.Ingest(&n, v.id, labels)
+}
 
-	h.Ingest(p, id, labels)
+func (v *sampleAppender) Discarded(profiles, bytes int) {
+	v.discardedProfiles += profiles
+	v.discardedBytes += bytes
 }
 
 func (s *segment) headForIngest(k serviceKey) *memdb.Head {
diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go
index 51946b89cd..52b5ac0528 100644
--- a/pkg/experiment/ingester/segment_test.go
+++ b/pkg/experiment/ingester/segment_test.go
@@ -14,6 +14,7 @@ import (
 	"time"
 
 	gprofile "github.com/google/pprof/profile"
+	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
 	"github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect"
@@ -30,6 +31,8 @@ import (
 	pprofth "github.com/grafana/pyroscope/pkg/pprof/testhelper"
 	"github.com/grafana/pyroscope/pkg/test/mocks/mockmetastorev1"
 	"github.com/grafana/pyroscope/pkg/test/mocks/mockobjstore"
+	"github.com/grafana/pyroscope/pkg/validation"
 
+	model2 "github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/util/testutil"
 	"github.com/stretchr/testify/assert"
@@ -239,6 +242,7 @@ func TestDLQFail(t *testing.T) {
 		segmentWriterConfig{
 			segmentDuration: 100 * time.Millisecond,
 		},
+		validation.MockDefaultOverrides(),
 		bucket,
 		client,
 	)
@@ -283,6 +287,7 @@ func TestDatasetMinMaxTime(t *testing.T) {
 		segmentWriterConfig{
 			segmentDuration: 100 * time.Millisecond,
 		},
+		validation.MockDefaultOverrides(),
 		bucket,
 		client,
 	)
@@ -381,6 +386,7 @@ func newTestSegmentWriter(t *testing.T, cfg segmentWriterConfig) sw {
 		newSegmentMetrics(nil),
 		memdb.NewHeadMetricsWithPrefix(nil, ""),
 		cfg,
+		validation.MockDefaultOverrides(),
 		bucket,
 		client,
 	)
diff --git a/pkg/experiment/ingester/service.go b/pkg/experiment/ingester/service.go
index 9b971d4c73..659c17d520 100644
--- a/pkg/experiment/ingester/service.go
+++ b/pkg/experiment/ingester/service.go
@@ -21,11 +21,13 @@ import (
 	segmentwriterv1 "github.com/grafana/pyroscope/api/gen/proto/go/segmentwriter/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
 	metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client"
+	"github.com/grafana/pyroscope/pkg/model/relabel"
 	phlareobj "github.com/grafana/pyroscope/pkg/objstore"
 	"github.com/grafana/pyroscope/pkg/phlaredb"
 	"github.com/grafana/pyroscope/pkg/pprof"
 	"github.com/grafana/pyroscope/pkg/tenant"
 	"github.com/grafana/pyroscope/pkg/util"
+	"github.com/grafana/pyroscope/pkg/validation"
 )
 
 const (
@@ -57,6 +59,11 @@ func (cfg *Config) Validate() error {
 	return cfg.GRPCClientConfig.Validate()
 }
 
+type Limits interface {
+	IngestionRelabelingRules(tenantID string) []*relabel.Config
+	DistributorUsageGroups(tenantID string) *validation.UsageGroupConfig
+}
+
 type SegmentWriterService struct {
 	services.Service
 	segmentwriterv1.UnimplementedSegmentWriterServiceServer
@@ -89,6 +96,7 @@ func New(
 	reg prometheus.Registerer,
 	log log.Logger,
 	cfg Config,
+	lim Limits,
 	storageBucket phlareobj.Bucket,
 	metastoreClient *metastoreclient.Client,
 ) (*SegmentWriterService, error) {
@@ -123,10 +131,8 @@ func New(
 	}
 	segmentMetrics := newSegmentMetrics(i.reg)
 	headMetrics := memdb.NewHeadMetricsWithPrefix(reg,
 		"pyroscope_segment_writer")
-
-	i.segmentWriter = newSegmentWriter(i.logger, segmentMetrics, headMetrics, segmentWriterConfig{
-		segmentDuration: cfg.SegmentDuration,
-	}, storageBucket, metastoreClient)
+	config := segmentWriterConfig{segmentDuration: cfg.SegmentDuration}
+	i.segmentWriter = newSegmentWriter(i.logger, segmentMetrics, headMetrics, config, lim, storageBucket, metastoreClient)
 	i.subservicesWatcher = services.NewFailureWatcher()
 	i.subservicesWatcher.WatchManager(i.subservices)
 	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
diff --git a/pkg/model/pprof_split/pprof_split.go b/pkg/model/pprof_split/pprof_split.go
new file mode 100644
index 0000000000..bdc4bbf97c
--- /dev/null
+++ b/pkg/model/pprof_split/pprof_split.go
@@ -0,0 +1,201 @@
+package pprof_split
+
+import (
+	"unsafe"
+
+	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
+	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
+	phlaremodel "github.com/grafana/pyroscope/pkg/model"
+	"github.com/grafana/pyroscope/pkg/model/relabel"
+	"github.com/grafana/pyroscope/pkg/pprof"
+)
+
+type SampleSeriesVisitor interface {
+	VisitSampleSeries([]*typesv1.LabelPair, []*profilev1.Sample)
+	Discarded(profiles, bytes int)
+}
+
+func VisitSampleSeries(
+	profile *profilev1.Profile,
+	labels []*typesv1.LabelPair,
+	rules []*relabel.Config,
+	visitor SampleSeriesVisitor,
+) {
+	var profilesDiscarded, bytesDiscarded int
+	defer func() {
+		visitor.Discarded(profilesDiscarded, bytesDiscarded)
+	}()
+
+	pprof.RenameLabel(profile, pprof.ProfileIDLabelName, pprof.SpanIDLabelName)
+	groups := pprof.GroupSamplesWithoutLabels(profile, pprof.SpanIDLabelName)
+	builder := phlaremodel.NewLabelsBuilder(nil)
+
+	if len(groups) == 0 || (len(groups) == 1 && len(groups[0].Labels) == 0) {
+		// No sample labels in the profile.
+		// Relabel the series labels.
+		builder.Reset(labels)
+		if len(rules) > 0 {
+			keep := relabel.ProcessBuilder(builder, rules...)
+			if !keep {
+				// We drop the profile.
+				profilesDiscarded++
+				bytesDiscarded += profile.SizeVT()
+				return
+			}
+		}
+		if len(profile.Sample) > 0 {
+			visitor.VisitSampleSeries(builder.Labels(), profile.Sample)
+		}
+		return
+	}
+
+	// Iterate through groups, relabel them, and find relevant overlapping label sets.
+	groupsKept := newGroupsWithFingerprints()
+	for _, group := range groups {
+		builder.Reset(labels)
+		addSampleLabelsToLabelsBuilder(builder, profile, group.Labels)
+		if len(rules) > 0 {
+			keep := relabel.ProcessBuilder(builder, rules...)
+			if !keep {
+				bytesDiscarded += sampleSize(group.Samples)
+				continue
+			}
+		}
+		// add the group to the list.
+		groupsKept.add(profile.StringTable, builder.Labels(), group)
+	}
+
+	if len(groupsKept.m) == 0 {
+		// no groups kept, count the whole profile as dropped
+		profilesDiscarded++
+		return
+	}
+
+	for _, idx := range groupsKept.order {
+		for _, group := range groupsKept.m[idx] {
+			if len(group.sampleGroup.Samples) > 0 {
+				visitor.VisitSampleSeries(group.labels, group.sampleGroup.Samples)
+			}
+		}
+	}
+}
+
+// addSampleLabelsToLabelsBuilder adds sample labels that don't exist yet on the profile builder, so the existing labels take precedence.
+func addSampleLabelsToLabelsBuilder(b *phlaremodel.LabelsBuilder, p *profilev1.Profile, pl []*profilev1.Label) {
+	var name string
+	for _, l := range pl {
+		name = p.StringTable[l.Key]
+		if l.Str <= 0 {
+			// skip if label value is not a string
+			continue
+		}
+		if b.Get(name) != "" {
+			// do nothing if label name already exists
+			continue
+		}
+		b.Set(name, p.StringTable[l.Str])
+	}
+}
+
+type sampleKey struct {
+	stacktrace string
+	// note this is an index into the string table, rather than span ID
+	spanIDIdx int64
+}
+
+func sampleKeyFromSample(stringTable []string, s *profilev1.Sample) sampleKey {
+	var k sampleKey
+	// populate spanID if present
+	for _, l := range s.Label {
+		if stringTable[int(l.Key)] == pprof.SpanIDLabelName {
+			k.spanIDIdx = l.Str
+		}
+	}
+	if len(s.LocationId) > 0 {
+		k.stacktrace = unsafe.String(
+			(*byte)(unsafe.Pointer(&s.LocationId[0])),
+			len(s.LocationId)*8,
+		)
+	}
+	return k
+}
+
+type lazyGroup struct {
+	sampleGroup pprof.SampleGroup
+	// The map is only initialized when the group is being modified. Key is the
+	// string representation (unsafe) of the sample stack trace and its potential
+	// span ID.
+	sampleMap map[sampleKey]*profilev1.Sample
+	labels    phlaremodel.Labels
+}
+
+func (g *lazyGroup) addSampleGroup(stringTable []string, sg pprof.SampleGroup) {
+	if len(g.sampleGroup.Samples) == 0 {
+		g.sampleGroup = sg
+		return
+	}
+
+	// If the group is already initialized, we need to merge the samples.
+	if g.sampleMap == nil {
+		g.sampleMap = make(map[sampleKey]*profilev1.Sample)
+		for _, s := range g.sampleGroup.Samples {
+			g.sampleMap[sampleKeyFromSample(stringTable, s)] = s
+		}
+	}
+
+	for _, s := range sg.Samples {
+		k := sampleKeyFromSample(stringTable, s)
+		if _, ok := g.sampleMap[k]; !ok {
+			g.sampleGroup.Samples = append(g.sampleGroup.Samples, s)
+			g.sampleMap[k] = s
+		} else {
+			// merge the samples
+			for idx := range s.Value {
+				g.sampleMap[k].Value[idx] += s.Value[idx]
+			}
+		}
+	}
+}
+
+type groupsWithFingerprints struct {
+	m     map[uint64][]lazyGroup
+	order []uint64
+}
+
+func newGroupsWithFingerprints() *groupsWithFingerprints {
+	return &groupsWithFingerprints{
+		m: make(map[uint64][]lazyGroup),
+	}
+}
+
+func (g *groupsWithFingerprints) add(stringTable []string, lbls phlaremodel.Labels, group pprof.SampleGroup) {
+	fp := lbls.Hash()
+	idxs, ok := g.m[fp]
+	if ok {
+		// fingerprint matches, check if the labels are the same
+		for _, idx := range idxs {
+			if phlaremodel.CompareLabelPairs(idx.labels, lbls) == 0 {
+				// append samples to the group
+				idx.addSampleGroup(stringTable, group)
+				return
+			}
+		}
+	} else {
+		g.order = append(g.order, fp)
+	}
+
+	// add the labels to the list
+	g.m[fp] = append(g.m[fp], lazyGroup{
+		sampleGroup: group,
+		labels:      lbls,
+	})
+}
+
+// sampleSize returns the size of the samples in bytes.
+func sampleSize(samples []*profilev1.Sample) int {
+	var size int
+	for _, s := range samples {
+		size += s.SizeVT()
+	}
+	return size
+}
diff --git a/pkg/phlare/modules_experimental.go b/pkg/phlare/modules_experimental.go
index bc76094dae..6480a302d3 100644
--- a/pkg/phlare/modules_experimental.go
+++ b/pkg/phlare/modules_experimental.go
@@ -50,6 +50,7 @@ func (f *Phlare) initSegmentWriter() (services.Service, error) {
 		f.reg,
 		f.logger,
 		f.Cfg.SegmentWriter,
+		f.Overrides,
 		f.storageBucket,
 		f.metastoreClient,
 	)
diff --git a/pkg/pprof/pprof.go b/pkg/pprof/pprof.go
index 3d2d0ca8d8..b0d6b63feb 100644
--- a/pkg/pprof/pprof.go
+++ b/pkg/pprof/pprof.go
@@ -613,6 +613,10 @@ func (l LabelsByKeyValue) Swap(i, j int) {
 	l[i], l[j] = l[j], l[i]
 }
 
+// SampleGroup refers to a group of samples that share the same
+// labels. Note that the Span ID label is handled in a special
+// way and is not included in the Labels member but is kept as
+// a sample label.
 type SampleGroup struct {
 	Labels  []*profilev1.Label
 	Samples []*profilev1.Sample
@@ -963,12 +967,12 @@ func LabelID(p *profilev1.Profile, name string) int64 {
 
 func ProfileSpans(p *profilev1.Profile) []uint64 {
 	if i := LabelID(p, SpanIDLabelName); i > 0 {
-		return profileSpans(i, p)
+		return Spans(p, i)
 	}
 	return nil
 }
 
-func profileSpans(spanIDLabelIdx int64, p *profilev1.Profile) []uint64 {
+func Spans(p *profilev1.Profile, spanIDLabelIdx int64) []uint64 {
 	tmp := make([]byte, 8)
 	s := make([]uint64, len(p.Sample))
 	for i, sample := range p.Sample {
diff --git a/pkg/pprof/pprof_test.go b/pkg/pprof/pprof_test.go
index 615fbc6d22..f6a0101f91 100644
--- a/pkg/pprof/pprof_test.go
+++ b/pkg/pprof/pprof_test.go
@@ -872,6 +872,17 @@ func Test_GroupSamplesWithout(t *testing.T) {
 			input:       new(profilev1.Profile),
 			expected:    nil,
 		},
+		{
+			description: "no sample labels",
+			input: &profilev1.Profile{
+				Sample: []*profilev1.Sample{{}, {}},
+			},
+			expected: []SampleGroup{
+				{
+					Samples: []*profilev1.Sample{{}, {}},
+				},
+			},
+		},
 		{
 			description: "without all, single label set",
 			input: &profilev1.Profile{
diff --git a/pkg/slices/slices.go b/pkg/slices/slices.go
index d3245fe740..0afc7b9c46 100644
--- a/pkg/slices/slices.go
+++ b/pkg/slices/slices.go
@@ -31,6 +31,5 @@ func Clear[S ~[]E, E any](s S) {
 	}
 }
 
 func GrowLen[S ~[]E, E any](s S, n int) S {
-	s = s[:0]
-	return slices.Grow(s, n)[:n]
+	return slices.Grow(s[:0], n)[:n]
 }
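
Below, for reference, is a minimal sketch of how a caller can satisfy the SampleSeriesVisitor contract introduced in pkg/model/pprof_split above. The countingVisitor type, its fields, and the split helper are illustrative only and are not part of this patch; the call pattern mirrors the distributor's sampleSeriesVisitor and the segment writer's sampleAppender.

package example

import (
	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
	pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split"
	"github.com/grafana/pyroscope/pkg/model/relabel"
)

// countingVisitor is a hypothetical SampleSeriesVisitor that only records
// what VisitSampleSeries and Discarded report.
type countingVisitor struct {
	seriesSeen        int
	discardedProfiles int
	discardedBytes    int
}

// VisitSampleSeries is invoked once per label set produced by relabeling and
// sample-label grouping, together with the samples that belong to it.
func (v *countingVisitor) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
	v.seriesSeen++
}

// Discarded reports how many whole profiles and how many bytes were dropped
// by the relabeling rules.
func (v *countingVisitor) Discarded(profiles, bytes int) {
	v.discardedProfiles += profiles
	v.discardedBytes += bytes
}

// split mirrors the call sites in distributor.go and segment.go.
func split(p *profilev1.Profile, seriesLabels []*typesv1.LabelPair, rules []*relabel.Config) *countingVisitor {
	v := &countingVisitor{}
	pprofsplit.VisitSampleSeries(p, seriesLabels, rules, v)
	return v
}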