From 7af131633cc4a364d8bfb20666ef77275ece1970 Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Sun, 22 Oct 2023 06:48:04 +0000
Subject: [PATCH] sync: enable rate limiting for servers (#5151)

closes: https://github.com/spacemeshos/go-spacemesh/issues/4977
closes: https://github.com/spacemeshos/go-spacemesh/issues/4603

this change introduces two configuration parameters for every server:
- requests per interval (the pace), for example 10 req/s; this caps the maximum bandwidth that each server can use
- queue size, sized so that queued requests can still be served within the expected latency. any request that does not fit into the queue is dropped immediately, so the client can retry with a different node. the timeout is currently set to 10s, so the queue should be roughly 10 times larger than the per-second request rate

this does not provide a global bandwidth limit, but we do limit the number of peers, and an honest peer does not run many concurrent queries. what we really want to handle is peers with intentionally malicious behavior, and that is not a pressing issue.

example configuration:

```json
"fetch": {
    "servers": {
        "ax/1": {"queue": 10, "requests": 1, "interval": "1s"},
        "ld/1": {"queue": 1000, "requests": 100, "interval": "1s"},
        "hs/1": {"queue": 2000, "requests": 200, "interval": "1s"},
        "mh/1": {"queue": 1000, "requests": 100, "interval": "1s"},
        "ml/1": {"queue": 100, "requests": 10, "interval": "1s"},
        "lp/2": {"queue": 10000, "requests": 1000, "interval": "1s"}
    }
}
```

https://github.com/spacemeshos/go-spacemesh/blob/3cf02146bf27f53c001bffcacffbda05933c27c4/fetch/fetch.go#L130-L144

metrics are collected per server:

https://github.com/spacemeshos/go-spacemesh/blob/3cf02146bf27f53c001bffcacffbda05933c27c4/p2p/server/metrics.go#L15-L52

they have to be enabled for all servers with:

```json
"fetch": {
    "servers-metrics": true
}
```
---
 fetch/fetch.go                          | 110 ++++++++---
 fetch/fetch_test.go                     |  12 +-
 fetch/interface.go                      |   1 +
 fetch/mocks/mocks.go                    |  38 ++++
 go.mod                                  |   2 +-
 p2p/server/metrics.go                   |  78 ++++++++
 p2p/server/server.go                    | 232 +++++++++++++++++-------
 p2p/server/server_test.go               | 118 ++++++++++--
 systest/parameters/fastnet/smesher.json |  15 +-
 9 files changed, 496 insertions(+), 110 deletions(-)
 create mode 100644 p2p/server/metrics.go

diff --git a/fetch/fetch.go b/fetch/fetch.go
index c5157308a78..9035b95cc82 100644
--- a/fetch/fetch.go
+++ b/fetch/fetch.go
@@ -78,24 +78,72 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage {
 	return m
 }
 
+type ServerConfig struct {
+	Queue    int           `mapstructure:"queue"`
+	Requests int           `mapstructure:"requests"`
+	Interval time.Duration `mapstructure:"interval"`
+}
+
+func (s ServerConfig) toOpts() []server.Opt {
+	opts := []server.Opt{}
+	if s.Queue != 0 {
+		opts = append(opts, server.WithQueueSize(s.Queue))
+	}
+	if s.Requests != 0 && s.Interval != 0 {
+		opts = append(opts, server.WithRequestsPerInterval(s.Requests, s.Interval))
+	}
+	return opts
+}
+
 // Config is the configuration file of the Fetch component.
type Config struct { - BatchTimeout time.Duration // in milliseconds + BatchTimeout time.Duration BatchSize, QueueSize int - RequestTimeout time.Duration // in seconds + RequestTimeout time.Duration MaxRetriesForRequest int - PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"` + EnableServesMetrics bool `mapstructure:"servers-metrics"` + ServersConfig map[string]ServerConfig `mapstructure:"servers"` + PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"` +} + +func (c Config) getServerConfig(protocol string) ServerConfig { + cfg, exists := c.ServersConfig[protocol] + if exists { + return cfg + } + return ServerConfig{ + Queue: 10000, + Requests: 100, + Interval: time.Second, + } } // DefaultConfig is the default config for the fetch component. func DefaultConfig() Config { return Config{ - BatchTimeout: time.Millisecond * time.Duration(50), + BatchTimeout: 50 * time.Millisecond, QueueSize: 20, BatchSize: 20, - RequestTimeout: time.Second * time.Duration(10), + RequestTimeout: 10 * time.Second, MaxRetriesForRequest: 100, - PeersRateThreshold: 0.02, + ServersConfig: map[string]ServerConfig{ + // serves 1 MB of data + atxProtocol: {Queue: 10, Requests: 1, Interval: time.Second}, + // serves 1 KB of data + lyrDataProtocol: {Queue: 1000, Requests: 100, Interval: time.Second}, + // serves atxs, ballots, active sets + // atx - 1 KB + // ballots > 300 bytes + // often queried after receiving gossip message + hashProtocol: {Queue: 2000, Requests: 200, Interval: time.Second}, + // serves at most 100 hashes - 3KB + meshHashProtocol: {Queue: 1000, Requests: 100, Interval: time.Second}, + // serves all malicious ids (id - 32 byte) - 10KB + malProtocol: {Queue: 100, Requests: 10, Interval: time.Second}, + // 64 bytes + OpnProtocol: {Queue: 10000, Requests: 1000, Interval: time.Second}, + }, + PeersRateThreshold: 0.02, } } @@ -220,34 +268,34 @@ func NewFetch( } f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout) - srvOpts := []server.Opt{ - server.WithTimeout(f.cfg.RequestTimeout), - server.WithLog(f.logger), - } if len(f.servers) == 0 { h := newHandler(cdb, bs, msh, b, f.logger) - f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...) - f.servers[lyrDataProtocol] = server.New( - host, - lyrDataProtocol, - h.handleLayerDataReq, - srvOpts...) - f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...) - f.servers[meshHashProtocol] = server.New( - host, - meshHashProtocol, - h.handleMeshHashReq, - srvOpts...) - f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...) - f.servers[OpnProtocol] = server.New( - host, - OpnProtocol, - h.handleLayerOpinionsReq2, - srvOpts...) + f.registerServer(host, atxProtocol, h.handleEpochInfoReq) + f.registerServer(host, lyrDataProtocol, h.handleLayerDataReq) + f.registerServer(host, hashProtocol, h.handleHashReq) + f.registerServer(host, meshHashProtocol, h.handleMeshHashReq) + f.registerServer(host, malProtocol, h.handleMaliciousIDsReq) + f.registerServer(host, OpnProtocol, h.handleLayerOpinionsReq2) } return f } +func (f *Fetch) registerServer( + host *p2p.Host, + protocol string, + handler server.Handler, +) { + opts := []server.Opt{ + server.WithTimeout(f.cfg.RequestTimeout), + server.WithLog(f.logger), + } + if f.cfg.EnableServesMetrics { + opts = append(opts, server.WithMetrics()) + } + opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...) + f.servers[protocol] = server.New(host, protocol, handler, opts...) 
+} + type dataValidators struct { atx SyncValidator poet SyncValidator @@ -295,6 +343,12 @@ func (f *Fetch) Start() error { f.loop() return nil }) + for _, srv := range f.servers { + srv := srv + f.eg.Go(func() error { + return srv.Run(f.shutdownCtx) + }) + } }) return nil } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 6a74cd40788..ffe4943bc14 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -65,6 +66,9 @@ func createFetch(tb testing.TB) *testFetch { mTxProposalH: mocks.NewMockSyncValidator(ctrl), mPoetH: mocks.NewMockSyncValidator(ctrl), } + for _, srv := range []*mocks.Mockrequester{tf.mMalS, tf.mAtxS, tf.mLyrS, tf.mHashS, tf.mMHashS, tf.mOpn2S} { + srv.EXPECT().Run(gomock.Any()).AnyTimes() + } cfg := Config{ BatchTimeout: 2 * time.Second, // make sure we never hit the batch timeout BatchSize: 3, @@ -373,7 +377,13 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) { } return result, nil } - server.New(badPeerHost, hashProtocol, badPeerHandler) + badsrv := server.New(badPeerHost, hashProtocol, badPeerHandler) + var eg errgroup.Group + eg.Go(func() error { + badsrv.Run(ctx) + return nil + }) + defer eg.Wait() fetcher := NewFetch(datastore.NewCachedDB(sql.InMemory(), lg), nil, nil, h, WithContext(ctx), diff --git a/fetch/interface.go b/fetch/interface.go index 759397b4b64..93a5b093b9e 100644 --- a/fetch/interface.go +++ b/fetch/interface.go @@ -11,6 +11,7 @@ import ( //go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go type requester interface { + Run(context.Context) error Request(context.Context, p2p.Peer, []byte, func([]byte), func(error)) error } diff --git a/fetch/mocks/mocks.go b/fetch/mocks/mocks.go index 4b2245ca59c..14531ef52f7 100644 --- a/fetch/mocks/mocks.go +++ b/fetch/mocks/mocks.go @@ -78,6 +78,44 @@ func (c *requesterRequestCall) DoAndReturn(f func(context.Context, p2p.Peer, []b return c } +// Run mocks base method. +func (m *Mockrequester) Run(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockrequesterMockRecorder) Run(arg0 any) *requesterRunCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*Mockrequester)(nil).Run), arg0) + return &requesterRunCall{Call: call} +} + +// requesterRunCall wrap *gomock.Call +type requesterRunCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *requesterRunCall) Return(arg0 error) *requesterRunCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *requesterRunCall) Do(f func(context.Context) error) *requesterRunCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *requesterRunCall) DoAndReturn(f func(context.Context) error) *requesterRunCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // MockSyncValidator is a mock of SyncValidator interface. 
type MockSyncValidator struct { ctrl *gomock.Controller diff --git a/go.mod b/go.mod index aeaaa7ba97e..b0e3740aed3 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/sync v0.4.0 + golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 @@ -206,7 +207,6 @@ require ( golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect diff --git a/p2p/server/metrics.go b/p2p/server/metrics.go new file mode 100644 index 00000000000..5d5c435e737 --- /dev/null +++ b/p2p/server/metrics.go @@ -0,0 +1,78 @@ +package server + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/spacemeshos/go-spacemesh/metrics" +) + +const ( + namespace = "server" + protoLabel = "protocol" +) + +var ( + targetQueue = metrics.NewGauge( + "target_queue", + namespace, + "target size of the queue", + []string{protoLabel}, + ) + queue = metrics.NewGauge( + "queue", + namespace, + "actual size of the queue", + []string{protoLabel}, + ) + targetRps = metrics.NewGauge( + "rps", + namespace, + "target requests per second", + []string{protoLabel}, + ) + requests = metrics.NewCounter( + "requests", + namespace, + "requests counter", + []string{protoLabel, "state"}, + ) + clientLatency = metrics.NewHistogramWithBuckets( + "client_latency_seconds", + namespace, + "latency since initiating a request", + []string{protoLabel, "result"}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) + serverLatency = metrics.NewHistogramWithBuckets( + "server_latency_seconds", + namespace, + "latency since accepting new stream", + []string{protoLabel}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) +) + +func newTracker(protocol string) *tracker { + return &tracker{ + targetQueue: targetQueue.WithLabelValues(protocol), + queue: queue.WithLabelValues(protocol), + targetRps: targetRps.WithLabelValues(protocol), + completed: requests.WithLabelValues(protocol, "completed"), + accepted: requests.WithLabelValues(protocol, "accepted"), + dropped: requests.WithLabelValues(protocol, "dropped"), + serverLatency: serverLatency.WithLabelValues(protocol), + clientLatency: clientLatency.WithLabelValues(protocol, "success"), + clientLatencyFailure: clientLatency.WithLabelValues(protocol, "failure"), + } +} + +type tracker struct { + targetQueue prometheus.Gauge + queue prometheus.Gauge + targetRps prometheus.Gauge + completed prometheus.Counter + accepted prometheus.Counter + dropped prometheus.Counter + serverLatency prometheus.Observer + clientLatency, clientLatencyFailure prometheus.Observer +} diff --git a/p2p/server/server.go b/p2p/server/server.go index 60389f3b882..cf8a8f6019d 100644 --- a/p2p/server/server.go +++ b/p2p/server/server.go @@ -13,6 +13,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-varint" + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/log" @@ -38,16 +40,40 @@ func WithLog(log log.Log) Opt { } } -// WithContext configures parent context for contexts that are passed to the handler. 
-func WithContext(ctx context.Context) Opt {
+func WithRequestSizeLimit(limit int) Opt {
 	return func(s *Server) {
-		s.ctx = ctx
+		s.requestLimit = limit
 	}
 }
 
-func WithRequestSizeLimit(limit int) Opt {
+// WithMetrics will enable metrics collection in the server.
+func WithMetrics() Opt {
 	return func(s *Server) {
-		s.requestLimit = limit
+		s.metrics = newTracker(s.protocol)
+	}
+}
+
+// WithQueueSize parametrizes the number of messages that will be kept in the queue
+// and eventually processed by the server. When the queue is full, new streams are closed immediately.
+//
+// The queue size should account for the maximum expected latency: if the expected latency is 10s
+// and the server processes 1000 requests per second, the size should be 10000.
+//
+// Defaults to 1000.
+func WithQueueSize(size int) Opt {
+	return func(s *Server) {
+		s.queueSize = size
+	}
+}
+
+// WithRequestsPerInterval parametrizes the server rate limit, capping the maximum amount of bandwidth
+// that this handler can consume.
+//
+// Defaults to 100 requests per second.
+func WithRequestsPerInterval(n int, interval time.Duration) Opt {
+	return func(s *Server) {
+		s.requestsPerInterval = n
+		s.interval = interval
 	}
 }
 
@@ -73,36 +99,91 @@ type Host interface {
 
 // Server for the Handler.
 type Server struct {
-	logger       log.Log
-	protocol     string
-	handler      Handler
-	timeout      time.Duration
-	requestLimit int
+	logger              log.Log
+	protocol            string
+	handler             Handler
+	timeout             time.Duration
+	requestLimit        int
+	queueSize           int
+	requestsPerInterval int
+	interval            time.Duration
 
-	h Host
+	metrics *tracker // metrics can be nil
 
-	ctx context.Context
+	h Host
 }
 
 // New server for the handler.
 func New(h Host, proto string, handler Handler, opts ...Opt) *Server {
 	srv := &Server{
-		ctx:          context.Background(),
-		logger:       log.NewNop(),
-		protocol:     proto,
-		handler:      handler,
-		h:            h,
-		timeout:      10 * time.Second,
-		requestLimit: 10240,
+		logger:              log.NewNop(),
+		protocol:            proto,
+		handler:             handler,
+		h:                   h,
+		timeout:             10 * time.Second,
+		requestLimit:        10240,
+		queueSize:           1000,
+		requestsPerInterval: 100,
+		interval:            time.Second,
 	}
 	for _, opt := range opts {
 		opt(srv)
 	}
-	h.SetStreamHandler(protocol.ID(proto), srv.streamHandler)
 	return srv
 }
 
-func (s *Server) streamHandler(stream network.Stream) {
+type request struct {
+	stream   network.Stream
+	received time.Time
+}
+
+func (s *Server) Run(ctx context.Context) error {
+	limit := rate.NewLimiter(rate.Every(s.interval/time.Duration(s.requestsPerInterval)), s.requestsPerInterval)
+	queue := make(chan request, s.queueSize)
+	if s.metrics != nil {
+		s.metrics.targetQueue.Set(float64(s.queueSize))
+		s.metrics.targetRps.Set(float64(limit.Limit()))
+	}
+	s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) {
+		select {
+		case queue <- request{stream: stream, received: time.Now()}:
+			if s.metrics != nil {
+				s.metrics.queue.Set(float64(len(queue)))
+				s.metrics.accepted.Inc()
+			}
+		default:
+			if s.metrics != nil {
+				s.metrics.dropped.Inc()
+			}
+			stream.Close()
+		}
+	})
+
+	var eg errgroup.Group
+	eg.SetLimit(s.queueSize)
+	for {
+		select {
+		case <-ctx.Done():
+			eg.Wait()
+			return nil
+		case req := <-queue:
+			if err := limit.Wait(ctx); err != nil {
+				eg.Wait()
+				return nil
+			}
+			eg.Go(func() error {
+				s.queueHandler(ctx, req.stream)
+				if s.metrics != nil {
+					s.metrics.serverLatency.Observe(time.Since(req.received).Seconds())
+					s.metrics.completed.Inc()
+				}
+				return nil
+			})
+		}
+	}
+}
+
+func (s *Server) queueHandler(ctx context.Context, stream network.Stream) {
 	defer stream.Close()
 	_ =
stream.SetDeadline(time.Now().Add(s.timeout)) defer stream.SetDeadline(time.Time{}) @@ -112,7 +193,7 @@ func (s *Server) streamHandler(stream network.Stream) { return } if size > uint64(s.requestLimit) { - s.logger.Warning("request limit overflow", + s.logger.With().Warning("request limit overflow", log.Int("limit", s.requestLimit), log.Uint64("request", size), ) @@ -125,7 +206,7 @@ func (s *Server) streamHandler(stream network.Stream) { return } start := time.Now() - buf, err = s.handler(log.WithNewRequestID(s.ctx), buf) + buf, err = s.handler(log.WithNewRequestID(ctx), buf) s.logger.With().Debug("protocol handler execution time", log.String("protocol", s.protocol), log.Duration("duration", time.Since(start)), @@ -149,7 +230,14 @@ func (s *Server) streamHandler(stream network.Stream) { // Request sends a binary request to the peer. Request is executed in the background, one of the callbacks // is guaranteed to be called on success/error. -func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte, resp func([]byte), failure func(error)) error { +func (s *Server) Request( + ctx context.Context, + pid peer.ID, + req []byte, + resp func([]byte), + failure func(error), +) error { + start := time.Now() if len(req) > s.requestLimit { return fmt.Errorf("request length (%d) is longer than limit %d", len(req), s.requestLimit) } @@ -157,53 +245,65 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte, resp func return fmt.Errorf("%w: %s", ErrNotConnected, pid) } go func() { - start := time.Now() - defer func() { - s.logger.WithContext(ctx).With().Debug("request execution time", - log.String("protocol", s.protocol), - log.Duration("duration", time.Since(start)), - ) - }() - ctx, cancel := context.WithTimeout(ctx, s.timeout) - defer cancel() - stream, err := s.h.NewStream(network.WithNoDial(ctx, "existing connection"), pid, protocol.ID(s.protocol)) - if err != nil { - failure(err) - return - } - defer stream.Close() - defer stream.SetDeadline(time.Time{}) - _ = stream.SetDeadline(time.Now().Add(s.timeout)) - - wr := bufio.NewWriter(stream) - sz := make([]byte, binary.MaxVarintLen64) - n := binary.PutUvarint(sz, uint64(len(req))) - _, err = wr.Write(sz[:n]) - if err != nil { - failure(err) - return - } - _, err = wr.Write(req) + data, err := s.request(ctx, pid, req) if err != nil { failure(err) - return - } - if err := wr.Flush(); err != nil { - failure(err) - return + } else if len(data.Error) > 0 { + failure(errors.New(data.Error)) + } else { + resp(data.Data) } - - rd := bufio.NewReader(stream) - var r Response - if _, err := codec.DecodeFrom(rd, &r); err != nil { - failure(err) + s.logger.WithContext(ctx).With().Debug("request execution time", + log.String("protocol", s.protocol), + log.Duration("duration", time.Since(start)), + log.Err(err), + ) + switch { + case s.metrics == nil: return - } - if len(r.Error) > 0 { - failure(errors.New(r.Error)) - } else { - resp(r.Data) + case err != nil: + s.metrics.clientLatencyFailure.Observe(time.Since(start).Seconds()) + case err == nil: + s.metrics.clientLatency.Observe(time.Since(start).Seconds()) } }() return nil } + +func (s *Server) request(ctx context.Context, pid peer.ID, req []byte) (*Response, error) { + ctx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() + + var stream network.Stream + stream, err := s.h.NewStream( + network.WithNoDial(ctx, "existing connection"), + pid, + protocol.ID(s.protocol), + ) + if err != nil { + return nil, err + } + defer stream.Close() + defer stream.SetDeadline(time.Time{}) 
+ _ = stream.SetDeadline(time.Now().Add(s.timeout)) + + wr := bufio.NewWriter(stream) + sz := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(sz, uint64(len(req))) + if _, err := wr.Write(sz[:n]); err != nil { + return nil, err + } + if _, err := wr.Write(req); err != nil { + return nil, err + } + if err := wr.Flush(); err != nil { + return nil, err + } + + rd := bufio.NewReader(stream) + var r Response + if _, err = codec.DecodeFrom(rd, &r); err != nil { + return nil, err + } + return &r, nil +} diff --git a/p2p/server/server_test.go b/p2p/server/server_test.go index ea673bd18c6..49cc02e1652 100644 --- a/p2p/server/server_test.go +++ b/p2p/server/server_test.go @@ -3,18 +3,20 @@ package server import ( "context" "errors" + "sync/atomic" "testing" "time" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/spacemeshos/go-scale/tester" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/log/logtest" ) func TestServer(t *testing.T) { const limit = 1024 - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) mesh, err := mocknet.FullMeshConnected(4) require.NoError(t, err) @@ -32,12 +34,31 @@ func TestServer(t *testing.T) { } opts := []Opt{ WithTimeout(100 * time.Millisecond), - WithContext(ctx), + WithLog(logtest.New(t)), } client := New(mesh.Hosts()[0], proto, handler, append(opts, WithRequestSizeLimit(2*limit))...) - _ = New(mesh.Hosts()[1], proto, handler, append(opts, WithRequestSizeLimit(limit))...) - _ = New(mesh.Hosts()[2], proto, errhandler, append(opts, WithRequestSizeLimit(limit))...) - + srv1 := New(mesh.Hosts()[1], proto, handler, append(opts, WithRequestSizeLimit(limit))...) + srv2 := New(mesh.Hosts()[2], proto, errhandler, append(opts, WithRequestSizeLimit(limit))...) 
+ ctx, cancel := context.WithCancel(context.Background()) + var eg errgroup.Group + eg.Go(func() error { + return srv1.Run(ctx) + }) + eg.Go(func() error { + return srv2.Run(ctx) + }) + require.Eventually(t, func() bool { + for _, h := range mesh.Hosts()[1:] { + if len(h.Mux().Protocols()) == 0 { + return false + } + } + return true + }, time.Second, 10*time.Millisecond) + t.Cleanup(func() { + cancel() + eg.Wait() + }) respHandler := func(msg []byte) { select { case <-ctx.Done(): @@ -51,7 +72,10 @@ func TestServer(t *testing.T) { } } t.Run("ReceiveMessage", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[1].ID(), request, respHandler, respErrHandler)) + require.NoError( + t, + client.Request(ctx, mesh.Hosts()[1].ID(), request, respHandler, respErrHandler), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for message response") @@ -61,7 +85,10 @@ func TestServer(t *testing.T) { } }) t.Run("ReceiveError", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[2].ID(), request, respHandler, respErrHandler)) + require.NoError( + t, + client.Request(ctx, mesh.Hosts()[2].ID(), request, respHandler, respErrHandler), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for error response") @@ -70,7 +97,10 @@ func TestServer(t *testing.T) { } }) t.Run("DialError", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[3].ID(), request, respHandler, respErrHandler)) + require.NoError( + t, + client.Request(ctx, mesh.Hosts()[3].ID(), request, respHandler, respErrHandler), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for dial error") @@ -79,10 +109,23 @@ func TestServer(t *testing.T) { } }) t.Run("NotConnected", func(t *testing.T) { - require.ErrorIs(t, client.Request(ctx, "unknown", request, respHandler, respErrHandler), ErrNotConnected) + require.ErrorIs( + t, + client.Request(ctx, "unknown", request, respHandler, respErrHandler), + ErrNotConnected, + ) }) t.Run("limit overflow", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[2].ID(), make([]byte, limit+1), respHandler, respErrHandler)) + require.NoError( + t, + client.Request( + ctx, + mesh.Hosts()[2].ID(), + make([]byte, limit+1), + respHandler, + respErrHandler, + ), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for error response") @@ -92,6 +135,59 @@ func TestServer(t *testing.T) { }) } +func TestQueued(t *testing.T) { + mesh, err := mocknet.FullMeshConnected(2) + require.NoError(t, err) + + var ( + total = 100 + proto = "test" + success, failure atomic.Int64 + unblock = make(chan struct{}) + wait = make(chan struct{}, total) + ) + + client := New(mesh.Hosts()[0], proto, nil) + srv := New( + mesh.Hosts()[1], + proto, + func(_ context.Context, msg []byte) ([]byte, error) { + return msg, nil + }, + WithQueueSize(total/4), + WithRequestsPerInterval(25, time.Second), + WithMetrics(), + ) + var ( + eg errgroup.Group + ctx, cancel = context.WithCancel(context.Background()) + ) + eg.Go(func() error { + return srv.Run(ctx) + }) + t.Cleanup(func() { + cancel() + eg.Wait() + }) + for i := 0; i < total; i++ { + require.NoError(t, client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping"), + func(b []byte) { + success.Add(1) + wait <- struct{}{} + }, func(err error) { + failure.Add(1) + wait <- struct{}{} + }, + )) + } + close(unblock) + for i := 0; i < total; i++ { + <-wait + } + require.NotEmpty(t, 
failure.Load()) + require.Greater(t, int(success.Load()), total/2) +} + func FuzzResponseConsistency(f *testing.F) { tester.FuzzConsistency[Response](f) } diff --git a/systest/parameters/fastnet/smesher.json b/systest/parameters/fastnet/smesher.json index e30df571c07..23e9a12d146 100644 --- a/systest/parameters/fastnet/smesher.json +++ b/systest/parameters/fastnet/smesher.json @@ -10,8 +10,14 @@ }, "api": { "grpc-public-services": [ - "debug", "global", "mesh", "node", - "transaction", "activation", "admin", "smesher" + "debug", + "global", + "mesh", + "node", + "transaction", + "activation", + "admin", + "smesher" ], "grpc-private-services": [ "post" @@ -19,5 +25,8 @@ }, "logging": { "hare": "debug" + }, + "fetch": { + "servers-metrics": true } -} +} \ No newline at end of file
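
Note for reviewers: the serving pattern added in `(*Server).Run` is a bounded queue drained at a fixed pace. Below is a minimal, self-contained sketch of that pattern (an illustration, not the server code itself); it assumes only `golang.org/x/time/rate`, which this change promotes to a direct dependency in `go.mod`, and it derives the queue size from the rate and the expected latency, matching the "queue is roughly 10 times the per-second rate for a 10s timeout" rule of thumb above.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	const (
		requestsPerInterval = 10          // allowed requests per interval
		interval            = time.Second // pacing interval
		expectedLatency     = 10 * time.Second
	)
	// Queue sized for the expected latency: 10 req/s * 10s = 100 queued requests.
	queueSize := int(expectedLatency/interval) * requestsPerInterval

	// One token every interval/requestsPerInterval with a burst of requestsPerInterval,
	// mirroring how Run builds its limiter from the configured pace.
	limiter := rate.NewLimiter(rate.Every(interval/requestsPerInterval), requestsPerInterval)
	queue := make(chan int, queueSize)

	// Producer: accept while there is room in the queue, drop otherwise
	// (the real server closes the incoming stream on drop so the client can retry elsewhere).
	for i := 0; i < 2*queueSize; i++ {
		select {
		case queue <- i:
		default:
			fmt.Println("dropped", i)
		}
	}
	close(queue)

	// Consumer: drain the queue at the configured pace.
	ctx := context.Background()
	for i := range queue {
		if err := limiter.Wait(ctx); err != nil {
			return
		}
		fmt.Println("served", i)
	}
}
```

The actual server additionally bounds the number of concurrently running handlers via `eg.SetLimit(s.queueSize)` and records queue depth, accepted/dropped counts, and latencies when metrics are enabled.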