Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Add query subcommand to profilecli for downloading pprof from phlare #475

Merged
merged 3 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,15 @@ func main() {
parquetInspectCmd := parquetCmd.Command("inspect", "Inspect a parquet file's structure.")
parquetInspectFiles := parquetInspectCmd.Arg("file", "parquet file path").Required().ExistingFiles()

queryCmd := app.Command("query", "Query profile store.")
queryParams := addQueryParams(queryCmd)
queryOutput := queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").String()
queryMergeCmd := queryCmd.Command("merge", "Request merged profile.")

// parse command line arguments
parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

// enable verbose logging if requested
if !cfg.verbose {
logger = level.NewFilter(logger, level.AllowWarn())
}
Expand All @@ -63,7 +70,14 @@ func main() {
os.Exit(checkError(err))
}
}
case queryMergeCmd.FullCommand():
if err := queryMerge(ctx, queryParams, *queryOutput); err != nil {
os.Exit(checkError(err))
}
default:
level.Error(logger).Log("msg", "unknown command", "cmd", parsedCmd)
}

}

func checkError(err error) int {
Expand Down
188 changes: 188 additions & 0 deletions cmd/profilecli/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/bufbuild/connect-go"
"github.com/go-kit/log/level"
gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"gopkg.in/alecthomas/kingpin.v2"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/api/gen/proto/go/querier/v1/querierv1connect"
)

const (
outputConsole = "console"
outputRaw = "raw"
outputPprof = "pprof="
)

func parseTime(s string) (time.Time, error) {
if s == "" {
return time.Time{}, fmt.Errorf("empty time")
}
t, err := time.Parse(time.RFC3339, s)
if err == nil {
return t, nil
}

// try if it is a relative time
d, rerr := parseRelativeTime(s)
if rerr == nil {
return time.Now().Add(-d), nil
}

// if not return first error
return time.Time{}, err

}

func parseRelativeTime(s string) (time.Duration, error) {
s = strings.TrimSpace(s)
if s == "now" {
return 0, nil
}
s = strings.TrimPrefix(s, "now-")

d, err := model.ParseDuration(s)
if err != nil {
return 0, err
}
return time.Duration(d), nil
}

type queryParams struct {
URL string
From string
To string
ProfileType string
Query string
}

func (p *queryParams) parseFromTo() (from time.Time, to time.Time, err error) {
from, err = parseTime(p.From)
if err != nil {
return time.Time{}, time.Time{}, errors.Wrap(err, "failed to parse from")
}
to, err = parseTime(p.To)
if err != nil {
return time.Time{}, time.Time{}, errors.Wrap(err, "failed to parse to")
}

if to.Before(from) {
return time.Time{}, time.Time{}, errors.Wrap(err, "from cannot be after")
}

return from, to, nil
}

func (p *queryParams) client() querierv1connect.QuerierServiceClient {
return querierv1connect.NewQuerierServiceClient(
http.DefaultClient,
p.URL,
)
}

type flagger interface {
Flag(name, help string) *kingpin.FlagClause
}

func addQueryParams(queryCmd flagger) *queryParams {
params := &queryParams{}
queryCmd.Flag("url", "URL of the profile store.").Default("http://localhost:4100").StringVar(&params.URL)
queryCmd.Flag("from", "Beginning of the query.").Default("now-1h").StringVar(&params.From)
queryCmd.Flag("to", "End of the query.").Default("now").StringVar(&params.To)
queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(&params.ProfileType)
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(&params.Query)
return params
}

func queryMerge(ctx context.Context, params *queryParams, outputFlag string) (err error) {
from, to, err := params.parseFromTo()
if err != nil {
return err
}

level.Info(logger).Log("msg", "query aggregated profile from profile store", "url", params.URL, "from", from, "to", to, "query", params.Query, "type", params.ProfileType)

qc := params.client()

resp, err := qc.SelectMergeProfile(ctx, connect.NewRequest(&querierv1.SelectMergeProfileRequest{
ProfileTypeID: params.ProfileType,
Start: from.UnixMilli(),
End: to.UnixMilli(),
LabelSelector: params.Query,
}))

if err != nil {
return errors.Wrap(err, "failed to query")
}

mypp := pp.New()
mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd()))
mypp.SetExportedOnly(true)

if outputFlag == outputConsole {
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

p, err := gprofile.Parse(bytes.NewReader(buf))
if err != nil {
return errors.Wrap(err, "failed to parse profile")
}

fmt.Fprintln(output(ctx), p.String())
return nil

}

if outputFlag == outputRaw {
mypp.Print(resp.Msg)
return nil
}

if strings.HasPrefix(outputFlag, outputPprof) {
filePath := strings.TrimPrefix(outputFlag, outputPprof)
if filePath == "" {
return errors.New("no file path specified after pprof=")
}
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

// open new file, fail when the file already exists
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
return errors.Wrap(err, "failed to create pprof file")
}
defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file")

gzipWriter := gzip.NewWriter(f)
defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer")

if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil {
return errors.Wrap(err, "failed to write pprof")
}

return nil
}

return errors.Errorf("unknown output %s", outputFlag)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0
github.com/json-iterator/go v1.1.12
github.com/k0kubun/pp/v3 v3.2.0
github.com/klauspost/compress v1.15.13
github.com/mattn/go-isatty v0.0.16
github.com/minio/minio-go/v7 v7.0.45
github.com/mitchellh/go-wordwrap v1.0.1
github.com/oklog/ulid v1.3.1
Expand Down Expand Up @@ -177,7 +179,6 @@ require (
github.com/linode/linodego v1.9.3 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.50 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs=
github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
Expand Down