From f248b238a278b9d46b8bffc5bd1f14d66686a2eb Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 15 Sep 2020 15:31:39 -0700 Subject: [PATCH] Return proto status for OTLP receiver when failed Signed-off-by: Bogdan Drutu --- receiver/otlpreceiver/otlp.go | 4 +- receiver/otlpreceiver/otlp_test.go | 76 +++++++++++++++++++++++------- receiver/otlpreceiver/otlphttp.go | 4 +- 3 files changed, 65 insertions(+), 19 deletions(-) diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 75555c23a567..5b1f21e493e5 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -21,6 +21,7 @@ import ( "net/http" "sync" + "github.com/grpc-ecosystem/grpc-gateway/runtime" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" "google.golang.org/grpc" @@ -75,6 +76,7 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { OrigName: true, } r.gatewayMux = gatewayruntime.NewServeMux( + gatewayruntime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler), gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), gatewayruntime.WithMarshalerOption(gatewayruntime.MIMEWildcard, jsonpb), ) @@ -107,7 +109,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { if r.cfg.HTTP != nil { r.serverHTTP = r.cfg.HTTP.ToServer( r.gatewayMux, - confighttp.WithErrorHandler(OTLPErrorHandler), + confighttp.WithErrorHandler(errorHandler), ) var hln net.Listener hln, err = r.cfg.HTTP.ToListener() diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index e2d1d10d893a..dd5b42248810 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -19,6 +19,7 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" "io/ioutil" "net" @@ -29,9 +30,11 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + gproto "google.golang.org/protobuf/proto" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -55,10 +58,11 @@ import ( const otlpReceiverName = "otlp_receiver_test" -func TestGrpcGateway_endToEnd(t *testing.T) { +func TestJsonHttp(t *testing.T) { tests := []struct { name string encoding string + err error }{ { name: "JSONUncompressed", @@ -68,6 +72,16 @@ func TestGrpcGateway_endToEnd(t *testing.T) { name: "JSONGzipCompressed", encoding: "gzip", }, + { + name: "NotGRPCError", + encoding: "", + err: errors.New("my error"), + }, + { + name: "GRPCError", + encoding: "", + err: status.New(codes.Internal, "").Err(), + }, } addr := testutil.GetAvailableLocalAddress(t) @@ -129,6 +143,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) { default: buf = bytes.NewBuffer(traceJSON) } + sink.SetConsumeTraceError(test.err) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/json") @@ -149,16 +164,21 @@ func TestGrpcGateway_endToEnd(t *testing.T) { t.Errorf("Error closing response body, %v", err) } - if resp.StatusCode != 200 { - t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode) - } - - var respJSON map[string]interface{} - err = json.Unmarshal([]byte(respStr), &respJSON) - assert.NoError(t, err) - - if len(respJSON) != 0 { - t.Errorf("Got unexpected response from trace grpc-gateway: %v", respStr) + if test.err == nil { + assert.Equal(t, 200, resp.StatusCode) + var respJSON map[string]interface{} + assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON)) + assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway") + } else { + errStatus := &spb.Status{} + assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus)) + if s, ok := status.FromError(test.err); ok { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, gproto.Equal(errStatus, s.Proto())) + } else { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, gproto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"})) + } } got := sink.AllTraces()[0] @@ -203,6 +223,7 @@ func TestProtoHttp(t *testing.T) { tests := []struct { name string encoding string + err error }{ { name: "ProtoUncompressed", @@ -212,6 +233,16 @@ func TestProtoHttp(t *testing.T) { name: "ProtoGzipCompressed", encoding: "gzip", }, + { + name: "NotGRPCError", + encoding: "", + err: errors.New("my error"), + }, + { + name: "GRPCError", + encoding: "", + err: status.New(codes.Internal, "").Err(), + }, } addr := testutil.GetAvailableLocalAddress(t) @@ -248,6 +279,7 @@ func TestProtoHttp(t *testing.T) { default: buf = bytes.NewBuffer(traceBytes) } + tSink.SetConsumeTraceError(test.err) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/x-protobuf") @@ -261,12 +293,24 @@ func TestProtoHttp(t *testing.T) { require.NoError(t, err, "Error reading response from trace grpc-gateway") require.NoError(t, resp.Body.Close(), "Error closing response body") - require.Equal(t, 200, resp.StatusCode, "Unexpected return status") - require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") + if test.err == nil { + require.Equal(t, 200, resp.StatusCode, "Unexpected return status") + tmp := &collectortrace.ExportTraceServiceResponse{} + err = tmp.Unmarshal(respBytes) + require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") + } else { + errStatus := &spb.Status{} + assert.NoError(t, gproto.Unmarshal(respBytes, errStatus)) + if s, ok := status.FromError(test.err); ok { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, gproto.Equal(errStatus, s.Proto())) + } else { + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.True(t, gproto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"})) + } + } - tmp := &collectortrace.ExportTraceServiceResponse{} - err = tmp.Unmarshal(respBytes) - require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") + require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0]) diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index 49deeeda9a1f..1f0e14d5171a 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -38,9 +38,9 @@ func (*xProtobufMarshaler) ContentType() string { var jsonMarshaller = &jsonpb.Marshaler{} -// OTLPErrorHandler encodes the HTTP error message inside a rpc.Status message as required +// errorHandler encodes the HTTP error message inside a rpc.Status message as required // by the OTLP protocol. -func OTLPErrorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) { +func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) { var ( msg []byte s *status.Status