Skip to content

Commit

Permalink
Move compatibility changes to internal, use when pdata parses traces …
Browse files Browse the repository at this point in the history
…from bytes

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Mar 25, 2021
1 parent c9f29e6 commit d3cc537
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 139 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
- Remove `ComponentSettings` and `DefaultComponentSettings()`
- Rename `NewComponent()` to `New()`

## 🧰 Bug fixes 🧰

- `pdata.TracesFromOtlpProtoBytes`: Fixes to handle backwards compatibility changes in proto (#2798)

## v0.23.0 Beta

## 🛑 Breaking changes 🛑
Expand Down
1 change: 1 addition & 0 deletions consumer/pdata/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TracesFromOtlpProtoBytes(data []byte) (Traces, error) {
if err := req.Unmarshal(data); err != nil {
return Traces{}, err
}
internal.TracesCompatibilityChanges(&req)
return Traces{orig: &req}, nil
}

Expand Down
25 changes: 25 additions & 0 deletions internal/otlp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)

// MetricsWrapper is an intermediary struct that is declared in an internal package
Expand Down Expand Up @@ -50,6 +51,30 @@ func TracesFromOtlp(req *otlpcollectortrace.ExportTraceServiceRequest) TracesWra
return TracesWrapper{req: req}
}

// TracesCompatibilityChanges performs backward compatibility conversion of Span Status code according to
// OTLP specification as we are a new receiver and sender (we are pushing data to the pipelines):
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L239
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L253
func TracesCompatibilityChanges(req *otlpcollectortrace.ExportTraceServiceRequest) {
for _, rss := range req.ResourceSpans {
for _, ils := range rss.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
switch span.Status.Code {
case otlptrace.Status_STATUS_CODE_UNSET:
if span.Status.DeprecatedCode != otlptrace.Status_DEPRECATED_STATUS_CODE_OK {
span.Status.Code = otlptrace.Status_STATUS_CODE_ERROR
}
case otlptrace.Status_STATUS_CODE_OK:
// If status code is set then overwrites deprecated.
span.Status.DeprecatedCode = otlptrace.Status_DEPRECATED_STATUS_CODE_OK
case otlptrace.Status_STATUS_CODE_ERROR:
span.Status.DeprecatedCode = otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR
}
}
}
}
}

// LogsWrapper is an intermediary struct that is declared in an internal package
// as a way to prevent certain functions of pdata.Logs data type to be callable by
// any code outside of this module.
Expand Down
116 changes: 116 additions & 0 deletions internal/otlp_wrappers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"testing"

"github.com/stretchr/testify/assert"

collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)

func TestDeprecatedStatusCode(t *testing.T) {
// See specification for handling status code here:
// https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L231
tests := []struct {
sendCode otlptrace.Status_StatusCode
sendDeprecatedCode otlptrace.Status_DeprecatedStatusCode
expectedRcvCode otlptrace.Status_StatusCode
expectedDeprecatedCode otlptrace.Status_DeprecatedStatusCode
}{
{
// If code==STATUS_CODE_UNSET then the value of `deprecated_code` is the
// carrier of the overall status according to these rules:
//
// if deprecated_code==DEPRECATED_STATUS_CODE_OK then the receiver MUST interpret
// the overall status to be STATUS_CODE_UNSET.
sendCode: otlptrace.Status_STATUS_CODE_UNSET,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
expectedRcvCode: otlptrace.Status_STATUS_CODE_UNSET,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
},
{
// if deprecated_code!=DEPRECATED_STATUS_CODE_OK then the receiver MUST interpret
// the overall status to be STATUS_CODE_ERROR.
sendCode: otlptrace.Status_STATUS_CODE_UNSET,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_ABORTED,
expectedRcvCode: otlptrace.Status_STATUS_CODE_ERROR,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_ABORTED,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_OK,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
expectedRcvCode: otlptrace.Status_STATUS_CODE_OK,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_OK,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
expectedRcvCode: otlptrace.Status_STATUS_CODE_OK,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_ERROR,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
expectedRcvCode: otlptrace.Status_STATUS_CODE_ERROR,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_ERROR,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
expectedRcvCode: otlptrace.Status_STATUS_CODE_ERROR,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
},
}

for _, test := range tests {
t.Run(test.sendCode.String()+"/"+test.sendDeprecatedCode.String(), func(t *testing.T) {
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
Status: otlptrace.Status{
Code: test.sendCode,
DeprecatedCode: test.sendDeprecatedCode,
},
},
},
},
},
},
},
}

TracesCompatibilityChanges(req)
spanProto := req.ResourceSpans[0].InstrumentationLibrarySpans[0].Spans[0]
// Check that DeprecatedCode is passed as is.
assert.EqualValues(t, test.expectedRcvCode, spanProto.Status.Code)
assert.EqualValues(t, test.expectedDeprecatedCode, spanProto.Status.DeprecatedCode)
})
}
}
25 changes: 1 addition & 24 deletions receiver/otlpreceiver/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -54,29 +53,7 @@ const (
func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.instanceName, receiverTransport)

