Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Add prefix configuration option to azure and gcs storage #2386

Merged
merged 5 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Add `prefix` configuration option to `storage.trace.azure` and `storage.trace.gcs` [#2362](https://github.com/grafana/tempo/pull/2386) (@kousikmitra)
* [ENHANCEMENT] Add `prefix` configuration option to `storage.trace.s3` [#2362](https://github.com/grafana/tempo/pull/2362) (@kousikmitra)
* [FEATURE] Add support for `q` query param in `/api/v2/search/<tag.name>/values` to filter results based on a TraceQL query [#2253](https://github.com/grafana/tempo/pull/2253) (@mapno)
* [ENHANCEMENT] Add `scope` parameter to `/api/search/tags` [#2282](https://github.com/grafana/tempo/pull/2282) (@joe-elliott)
Expand Down
23 changes: 15 additions & 8 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -598,11 +598,15 @@ storage:
gcs:

# Bucket name in gcs
# Tempo requires a dedicated bucket since it maintains a top-level object structure and does not support
# a custom prefix to nest within a shared bucket.
# Tempo requires a bucket to maintain a top-level object structure. You can use prefix option with this to nest all objects within a shared bucket.
# Example: "bucket_name: tempo"
[bucket_name: <string>]

# optional.
# Prefix name in gcs
# Tempo has this additional option to support a custom prefix to nest all the objects withing a shared bucket.
[prefix: <string>]

# Buffer size for reads. Default is 10MB
# Example: "chunk_buffer_size: 5_000_000"
[chunk_buffer_size: <int>]
Expand Down Expand Up @@ -647,13 +651,12 @@ storage:
s3:

# Bucket name in s3
# Tempo requires a dedicated bucket since it maintains a top-level object structure and does not support
# a custom prefix to nest within a shared bucket.
# Tempo requires a bucket to maintain a top-level object structure. You can use prefix option with this to nest all objects within a shared bucket.
[bucket: <string>]

# optional.
# Prefix name in s3
# Tempo has this additional option to support a custom prefix to nest all
# the objects withing a shared bucket.
# Tempo has this additional option to support a custom prefix to nest all the objects withing a shared bucket.
[prefix: <string>]

# api endpoint to connect to. use AWS S3 or any S3 compatible object storage endpoint.
Expand Down Expand Up @@ -721,10 +724,14 @@ storage:
azure:

# store traces in this container.
# Tempo requires a dedicated bucket since it maintains a top-level object structure and does not support
# a custom prefix to nest within a shared bucket.
# Tempo requires bucket to maintain a top-level object structure. You can use prefix option to nest all objects within a shared bucket
[container_name: <string>]

# optional.
# Prefix for azure.
# Tempo has this additional option to support a custom prefix to nest all the objects withing a shared bucket.
[prefix: <string>]

# optional.
# Azure endpoint to use, defaults to Azure global(core.windows.net) for other
# regions this needs to be changed e.g Azure China(blob.core.chinacloudapi.cn),
Expand Down
2 changes: 2 additions & 0 deletions modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.StringVar(&cfg.Trace.Azure.StorageAccountName, util.PrefixConfig(prefix, "trace.azure.storage_account_name"), "", "Azure storage account name.")
f.Var(&cfg.Trace.Azure.StorageAccountKey, util.PrefixConfig(prefix, "trace.azure.storage_account_key"), "Azure storage access key.")
f.StringVar(&cfg.Trace.Azure.ContainerName, util.PrefixConfig(prefix, "trace.azure.container_name"), "", "Azure container name to store blocks in.")
f.StringVar(&cfg.Trace.Azure.Prefix, util.PrefixConfig(prefix, "trace.azure.prefix"), "", "Azure container prefix to store blocks in.")
f.StringVar(&cfg.Trace.Azure.Endpoint, util.PrefixConfig(prefix, "trace.azure.endpoint"), "blob.core.windows.net", "Azure endpoint to push blocks to.")
f.IntVar(&cfg.Trace.Azure.MaxBuffers, util.PrefixConfig(prefix, "trace.azure.max_buffers"), 4, "Number of simultaneous uploads.")
cfg.Trace.Azure.BufferSize = 3 * 1024 * 1024
Expand All @@ -84,6 +85,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.GCS = &gcs.Config{}
f.StringVar(&cfg.Trace.GCS.BucketName, util.PrefixConfig(prefix, "trace.gcs.bucket"), "", "gcs bucket to store traces in.")
f.StringVar(&cfg.Trace.GCS.Prefix, util.PrefixConfig(prefix, "trace.gcs.prefix"), "", "gcs bucket prefix to store traces in.")
cfg.Trace.GCS.ChunkBufferSize = 10 * 1024 * 1024
cfg.Trace.GCS.HedgeRequestsUpTo = 2

Expand Down
9 changes: 9 additions & 0 deletions tempodb/backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWrite

// Write implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)

span, derivedCtx := opentracing.StartSpanFromContext(ctx, "azure.Write")
defer span.Finish()

Expand All @@ -85,6 +87,7 @@ func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.

// Append implements backend.Writer
func (rw *readerWriter) Append(ctx context.Context, name string, keypath backend.KeyPath, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
var a appendTracker
if tracker == nil {
a.Name = backend.ObjectFileName(keypath, name)
Expand Down Expand Up @@ -112,6 +115,8 @@ func (rw *readerWriter) CloseAppend(ctx context.Context, tracker backend.AppendT

// List implements backend.Reader
func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]string, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)

marker := blob.Marker{}
prefix := path.Join(keypath...)

Expand Down Expand Up @@ -145,6 +150,8 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)

span, derivedCtx := opentracing.StartSpanFromContext(ctx, "azure.Read")
defer span.Finish()

Expand All @@ -159,6 +166,8 @@ func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.K

// ReadRange implements backend.Reader
func (rw *readerWriter) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte, _ bool) error {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)

span, derivedCtx := opentracing.StartSpanFromContext(ctx, "azure.ReadRange", opentracing.Tags{
"len": len(buffer),
"offset": offset,
Expand Down
72 changes: 72 additions & 0 deletions tempodb/backend/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,75 @@ func blobStorageError(serviceCode string) error {

return blob.NewResponseError(nil, resp, "")
}

func TestObjectWithPrefix(t *testing.T) {

tests := []struct {
name string
prefix string
objectName string
keyPath backend.KeyPath
httpHandler func(t *testing.T) http.HandlerFunc
}{
{
name: "with prefix",
prefix: "test_prefix",
objectName: "object",
keyPath: backend.KeyPath{"test_path"},
httpHandler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(""))
return
}

assert.Equal(t, "/testing_account/blerg/test_prefix/test_path/object", r.URL.Path)
}
},
},
{
name: "without prefix",
prefix: "",
objectName: "object",
keyPath: backend.KeyPath{"test_path"},
httpHandler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(""))
return
}

assert.Equal(t, "/testing_account/blerg/test_path/object", r.URL.Path)
}
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
server := testServer(t, tc.httpHandler(t))
_, w, _, err := New(&Config{
StorageAccountName: "testing_account",
StorageAccountKey: flagext.SecretWithValue("YQo="),
MaxBuffers: 3,
BufferSize: 1000,
ContainerName: "blerg",
Prefix: tc.prefix,
Endpoint: server.URL[7:], // [7:] -> strip http://,
})
require.NoError(t, err)

ctx := context.Background()
err = w.Write(ctx, tc.objectName, tc.keyPath, bytes.NewReader([]byte{}), 0, false)
assert.NoError(t, err)
})
}
}

