forked from grpc-ecosystem/go-grpc-middleware
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpayload_interceptors.go
145 lines (129 loc) · 5.76 KB
/
payload_interceptors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package grpc_logrus
import (
"bytes"
"context"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// JsonPbMarshaller is the package-wide marshaller that turns protobuf messages into JSON when payloads are logged.
// Reassign it to plug in any other marshaller that offers the same Marshal() signature.
var JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
// PayloadUnaryServerInterceptor returns a new unary server interceptor that logs the payloads of requests and
// responses.
//
// This *only* works when placed *after* the `grpc_logrus.UnaryServerInterceptor`. However, the logging can be done to a
// separate instance of the logger.
//
// The decider is consulted on every call; when it returns false the handler is invoked without any payload logging.
// Otherwise the request is logged before the handler runs, and the response is logged only when the handler returns
// a nil error.
func PayloadUnaryServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !decider(ctx, info.FullMethod, info.Server) {
			return handler(ctx, req)
		}
		// Use the provided logrus.Entry for logging but use the fields from context.
		logEntry := entry.WithFields(ctxlogrus.Extract(ctx).Data)
		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
		resp, err := handler(ctx, req)
		if err == nil {
			// The message names the field the payload is actually stored under (grpc.response.content).
			logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.response.content field")
		}
		return resp, err
	}
}
// PayloadStreamServerInterceptor returns a new streaming server interceptor that logs the payloads of the messages
// received and sent on the stream.
//
// This *only* works when placed *after* the `grpc_logrus.StreamServerInterceptor`. However, the logging can be done to a
// separate instance of the logger.
func PayloadStreamServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		if decider(stream.Context(), info.FullMethod, srv) {
			// Log through the supplied entry, enriched with the fields carried by the stream's context,
			// by swapping in a wrapper stream that logs each message.
			ctxFields := ctxlogrus.Extract(stream.Context()).Data
			stream = &loggingServerStream{ServerStream: stream, entry: entry.WithFields(ctxFields)}
		}
		return handler(srv, stream)
	}
}
// PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the payloads of requests and responses.
//
// Calls rejected by the decider go straight to the invoker. For all other calls the request is logged before the
// invocation and the reply is logged only when the invoker reports no error.
func PayloadUnaryClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		if !decider(ctx, method) {
			return invoker(ctx, method, req, reply, cc, opts...)
		}
		callLogger := entry.WithFields(newClientLoggerFields(ctx, method))
		logProtoMessageAsJson(callLogger, req, "grpc.request.content", "client request payload logged as grpc.request.content")
		if callErr := invoker(ctx, method, req, reply, cc, opts...); callErr != nil {
			// A failed call has no meaningful reply to log.
			return callErr
		}
		logProtoMessageAsJson(callLogger, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
		return nil
	}
}
// PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the payloads of requests and responses.
//
// Calls rejected by the decider are passed to the streamer untouched. Otherwise the stream produced by the streamer
// is wrapped so that each successfully sent or received message is logged. If the streamer fails, its error is
// returned together with a nil stream.
func PayloadStreamClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		if !decider(ctx, method) {
			return streamer(ctx, desc, cc, method, opts...)
		}
		logEntry := entry.WithFields(newClientLoggerFields(ctx, method))
		clientStream, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			// Never hand back a wrapper around a missing stream: the wrapper would be a non-nil
			// grpc.ClientStream whose methods dereference a nil embedded stream.
			return nil, err
		}
		return &loggingClientStream{ClientStream: clientStream, entry: logEntry}, nil
	}
}
// loggingClientStream wraps a grpc.ClientStream so that messages passed through its SendMsg and RecvMsg
// methods can be logged to entry.
type loggingClientStream struct {
grpc.ClientStream
// entry is the logger that message payloads are written to.
entry *logrus.Entry
}
// SendMsg forwards m to the wrapped client stream and, when the send succeeds, logs m as the request payload
// under the grpc.request.content field. The error from the wrapped stream is returned unchanged.
func (l *loggingClientStream) SendMsg(m interface{}) error {
	err := l.ClientStream.SendMsg(m)
	if err == nil {
		// This is the client side of the stream, so the message matches the unary client interceptor's wording.
		logProtoMessageAsJson(l.entry, m, "grpc.request.content", "client request payload logged as grpc.request.content")
	}
	return err
}
// RecvMsg receives into m from the wrapped client stream and, when the receive succeeds, logs m as the response
// payload under the grpc.response.content field. The error from the wrapped stream is returned unchanged.
func (l *loggingClientStream) RecvMsg(m interface{}) error {
	err := l.ClientStream.RecvMsg(m)
	if err == nil {
		// This is the client side of the stream, so the message matches the unary client interceptor's wording.
		logProtoMessageAsJson(l.entry, m, "grpc.response.content", "client response payload logged as grpc.response.content")
	}
	return err
}
// loggingServerStream wraps a grpc.ServerStream so that messages passed through its SendMsg and RecvMsg
// methods can be logged to entry.
type loggingServerStream struct {
grpc.ServerStream
// entry is the logger that message payloads are written to.
entry *logrus.Entry
}
// SendMsg forwards m to the wrapped server stream and, when the send succeeds, logs m as the response payload
// under the grpc.response.content field. A send error is returned as-is and nothing is logged.
func (l *loggingServerStream) SendMsg(m interface{}) error {
	if sendErr := l.ServerStream.SendMsg(m); sendErr != nil {
		return sendErr
	}
	logProtoMessageAsJson(l.entry, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
	return nil
}
// RecvMsg receives into m from the wrapped server stream and, when the receive succeeds, logs m as the request
// payload under the grpc.request.content field. A receive error is returned as-is and nothing is logged.
func (l *loggingServerStream) RecvMsg(m interface{}) error {
	if recvErr := l.ServerStream.RecvMsg(m); recvErr != nil {
		return recvErr
	}
	logProtoMessageAsJson(l.entry, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
	return nil
}
// logProtoMessageAsJson emits msg at Info level on entry with pbMsg attached under the field named key.
// The message is wrapped in jsonpbMarshalleble, whose MarshalJSON method delegates to JsonPbMarshaller.
// Values that do not implement proto.Message are skipped without logging anything.
func logProtoMessageAsJson(entry *logrus.Entry, pbMsg interface{}, key string, msg string) {
	protoMsg, isProto := pbMsg.(proto.Message)
	if !isProto {
		return
	}
	payload := &jsonpbMarshalleble{protoMsg}
	entry.WithField(key, payload).Info(msg)
}
// jsonpbMarshalleble wraps a proto.Message; its MarshalJSON method serializes the message with JsonPbMarshaller.
type jsonpbMarshalleble struct {
proto.Message
}
// MarshalJSON serializes the wrapped protobuf message with the package-level JsonPbMarshaller and returns the
// resulting bytes. A marshaller failure is reported as an error prefixed with "jsonpb serializer failed".
func (j *jsonpbMarshalleble) MarshalJSON() ([]byte, error) {
	var out bytes.Buffer
	err := JsonPbMarshaller.Marshal(&out, j.Message)
	if err != nil {
		return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
	}
	return out.Bytes(), nil
}