// Perform backward compatibility conversion of Span Status code according to
// OTLP specification as we are a new receiver and sender (we are pushing data to the pipelines):
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L239
// See https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L253
for _, rss := range req.ResourceSpans {
for _, ils := range rss.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
switch span.Status.Code {
case otlptrace.Status_STATUS_CODE_UNSET:
if span.Status.DeprecatedCode != otlptrace.Status_DEPRECATED_STATUS_CODE_OK {
span.Status.Code = otlptrace.Status_STATUS_CODE_ERROR
}
case otlptrace.Status_STATUS_CODE_OK:
// If status code is set then overwrites deprecated.
span.Status.DeprecatedCode = otlptrace.Status_DEPRECATED_STATUS_CODE_OK
case otlptrace.Status_STATUS_CODE_ERROR:
span.Status.DeprecatedCode = otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR
}
}
}
}

internal.TracesCompatibilityChanges(req)
td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req))
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
Expand Down
115 changes: 0 additions & 115 deletions receiver/otlpreceiver/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,118 +179,3 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (int, func()) {

return port, done
}

func TestDeprecatedStatusCode(t *testing.T) {
traceSink := new(consumertest.TracesSink)

port, doneFn := otlpReceiverOnGRPCServer(t, traceSink)
defer doneFn()

traceClient, traceClientDoneFn, err := makeTraceServiceClient(port)
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
defer traceClientDoneFn()

// See specification for handling status code here:
// https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L231
tests := []struct {
sendCode otlptrace.Status_StatusCode
sendDeprecatedCode otlptrace.Status_DeprecatedStatusCode
expectedRcvCode otlptrace.Status_StatusCode
expectedDeprecatedCode otlptrace.Status_DeprecatedStatusCode
}{
{
// If code==STATUS_CODE_UNSET then the value of `deprecated_code` is the
// carrier of the overall status according to these rules:
//
// if deprecated_code==DEPRECATED_STATUS_CODE_OK then the receiver MUST interpret
// the overall status to be STATUS_CODE_UNSET.
sendCode: otlptrace.Status_STATUS_CODE_UNSET,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
expectedRcvCode: otlptrace.Status_STATUS_CODE_UNSET,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
},
{
// if deprecated_code!=DEPRECATED_STATUS_CODE_OK then the receiver MUST interpret
// the overall status to be STATUS_CODE_ERROR.
sendCode: otlptrace.Status_STATUS_CODE_UNSET,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_ABORTED,
expectedRcvCode: otlptrace.Status_STATUS_CODE_ERROR,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_ABORTED,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_OK,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
expectedRcvCode: otlptrace.Status_STATUS_CODE_OK,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_OK,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
expectedRcvCode: otlptrace.Status_STATUS_CODE_OK,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_ERROR,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_OK,
expectedRcvCode: otlptrace.Status_STATUS_CODE_ERROR,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
},
{
// If code!=STATUS_CODE_UNSET then the value of `deprecated_code` MUST be
// overwritten, the `code` field is the sole carrier of the status.
sendCode: otlptrace.Status_STATUS_CODE_ERROR,
sendDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
expectedRcvCode: otlptrace.Status_STATUS_CODE_ERROR,
expectedDeprecatedCode: otlptrace.Status_DEPRECATED_STATUS_CODE_UNKNOWN_ERROR,
},
}

for _, test := range tests {
t.Run(test.sendCode.String()+"/"+test.sendDeprecatedCode.String(), func(t *testing.T) {
resourceSpans := []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
Status: otlptrace.Status{
Code: test.sendCode,
DeprecatedCode: test.sendDeprecatedCode,
},
},
},
},
},
},
}

req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: resourceSpans,
}

traceSink.Reset()

resp, err := traceClient.Export(context.Background(), req)
require.NoError(t, err, "Failed to export trace: %v", err)
require.NotNil(t, resp, "The response is missing")

require.Equal(t, 1, len(traceSink.AllTraces()), "unexpected length: %v", len(traceSink.AllTraces()))

rcvdStatus := traceSink.AllTraces()[0].ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Status()

// Check that Code is as expected.
assert.EqualValues(t, rcvdStatus.Code(), test.expectedRcvCode)

spanProto := internal.TracesToOtlp(traceSink.AllTraces()[0].InternalRep()).ResourceSpans[0].InstrumentationLibrarySpans[0].Spans[0]

// Check that DeprecatedCode is passed as is.
assert.EqualValues(t, spanProto.Status.DeprecatedCode, test.expectedDeprecatedCode)
})
}
}

0 comments on commit d3cc537

Please sign in to comment.