Skip to content

Commit

Permalink
Minor improvements to GCS driver and Query Path (#693)
Browse files Browse the repository at this point in the history
* Checkpoint: replace gcs ioutils

Signed-off-by: Annanay <[email protected]>

* Replace ioutil with buf.ReadFrom with exact response size for gcs & s3

Signed-off-by: Annanay <[email protected]>

* Fix ContentLength header in spanIdDeduper middleware

Signed-off-by: Annanay <[email protected]>

* Update GCS sdk version v1.12.0 => v1.15.0

Signed-off-by: Annanay <[email protected]>

* Check resp is non-nil to avoid server panic

Signed-off-by: Annanay <[email protected]>

* Use ReadAllWithEstimate, add some tests around it

Signed-off-by: Annanay <[email protected]>

* make make make fmt fmt fmt

Signed-off-by: Annanay <[email protected]>
  • Loading branch information
annanay25 authored May 13, 2021
1 parent 8d628d7 commit 0a5eb28
Show file tree
Hide file tree
Showing 113 changed files with 3,952 additions and 1,996 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [ENHANCEMENT] Reduce marshalling in the ingesters to improve performance. [#694](https://github.com/grafana/tempo/pull/694)
This is kind of a **breaking change**. Rollout all ingesters before any other component to prevent dropped spans.
* [CHANGE] Fix Query Frontend grpc settings to avoid noisy error log. [#690](https://github.com/grafana/tempo/pull/690)
* [CHANGE] GCS SDK update v1.12.0 => v.15.0, ReadAllWithEstimate used in GCS/S3 backends. [#693](https://github.com/grafana/tempo/pull/693)

## v0.7.0

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/grafana/tempo
go 1.16

require (
cloud.google.com/go/storage v1.12.0
cloud.google.com/go/storage v1.15.0
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/alecthomas/kong v0.2.11
Expand All @@ -14,7 +14,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.0.3
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.3
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
Expand Down Expand Up @@ -49,8 +49,8 @@ require (
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/api v0.42.0
google.golang.org/grpc v1.36.0
google.golang.org/api v0.45.0
google.golang.org/grpc v1.37.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
Expand Down
45 changes: 25 additions & 20 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions modules/frontend/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) {
}

return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(traceBytes)),
Header: http.Header{},
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(traceBytes)),
Header: http.Header{},
ContentLength: resp.ContentLength,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe

resp, err := rt.RoundTrip(r)

if resp.StatusCode == http.StatusOK && marshallingFormat == util.JSONTypeHeaderValue {
if resp != nil && resp.StatusCode == http.StatusOK && marshallingFormat == util.JSONTypeHeaderValue {
// if request is for application/json, unmarshal into proto object and re-marshal into json bytes
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/io/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import "io"

// ReadAllWithEstimate is a fork of https://go.googlesource.com/go/+/go1.16.3/src/io/io.go#626
// with a starting buffer size. if none is provided it uses the existing default of 512
func ReadAllWithEstimate(r io.Reader, estimatedBytes int) ([]byte, error) {
func ReadAllWithEstimate(r io.Reader, estimatedBytes int64) ([]byte, error) {
if estimatedBytes == 0 {
estimatedBytes = 512
}

b := make([]byte, 0, estimatedBytes)
b := make([]byte, 0, estimatedBytes+1) // if the calling code knows the exact bytes needed the below logic will do one extra allocation unless we add 1
for {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
Expand Down
25 changes: 25 additions & 0 deletions pkg/io/read_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io

import (
"bytes"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
)

var testBufLength = 10

func TestReadAllWithEstimate(t *testing.T) {
buf := make([]byte, testBufLength)
_, err := rand.Read(buf)
assert.NoError(t, err)
assert.Equal(t, testBufLength, len(buf))
assert.Equal(t, testBufLength, cap(buf))

actualBuf, err := ReadAllWithEstimate(bytes.NewReader(buf), int64(testBufLength))
assert.NoError(t, err)
assert.Equal(t, buf, actualBuf)
assert.Equal(t, testBufLength, len(actualBuf))
assert.Equal(t, testBufLength+1, cap(actualBuf)) // one extra byte used in ReadAllWithEstimate
}
16 changes: 8 additions & 8 deletions tempodb/backend/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/grafana/tempo/tempodb/backend/util"
"github.com/pkg/errors"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
google_http "google.golang.org/api/transport/http"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/util"
)

type readerWriter struct {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error
}
defer r.Close()

return ioutil.ReadAll(r)
return tempo_io.ReadAllWithEstimate(r, r.Attrs.Size)
}

func (rw *readerWriter) readAllWithModTime(ctx context.Context, name string) ([]byte, time.Time, error) {
Expand All @@ -289,12 +289,12 @@ func (rw *readerWriter) readAllWithModTime(ctx context.Context, name string) ([]
}
defer r.Close()

bytes, err := ioutil.ReadAll(r)
buf, err := tempo_io.ReadAllWithEstimate(r, r.Attrs.Size)
if err != nil {
return nil, time.Time{}, err
}

return bytes, r.Attrs.LastModified, nil
return buf, r.Attrs.LastModified, nil
}

func (rw *readerWriter) readRange(ctx context.Context, name string, offset int64, buffer []byte) error {
Expand Down
15 changes: 6 additions & 9 deletions tempodb/backend/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"

Expand All @@ -20,6 +19,8 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

tempo_io "github.com/grafana/tempo/pkg/io"
)

const (
Expand Down Expand Up @@ -341,19 +342,15 @@ func (rw *readerWriter) Shutdown() {
}

func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error) {
reader, _, _, err := rw.core.GetObject(ctx, rw.cfg.Bucket, name, minio.GetObjectOptions{})
reader, info, _, err := rw.core.GetObject(ctx, rw.cfg.Bucket, name, minio.GetObjectOptions{})
if err != nil {
// do not change or wrap this error
// we need to compare the specific err message
return nil, err
}
defer reader.Close()

body, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
return body, nil
return tempo_io.ReadAllWithEstimate(reader, info.Size)
}

func (rw *readerWriter) readAllWithObjInfo(ctx context.Context, name string) ([]byte, minio.ObjectInfo, error) {
Expand All @@ -365,11 +362,11 @@ func (rw *readerWriter) readAllWithObjInfo(ctx context.Context, name string) ([]
}
defer reader.Close()

body, err := ioutil.ReadAll(reader)
buf, err := tempo_io.ReadAllWithEstimate(reader, info.Size)
if err != nil {
return nil, minio.ObjectInfo{}, errors.Wrap(err, "error reading response from s3 backend")
}
return body, info, nil
return buf, info, nil
}

func (rw *readerWriter) readRange(ctx context.Context, objName string, offset int64, buffer []byte) error {
Expand Down
45 changes: 45 additions & 0 deletions vendor/cloud.google.com/go/CHANGES.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions vendor/cloud.google.com/go/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0a5eb28

Please sign in to comment.