Skip to content

Commit

Permalink
Return proto status for OTLP receiver when failed
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Sep 15, 2020
1 parent b3a5cea commit f248b23
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 19 deletions.
4 changes: 3 additions & 1 deletion receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

Expand Down Expand Up @@ -75,6 +76,7 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
OrigName: true,
}
r.gatewayMux = gatewayruntime.NewServeMux(
gatewayruntime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler),
gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}),
gatewayruntime.WithMarshalerOption(gatewayruntime.MIMEWildcard, jsonpb),
)
Expand Down Expand Up @@ -107,7 +109,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(OTLPErrorHandler),
confighttp.WithErrorHandler(errorHandler),
)
var hln net.Listener
hln, err = r.cfg.HTTP.ToListener()
Expand Down
76 changes: 60 additions & 16 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
Expand All @@ -29,9 +30,11 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
gproto "google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -55,10 +58,11 @@ import (

const otlpReceiverName = "otlp_receiver_test"

func TestGrpcGateway_endToEnd(t *testing.T) {
func TestJsonHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
}{
{
name: "JSONUncompressed",
Expand All @@ -68,6 +72,16 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
name: "JSONGzipCompressed",
encoding: "gzip",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -129,6 +143,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
default:
buf = bytes.NewBuffer(traceJSON)
}
sink.SetConsumeTraceError(test.err)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/json")
Expand All @@ -149,16 +164,21 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
t.Errorf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

var respJSON map[string]interface{}
err = json.Unmarshal([]byte(respStr), &respJSON)
assert.NoError(t, err)

if len(respJSON) != 0 {
t.Errorf("Got unexpected response from trace grpc-gateway: %v", respStr)
if test.err == nil {
assert.Equal(t, 200, resp.StatusCode)
var respJSON map[string]interface{}
assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON))
assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway")
} else {
errStatus := &spb.Status{}
assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus))
if s, ok := status.FromError(test.err); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, gproto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, gproto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
}

got := sink.AllTraces()[0]
Expand Down Expand Up @@ -203,6 +223,7 @@ func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
}{
{
name: "ProtoUncompressed",
Expand All @@ -212,6 +233,16 @@ func TestProtoHttp(t *testing.T) {
name: "ProtoGzipCompressed",
encoding: "gzip",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -248,6 +279,7 @@ func TestProtoHttp(t *testing.T) {
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeTraceError(test.err)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
Expand All @@ -261,12 +293,24 @@ func TestProtoHttp(t *testing.T) {
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")
if test.err == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
} else {
errStatus := &spb.Status{}
assert.NoError(t, gproto.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(test.err); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, gproto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, gproto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
}

tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0])

Expand Down
4 changes: 2 additions & 2 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (*xProtobufMarshaler) ContentType() string {

var jsonMarshaller = &jsonpb.Marshaler{}

// OTLPErrorHandler encodes the HTTP error message inside a rpc.Status message as required
// errorHandler encodes the HTTP error message inside a rpc.Status message as required
// by the OTLP protocol.
func OTLPErrorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) {
func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) {
var (
msg []byte
s *status.Status
Expand Down

0 comments on commit f248b23

Please sign in to comment.