diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 75555c23a56..70b1c34b89f 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -75,6 +75,7 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { OrigName: true, } r.gatewayMux = gatewayruntime.NewServeMux( + gatewayruntime.WithProtoErrorHandler(gatewayruntime.DefaultHTTPProtoErrorHandler), gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), gatewayruntime.WithMarshalerOption(gatewayruntime.MIMEWildcard, jsonpb), ) diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index e2d1d10d893..5b647ab1562 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -19,6 +19,7 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" "io/ioutil" "net" @@ -26,12 +27,14 @@ import ( "testing" "time" - "github.com/gogo/protobuf/proto" + gogoproto "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -55,10 +58,11 @@ import ( const otlpReceiverName = "otlp_receiver_test" -func TestGrpcGateway_endToEnd(t *testing.T) { +func TestJsonHttp(t *testing.T) { tests := []struct { name string encoding string + err error }{ { name: "JSONUncompressed", @@ -68,6 +72,16 @@ func TestGrpcGateway_endToEnd(t *testing.T) { name: "JSONGzipCompressed", encoding: "gzip", }, + { + name: "NotGRPCError", + encoding: "", + err: errors.New("my error"), + }, + { + name: "GRPCError", + encoding: "", + err: status.New(codes.Internal, "").Err(), + }, } addr := testutil.GetAvailableLocalAddress(t) @@ -129,6 +143,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) { default: buf = bytes.NewBuffer(traceJSON) } + sink.SetConsumeTraceError(test.err) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/json") @@ -149,16 +164,21 @@ func TestGrpcGateway_endToEnd(t *testing.T) { t.Errorf("Error closing response body, %v", err) } - if resp.StatusCode != 200 { - t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode) - } - - var respJSON map[string]interface{} - err = json.Unmarshal([]byte(respStr), &respJSON) - assert.NoError(t, err) - - if len(respJSON) != 0 { - t.Errorf("Got unexpected response from trace grpc-gateway: %v", respStr) + if test.err == nil { + assert.Equal(t, 200, resp.StatusCode) + var respJSON map[string]interface{} + assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON)) + assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway") + } else { + errStatus := &spb.Status{} + assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus)) + if s, ok := status.FromError(test.err); ok { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, proto.Equal(errStatus, s.Proto())) + } else { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"})) + } } got := sink.AllTraces()[0] @@ -203,6 +223,7 @@ func TestProtoHttp(t *testing.T) { tests := []struct { name string encoding string + err error }{ { name: "ProtoUncompressed", @@ -212,6 +233,16 @@ func TestProtoHttp(t *testing.T) { name: "ProtoGzipCompressed", encoding: "gzip", }, + { + name: "NotGRPCError", + encoding: "", + err: errors.New("my error"), + }, + { + name: "GRPCError", + encoding: "", + err: status.New(codes.Internal, "").Err(), + }, } addr := testutil.GetAvailableLocalAddress(t) @@ -248,6 +279,7 @@ func TestProtoHttp(t *testing.T) { default: buf = bytes.NewBuffer(traceBytes) } + tSink.SetConsumeTraceError(test.err) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/x-protobuf") @@ -261,12 +293,24 @@ func TestProtoHttp(t *testing.T) { require.NoError(t, err, "Error reading response from trace grpc-gateway") require.NoError(t, resp.Body.Close(), "Error closing response body") - require.Equal(t, 200, resp.StatusCode, "Unexpected return status") - require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") + if test.err == nil { + require.Equal(t, 200, resp.StatusCode, "Unexpected return status") + tmp := &collectortrace.ExportTraceServiceResponse{} + err = tmp.Unmarshal(respBytes) + require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") + } else { + errStatus := &spb.Status{} + assert.NoError(t, proto.Unmarshal(respBytes, errStatus)) + if s, ok := status.FromError(test.err); ok { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, proto.Equal(errStatus, s.Proto())) + } else { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"})) + } + } - tmp := &collectortrace.ExportTraceServiceResponse{} - err = tmp.Unmarshal(respBytes) - require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") + require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0]) @@ -279,7 +323,7 @@ func TestProtoHttp(t *testing.T) { // assert.Equal doesn't work on protos, see: // https://github.com/stretchr/testify/issues/758 - if !proto.Equal(got, want) { + if !gogoproto.Equal(got, want) { t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n", got.String(), want.String())