func testServer(t *testing.T, httpHandler http.HandlerFunc) *httptest.Server {
t.Helper()
assert.NotNil(t, httpHandler)
server := httptest.NewServer(httpHandler)
t.Cleanup(server.Close)
return server
}
1 change: 1 addition & 0 deletions tempodb/backend/azure/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
UseFederatedToken bool `yaml:"use_federated_token"`
UserAssignedID string `yaml:"user_assigned_id"`
ContainerName string `yaml:"container_name"`
Prefix string `yaml:"prefix"`
Endpoint string `yaml:"endpoint_suffix"`
MaxBuffers int `yaml:"max_buffers"`
BufferSize int `yaml:"buffer_size"`
Expand Down
1 change: 1 addition & 0 deletions tempodb/backend/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type Config struct {
BucketName string `yaml:"bucket_name"`
Prefix string `yaml:"prefix"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
Endpoint string `yaml:"endpoint"`
HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"`
Expand Down
5 changes: 5 additions & 0 deletions tempodb/backend/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWrite

// Write implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Write")
defer span.Finish()

Expand All @@ -90,6 +91,7 @@ func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.

// Append implements backend.Writer
func (rw *readerWriter) Append(ctx context.Context, name string, keypath backend.KeyPath, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
span, ctx := opentracing.StartSpanFromContext(ctx, "gcs.Append", opentracing.Tags{
"len": len(buffer),
})
Expand Down Expand Up @@ -122,6 +124,7 @@ func (rw *readerWriter) CloseAppend(_ context.Context, tracker backend.AppendTra

// List implements backend.Reader
func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]string, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
prefix := path.Join(keypath...)
if len(prefix) > 0 {
prefix = prefix + "/"
Expand Down Expand Up @@ -151,6 +154,7 @@ func (rw *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]st

// Read implements backend.Reader
func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Read")
defer span.Finish()

Expand All @@ -165,6 +169,7 @@ func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.K

// ReadRange implements backend.Reader
func (rw *readerWriter) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte, _ bool) error {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.ReadRange", opentracing.Tags{
"len": len(buffer),
"offset": offset,
Expand Down
82 changes: 82 additions & 0 deletions tempodb/backend/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,85 @@ func fakeServerWithObjectAttributes(t *testing.T, o *raw.Object) *httptest.Serve

return server
}

func TestObjectWithPrefix(t *testing.T) {

tests := []struct {
name string
prefix string
objectName string
keyPath backend.KeyPath
httpHandler func(t *testing.T) http.HandlerFunc
}{
{
name: "with prefix",
prefix: "test_storage",
objectName: "object",
keyPath: backend.KeyPath{"test_path"},
httpHandler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(`
{
"location": "US",
"storageClass": "STANDARD"
}
`))
return
}

assert.Equal(t, "/upload/storage/v1/b/blerg/o", r.URL.Path)
assert.True(t, r.URL.Query().Get("name") == "test_storage/test_path/object")
_, _ = w.Write([]byte(`{}`))
}
},
},
{
name: "without prefix",
objectName: "object",
keyPath: backend.KeyPath{"test_path"},
httpHandler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(`
{
"location": "US",
"storageClass": "STANDARD"
}
`))
return
}

