diff --git a/go.mod b/go.mod index 650ae59..8fabc28 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/flashbots/go-utils go 1.20 require ( + github.com/VictoriaMetrics/metrics v1.35.1 github.com/ethereum/go-ethereum v1.13.14 github.com/google/uuid v1.3.1 github.com/sirupsen/logrus v1.9.3 @@ -34,6 +35,8 @@ require ( github.com/supranational/blst v0.3.11 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect + github.com/valyala/fastrand v1.1.0 // indirect + github.com/valyala/histogram v1.2.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/sync v0.5.0 // indirect diff --git a/go.sum b/go.sum index a9916b7..335523f 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5 github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= +github.com/VictoriaMetrics/metrics v1.35.1 h1:o84wtBKQbzLdDy14XeskkCZih6anG+veZ1SwJHFGwrU= +github.com/VictoriaMetrics/metrics v1.35.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88= @@ -106,6 +108,10 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= +github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= +github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/rpcserver/jsonrpc_server.go b/rpcserver/jsonrpc_server.go index be4207c..aece06d 100644 --- a/rpcserver/jsonrpc_server.go +++ b/rpcserver/jsonrpc_server.go @@ -14,6 +14,7 @@ import ( "log/slog" "net/http" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/flashbots/go-utils/signature" @@ -75,6 +76,8 @@ type Methods map[string]any type JSONRPCHandlerOpts struct { // Logger, can be nil Log *slog.Logger + // Server name. Used to separate logs and metrics when having multiple servers in one binary. + ServerName string // Max size of the request payload MaxRequestBodySizeBytes int64 // If true payload signature from X-Flashbots-Signature will be verified @@ -120,9 +123,10 @@ func (h *JSONRPCHandler) writeJSONRPCResponse(w http.ResponseWriter, response js w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(response); err != nil { if h.Log != nil { - h.Log.Error("failed to marshall response", slog.Any("error", err)) + h.Log.Error("failed to marshall response", slog.Any("error", err), slog.String("serverName", h.ServerName)) } http.Error(w, errMarshalResponse, http.StatusInternalServerError) + incInternalErrors(h.ServerName) return } } @@ -142,15 +146,25 @@ func (h *JSONRPCHandler) writeJSONRPCError(w http.ResponseWriter, id any, code i } func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + startAt := time.Now() + methodForMetrics := unknownMethodLabel + ctx := r.Context() + defer func() { + incRequestCount(methodForMetrics, h.ServerName) + incRequestDuration(methodForMetrics, time.Since(startAt).Milliseconds(), h.ServerName) + }() + if r.Method != http.MethodPost { http.Error(w, errMethodNotAllowed, http.StatusMethodNotAllowed) + incIncorrectRequest(h.ServerName) return } if r.Header.Get("Content-Type") != "application/json" { http.Error(w, errWrongContentType, http.StatusUnsupportedMediaType) + incIncorrectRequest(h.ServerName) return } @@ -159,6 +173,7 @@ func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { msg := fmt.Sprintf("request body is too big, max size: %d", h.MaxRequestBodySizeBytes) h.writeJSONRPCError(w, nil, CodeInvalidRequest, msg) + incIncorrectRequest(h.ServerName) return } @@ -167,6 +182,7 @@ func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { signer, err := signature.Verify(signatureHeader, body) if err != nil { h.writeJSONRPCError(w, nil, CodeInvalidRequest, err.Error()) + incIncorrectRequest(h.ServerName) return } ctx = context.WithValue(ctx, signerKey{}, signer) @@ -176,11 +192,13 @@ func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var req jsonRPCRequest if err := json.Unmarshal(body, &req); err != nil { h.writeJSONRPCError(w, nil, CodeParseError, err.Error()) + incIncorrectRequest(h.ServerName) return } if req.JSONRPC != "2.0" { h.writeJSONRPCError(w, req.ID, CodeParseError, "invalid jsonrpc version") + incIncorrectRequest(h.ServerName) return } if req.ID != nil { @@ -189,6 +207,8 @@ func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case string, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: default: h.writeJSONRPCError(w, req.ID, CodeParseError, "invalid id type") + incIncorrectRequest(h.ServerName) + return } } @@ -210,6 +230,7 @@ func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if origin != "" { if len(origin) > maxOriginIDLength { h.writeJSONRPCError(w, req.ID, CodeInvalidRequest, "x-flashbots-origin header is too long") + incIncorrectRequest(h.ServerName) return } ctx = context.WithValue(ctx, originKey{}, origin) @@ -220,19 +241,23 @@ func (h *JSONRPCHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { method, ok := h.methods[req.Method] if !ok { h.writeJSONRPCError(w, req.ID, CodeMethodNotFound, "method not found") + incIncorrectRequest(h.ServerName) return } + methodForMetrics = req.Method // call method result, err := method.call(ctx, req.Params) if err != nil { h.writeJSONRPCError(w, req.ID, CodeCustomError, err.Error()) + incRequestErrorCount(methodForMetrics, h.ServerName) return } marshaledResult, err := json.Marshal(result) if err != nil { h.writeJSONRPCError(w, req.ID, CodeInternalError, err.Error()) + incInternalErrors(h.ServerName) return } diff --git a/rpcserver/metrics.go b/rpcserver/metrics.go new file mode 100644 index 0000000..828aee5 --- /dev/null +++ b/rpcserver/metrics.go @@ -0,0 +1,51 @@ +package rpcserver + +import ( + "fmt" + + "github.com/VictoriaMetrics/metrics" +) + +const ( + // we use unknown method label for methods that server does not support because otherwise + // users can create arbitrary number of metrics + unknownMethodLabel = "unknown" + + // incremented when user made incorrect request + incorrectRequestCounter = `goutils_rpcserver_incorrect_request_total{server_name="%s"}` + + // incremented when server has a bug (e.g. can't marshall response) + internalErrorsCounter = `goutils_rpcserver_internal_errors_total{server_name="%s"}` + + // incremented when request comes in + requestCountLabel = `goutils_rpcserver_request_count{method="%s",server_name="%s"}` + // incremented when handler method returns JSONRPC error + errorCountLabel = `goutils_rpcserver_error_count{method="%s",server_name="%s"}` + // total duration of the request + requestDurationLabel = `goutils_rpcserver_request_duration_milliseconds{method="%s",server_name="%s"}` +) + +func incRequestCount(method, serverName string) { + l := fmt.Sprintf(requestCountLabel, method, serverName) + metrics.GetOrCreateCounter(l).Inc() +} + +func incIncorrectRequest(serverName string) { + l := fmt.Sprintf(incorrectRequestCounter, serverName) + metrics.GetOrCreateCounter(l).Inc() +} + +func incRequestErrorCount(method, serverName string) { + l := fmt.Sprintf(errorCountLabel, method, serverName) + metrics.GetOrCreateCounter(l).Inc() +} + +func incRequestDuration(method string, duration int64, serverName string) { + l := fmt.Sprintf(requestDurationLabel, method, serverName) + metrics.GetOrCreateSummary(l).Update(float64(duration)) +} + +func incInternalErrors(serverName string) { + l := fmt.Sprintf(internalErrorsCounter, serverName) + metrics.GetOrCreateCounter(l).Inc() +}