Skip to content

Commit

Permalink
Add Tracing Instrumentation for Querier
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Sep 1, 2022
1 parent d9f66ce commit 79a6c31
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/firedb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ func (b *singleBlockQuerier) SelectProfiles(ctx context.Context, req *connect.Re
sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - SelectProfiles")
defer func() {
sp.LogFields(
otlog.String("block min", b.meta.MinTime.Time().String()),
otlog.String("block max", b.meta.MaxTime.Time().String()),
otlog.Int64("total_samples", totalSamples),
otlog.Int64("total_locations", totalLocations),
otlog.Int64("total_profiles", totalProfiles),
Expand Down
44 changes: 44 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
"context"
"flag"
"sort"
"strings"
"time"

"github.com/bufbuild/connect-go"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/samber/lo"
"go.elastic.co/apm/model"

commonv1 "github.com/grafana/fire/pkg/gen/common/v1"
ingestv1 "github.com/grafana/fire/pkg/gen/ingester/v1"
Expand Down Expand Up @@ -91,6 +95,9 @@ func (q *Querier) stopping(_ error) error {
}

func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes")
defer sp.Finish()

responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ic IngesterQueryClient) ([]*commonv1.ProfileType, error) {
res, err := ic.ProfileTypes(ctx, connect.NewRequest(&ingestv1.ProfileTypesRequest{}))
if err != nil {
Expand Down Expand Up @@ -122,6 +129,13 @@ func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querier
}

func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[querierv1.LabelValuesRequest]) (*connect.Response[querierv1.LabelValuesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelValues")
defer func() {
sp.LogFields(
otlog.String("name", req.Msg.Name),
)
sp.Finish()
}()
responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ic IngesterQueryClient) ([]string, error) {
res, err := ic.LabelValues(ctx, connect.NewRequest(&ingestv1.LabelValuesRequest{
Name: req.Msg.Name,
Expand All @@ -141,6 +155,13 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[querierv
}

func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Series")
defer func() {
sp.LogFields(
otlog.String("matchers", strings.Join(req.Msg.Matchers, ",")),
)
sp.Finish()
}()
responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ic IngesterQueryClient) ([]*commonv1.Labels, error) {
res, err := ic.Series(ctx, connect.NewRequest(&ingestv1.SeriesRequest{
Matchers: req.Msg.Matchers,
Expand All @@ -165,6 +186,17 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser
}

func (q *Querier) SelectMergeStacktraces(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMergeStacktraces")
defer func() {
sp.LogFields(
otlog.String("start", model.Time(req.Msg.Start).Time().String()),
otlog.String("end", model.Time(req.Msg.End).Time().String()),
otlog.String("selector", req.Msg.LabelSelector),
otlog.String("profile_id", req.Msg.ProfileTypeID),
)
sp.Finish()
}()

profileType, err := firemodel.ParseProfileTypeSelector(req.Msg.ProfileTypeID)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
Expand All @@ -190,6 +222,18 @@ func (q *Querier) SelectMergeStacktraces(ctx context.Context, req *connect.Reque
}

func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries")
defer func() {
sp.LogFields(
otlog.String("start", model.Time(req.Msg.Start).Time().String()),
otlog.String("end", model.Time(req.Msg.End).Time().String()),
otlog.String("selector", req.Msg.LabelSelector),
otlog.String("profile_id", req.Msg.ProfileTypeID),
otlog.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
otlog.Float64("step", req.Msg.Step),
)
sp.Finish()
}()
profileType, err := firemodel.ParseProfileTypeSelector(req.Msg.ProfileTypeID)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
Expand Down

0 comments on commit 79a6c31

Please sign in to comment.