assert.Equal(t, "/upload/storage/v1/b/blerg/o", r.URL.Path)
assert.True(t, r.URL.Query().Get("name") == "test_path/object")
_, _ = w.Write([]byte(`{}`))
}
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
server := testServer(t, tc.httpHandler(t))
_, w, _, err := New(&Config{
BucketName: "blerg",
Endpoint: server.URL,
Insecure: true,
Prefix: tc.prefix,
})
require.NoError(t, err)

ctx := context.Background()
err = w.Write(ctx, tc.objectName, tc.keyPath, bytes.NewReader([]byte{}), 0, false)
assert.NoError(t, err)
})
}
}

func testServer(t *testing.T, httpHandler http.HandlerFunc) *httptest.Server {
t.Helper()
assert.NotNil(t, httpHandler)
server := httptest.NewServer(httpHandler)
t.Cleanup(server.Close)
return server
}
4 changes: 2 additions & 2 deletions tempodb/backend/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestObjectWithPrefix(t *testing.T) {
return
}

assert.Equal(t, r.URL.String(), "/blerg/test_storage/test/object")
assert.Equal(t, "/blerg/test_storage/test/object", r.URL.String())
}
},
},
Expand All @@ -237,7 +237,7 @@ func TestObjectWithPrefix(t *testing.T) {
return
}

assert.Equal(t, r.URL.String(), "/blerg/test/object")
assert.Equal(t, "/blerg/test/object", r.URL.String())
}
},
},
Expand Down