Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return proto status for OTLP receiver when failed #1788

Merged
merged 1 commit into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
OrigName: true,
}
r.gatewayMux = gatewayruntime.NewServeMux(
gatewayruntime.WithProtoErrorHandler(gatewayruntime.DefaultHTTPProtoErrorHandler),
gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}),
gatewayruntime.WithMarshalerOption(gatewayruntime.MIMEWildcard, jsonpb),
)
Expand Down
80 changes: 62 additions & 18 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"testing"
"time"

"github.com/gogo/protobuf/proto"
gogoproto "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -55,10 +58,11 @@ import (

const otlpReceiverName = "otlp_receiver_test"

func TestGrpcGateway_endToEnd(t *testing.T) {
func TestJsonHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
}{
{
name: "JSONUncompressed",
Expand All @@ -68,6 +72,16 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
name: "JSONGzipCompressed",
encoding: "gzip",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -129,6 +143,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
default:
buf = bytes.NewBuffer(traceJSON)
}
sink.SetConsumeTraceError(test.err)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/json")
Expand All @@ -149,16 +164,21 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
t.Errorf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

var respJSON map[string]interface{}
err = json.Unmarshal([]byte(respStr), &respJSON)
assert.NoError(t, err)

if len(respJSON) != 0 {
t.Errorf("Got unexpected response from trace grpc-gateway: %v", respStr)
if test.err == nil {
assert.Equal(t, 200, resp.StatusCode)
var respJSON map[string]interface{}
assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON))
assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway")
} else {
errStatus := &spb.Status{}
assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus))
if s, ok := status.FromError(test.err); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
}

got := sink.AllTraces()[0]
Expand Down Expand Up @@ -203,6 +223,7 @@ func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
}{
{
name: "ProtoUncompressed",
Expand All @@ -212,6 +233,16 @@ func TestProtoHttp(t *testing.T) {
name: "ProtoGzipCompressed",
encoding: "gzip",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -248,6 +279,7 @@ func TestProtoHttp(t *testing.T) {
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeTraceError(test.err)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
Expand All @@ -261,12 +293,24 @@ func TestProtoHttp(t *testing.T) {
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")
if test.err == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
} else {
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(test.err); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
}

tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0])

Expand All @@ -279,7 +323,7 @@ func TestProtoHttp(t *testing.T) {

// assert.Equal doesn't work on protos, see:
// https://github.com/stretchr/testify/issues/758
if !proto.Equal(got, want) {
if !gogoproto.Equal(got, want) {
t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n",
got.String(),
want.String())
Expand Down