Skip to content

Commit

Permalink
Fix multitenancy when using OTLP HTTP
Browse files Browse the repository at this point in the history
Fixes #495

Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve committed Oct 5, 2022
1 parent 6d2e2c4 commit 391aa3b
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ query_frontend:
```
* [ENHACEMENT] Update OTel collector to v0.57.2 [#1757](https://github.com/grafana/tempo/pull/1757) (@mapno)
* [ENHANCEMENT] Vulture now has improved distribution of the random traces it searches. [#1763](https://github.com/grafana/tempo/pull/1763) (@rfratto)
* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott)
* [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott)
* [ENHANCEMENT] Upgrade opentelemetry-proto submodule to v0.18.0 [#1754](https://github.com/grafana/tempo/pull/1754) (@mapno)
Internal types are updated to use `scope` instead of `instrumentation_library`. This is a breaking change in trace by ID queries if JSON is requested.
* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott)
* [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott)
* [BUGFIX] Make multitenancy work with HTTP [#1781](https://github.com/grafana/tempo/pull/1781) (@gouthamve)

## v1.5.0 / 2022-08-17

Expand Down
12 changes: 10 additions & 2 deletions modules/distributor/receiver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/weaveworks/common/user"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"

Expand Down Expand Up @@ -56,8 +57,15 @@ func (m *multiTenancyMiddleware) Wrap(next consumer.Traces) consumer.Traces {
var err error
_, ctx, err = user.ExtractFromGRPCRequest(ctx)
if err != nil {
log.Logger.Log("msg", "failed to extract org id", "err", err)
return err
// Maybe its a HTTP request.
info := client.FromContext(ctx)
orgIDs := info.Metadata.Get(user.OrgIDHeaderName)
if len(orgIDs) != 1 {
log.Logger.Log("msg", "failed to extract org id", "err", err)
return err
}

ctx = user.InjectOrgID(ctx, orgIDs[0])
}
return next.ConsumeTraces(ctx, td)
})
Expand Down
22 changes: 21 additions & 1 deletion modules/distributor/receiver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestFakeTenantMiddleware(t *testing.T) {
func TestMultiTenancyMiddleware(t *testing.T) {
m := MultiTenancyMiddleware()

t.Run("injects org id", func(t *testing.T) {
t.Run("injects org id grpc", func(t *testing.T) {
tenantID := "test-tenant-id"

consumer := newAssertingConsumer(t, func(t *testing.T, ctx context.Context) {
Expand All @@ -70,6 +71,25 @@ func TestMultiTenancyMiddleware(t *testing.T) {
require.NoError(t, m.Wrap(consumer).ConsumeTraces(ctx, ptrace.Traces{}))
})

t.Run("injects org id http", func(t *testing.T) {
tenantID := "test-tenant-id"

consumer := newAssertingConsumer(t, func(t *testing.T, ctx context.Context) {
orgID, err := user.ExtractOrgID(ctx)
require.NoError(t, err)
require.Equal(t, orgID, tenantID)
})

info := client.Info{
Metadata: client.NewMetadata(map[string][]string{
"x-scope-OrgID": {tenantID},
}),
}

ctx := client.NewContext(context.Background(), info)
require.NoError(t, m.Wrap(consumer).ConsumeTraces(ctx, ptrace.Traces{}))
})

t.Run("returns error if org id cannot be extracted", func(t *testing.T) {
// no need to assert anything, because the wrapped function is never called
consumer := newAssertingConsumer(t, func(t *testing.T, ctx context.Context) {})
Expand Down
26 changes: 26 additions & 0 deletions modules/distributor/receiver/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,32 @@ func New(receiverCfg map[string]interface{}, pusher BatchPusher, middleware Midd
return nil, fmt.Errorf("receiver factory not found for type: %s", componentID.Type())
}

// Make sure that the headers are added to context. Required for Authentication.
switch componentID.Type() {
case "otlp":
otlpRecvCfg := cfg.(*otlpreceiver.Config)

if otlpRecvCfg.HTTP != nil {
otlpRecvCfg.HTTP.IncludeMetadata = true
cfg = otlpRecvCfg
}

case "zipkin":
zipkinRecvCfg := cfg.(*zipkinreceiver.Config)

zipkinRecvCfg.HTTPServerSettings.IncludeMetadata = true
cfg = zipkinRecvCfg

case "jaeger":
jaegerRecvCfg := cfg.(*jaegerreceiver.Config)

if jaegerRecvCfg.ThriftHTTP != nil {
jaegerRecvCfg.ThriftHTTP.IncludeMetadata = true
}

cfg = jaegerRecvCfg
}

receiver, err := factoryBase.CreateTracesReceiver(ctx, params, cfg, middleware.Wrap(shim))
if err != nil {
return nil, err
Expand Down

0 comments on commit 391aa3b

Please sign in to comment.