diff --git a/fetch/fetch.go b/fetch/fetch.go index c5157308a78..9035b95cc82 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -78,24 +78,72 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage { return m } +type ServerConfig struct { + Queue int `mapstructure:"queue"` + Requests int `mapstructure:"requests"` + Interval time.Duration `mapstructure:"interval"` +} + +func (s ServerConfig) toOpts() []server.Opt { + opts := []server.Opt{} + if s.Queue != 0 { + opts = append(opts, server.WithQueueSize(s.Queue)) + } + if s.Requests != 0 && s.Interval != 0 { + opts = append(opts, server.WithRequestsPerInterval(s.Requests, s.Interval)) + } + return opts +} + // Config is the configuration file of the Fetch component. type Config struct { - BatchTimeout time.Duration // in milliseconds + BatchTimeout time.Duration BatchSize, QueueSize int - RequestTimeout time.Duration // in seconds + RequestTimeout time.Duration MaxRetriesForRequest int - PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"` + EnableServesMetrics bool `mapstructure:"servers-metrics"` + ServersConfig map[string]ServerConfig `mapstructure:"servers"` + PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"` +} + +func (c Config) getServerConfig(protocol string) ServerConfig { + cfg, exists := c.ServersConfig[protocol] + if exists { + return cfg + } + return ServerConfig{ + Queue: 10000, + Requests: 100, + Interval: time.Second, + } } // DefaultConfig is the default config for the fetch component. func DefaultConfig() Config { return Config{ - BatchTimeout: time.Millisecond * time.Duration(50), + BatchTimeout: 50 * time.Millisecond, QueueSize: 20, BatchSize: 20, - RequestTimeout: time.Second * time.Duration(10), + RequestTimeout: 10 * time.Second, MaxRetriesForRequest: 100, - PeersRateThreshold: 0.02, + ServersConfig: map[string]ServerConfig{ + // serves 1 MB of data + atxProtocol: {Queue: 10, Requests: 1, Interval: time.Second}, + // serves 1 KB of data + lyrDataProtocol: {Queue: 1000, Requests: 100, Interval: time.Second}, + // serves atxs, ballots, active sets + // atx - 1 KB + // ballots > 300 bytes + // often queried after receiving gossip message + hashProtocol: {Queue: 2000, Requests: 200, Interval: time.Second}, + // serves at most 100 hashes - 3KB + meshHashProtocol: {Queue: 1000, Requests: 100, Interval: time.Second}, + // serves all malicious ids (id - 32 byte) - 10KB + malProtocol: {Queue: 100, Requests: 10, Interval: time.Second}, + // 64 bytes + OpnProtocol: {Queue: 10000, Requests: 1000, Interval: time.Second}, + }, + PeersRateThreshold: 0.02, } } @@ -220,34 +268,34 @@ func NewFetch( } f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout) - srvOpts := []server.Opt{ - server.WithTimeout(f.cfg.RequestTimeout), - server.WithLog(f.logger), - } if len(f.servers) == 0 { h := newHandler(cdb, bs, msh, b, f.logger) - f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...) - f.servers[lyrDataProtocol] = server.New( - host, - lyrDataProtocol, - h.handleLayerDataReq, - srvOpts...) - f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...) - f.servers[meshHashProtocol] = server.New( - host, - meshHashProtocol, - h.handleMeshHashReq, - srvOpts...) - f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...) - f.servers[OpnProtocol] = server.New( - host, - OpnProtocol, - h.handleLayerOpinionsReq2, - srvOpts...) + f.registerServer(host, atxProtocol, h.handleEpochInfoReq) + f.registerServer(host, lyrDataProtocol, h.handleLayerDataReq) + f.registerServer(host, hashProtocol, h.handleHashReq) + f.registerServer(host, meshHashProtocol, h.handleMeshHashReq) + f.registerServer(host, malProtocol, h.handleMaliciousIDsReq) + f.registerServer(host, OpnProtocol, h.handleLayerOpinionsReq2) } return f } +func (f *Fetch) registerServer( + host *p2p.Host, + protocol string, + handler server.Handler, +) { + opts := []server.Opt{ + server.WithTimeout(f.cfg.RequestTimeout), + server.WithLog(f.logger), + } + if f.cfg.EnableServesMetrics { + opts = append(opts, server.WithMetrics()) + } + opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...) + f.servers[protocol] = server.New(host, protocol, handler, opts...) +} + type dataValidators struct { atx SyncValidator poet SyncValidator @@ -295,6 +343,12 @@ func (f *Fetch) Start() error { f.loop() return nil }) + for _, srv := range f.servers { + srv := srv + f.eg.Go(func() error { + return srv.Run(f.shutdownCtx) + }) + } }) return nil } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 6a74cd40788..ffe4943bc14 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -65,6 +66,9 @@ func createFetch(tb testing.TB) *testFetch { mTxProposalH: mocks.NewMockSyncValidator(ctrl), mPoetH: mocks.NewMockSyncValidator(ctrl), } + for _, srv := range []*mocks.Mockrequester{tf.mMalS, tf.mAtxS, tf.mLyrS, tf.mHashS, tf.mMHashS, tf.mOpn2S} { + srv.EXPECT().Run(gomock.Any()).AnyTimes() + } cfg := Config{ BatchTimeout: 2 * time.Second, // make sure we never hit the batch timeout BatchSize: 3, @@ -373,7 +377,13 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) { } return result, nil } - server.New(badPeerHost, hashProtocol, badPeerHandler) + badsrv := server.New(badPeerHost, hashProtocol, badPeerHandler) + var eg errgroup.Group + eg.Go(func() error { + badsrv.Run(ctx) + return nil + }) + defer eg.Wait() fetcher := NewFetch(datastore.NewCachedDB(sql.InMemory(), lg), nil, nil, h, WithContext(ctx), diff --git a/fetch/interface.go b/fetch/interface.go index 759397b4b64..93a5b093b9e 100644 --- a/fetch/interface.go +++ b/fetch/interface.go @@ -11,6 +11,7 @@ import ( //go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go type requester interface { + Run(context.Context) error Request(context.Context, p2p.Peer, []byte, func([]byte), func(error)) error } diff --git a/fetch/mocks/mocks.go b/fetch/mocks/mocks.go index 4b2245ca59c..14531ef52f7 100644 --- a/fetch/mocks/mocks.go +++ b/fetch/mocks/mocks.go @@ -78,6 +78,44 @@ func (c *requesterRequestCall) DoAndReturn(f func(context.Context, p2p.Peer, []b return c } +// Run mocks base method. +func (m *Mockrequester) Run(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockrequesterMockRecorder) Run(arg0 any) *requesterRunCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*Mockrequester)(nil).Run), arg0) + return &requesterRunCall{Call: call} +} + +// requesterRunCall wrap *gomock.Call +type requesterRunCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *requesterRunCall) Return(arg0 error) *requesterRunCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *requesterRunCall) Do(f func(context.Context) error) *requesterRunCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *requesterRunCall) DoAndReturn(f func(context.Context) error) *requesterRunCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // MockSyncValidator is a mock of SyncValidator interface. type MockSyncValidator struct { ctrl *gomock.Controller diff --git a/go.mod b/go.mod index aeaaa7ba97e..b0e3740aed3 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/sync v0.4.0 + golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 @@ -206,7 +207,6 @@ require ( golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect diff --git a/p2p/server/metrics.go b/p2p/server/metrics.go new file mode 100644 index 00000000000..5d5c435e737 --- /dev/null +++ b/p2p/server/metrics.go @@ -0,0 +1,78 @@ +package server + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/spacemeshos/go-spacemesh/metrics" +) + +const ( + namespace = "server" + protoLabel = "protocol" +) + +var ( + targetQueue = metrics.NewGauge( + "target_queue", + namespace, + "target size of the queue", + []string{protoLabel}, + ) + queue = metrics.NewGauge( + "queue", + namespace, + "actual size of the queue", + []string{protoLabel}, + ) + targetRps = metrics.NewGauge( + "rps", + namespace, + "target requests per second", + []string{protoLabel}, + ) + requests = metrics.NewCounter( + "requests", + namespace, + "requests counter", + []string{protoLabel, "state"}, + ) + clientLatency = metrics.NewHistogramWithBuckets( + "client_latency_seconds", + namespace, + "latency since initiating a request", + []string{protoLabel, "result"}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) + serverLatency = metrics.NewHistogramWithBuckets( + "server_latency_seconds", + namespace, + "latency since accepting new stream", + []string{protoLabel}, + prometheus.ExponentialBuckets(0.01, 2, 10), + ) +) + +func newTracker(protocol string) *tracker { + return &tracker{ + targetQueue: targetQueue.WithLabelValues(protocol), + queue: queue.WithLabelValues(protocol), + targetRps: targetRps.WithLabelValues(protocol), + completed: requests.WithLabelValues(protocol, "completed"), + accepted: requests.WithLabelValues(protocol, "accepted"), + dropped: requests.WithLabelValues(protocol, "dropped"), + serverLatency: serverLatency.WithLabelValues(protocol), + clientLatency: clientLatency.WithLabelValues(protocol, "success"), + clientLatencyFailure: clientLatency.WithLabelValues(protocol, "failure"), + } +} + +type tracker struct { + targetQueue prometheus.Gauge + queue prometheus.Gauge + targetRps prometheus.Gauge + completed prometheus.Counter + accepted prometheus.Counter + dropped prometheus.Counter + serverLatency prometheus.Observer + clientLatency, clientLatencyFailure prometheus.Observer +} diff --git a/p2p/server/server.go b/p2p/server/server.go index 60389f3b882..cf8a8f6019d 100644 --- a/p2p/server/server.go +++ b/p2p/server/server.go @@ -13,6 +13,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-varint" + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/log" @@ -38,16 +40,40 @@ func WithLog(log log.Log) Opt { } } -// WithContext configures parent context for contexts that are passed to the handler. -func WithContext(ctx context.Context) Opt { +func WithRequestSizeLimit(limit int) Opt { return func(s *Server) { - s.ctx = ctx + s.requestLimit = limit } } -func WithRequestSizeLimit(limit int) Opt { +// WithMetrics will enable metrics collection in the server. +func WithMetrics() Opt { return func(s *Server) { - s.requestLimit = limit + s.metrics = newTracker(s.protocol) + } +} + +// WithQueueSize parametrize number of message that will be kept in queue +// and eventually processed by server. Otherwise stream is closed immediately. +// +// Size of the queue should be set to account for maximum expected latency, such as if expected latency is 10s +// and server processes 1000 requests per second size should be 100. +// +// Defaults to 100. +func WithQueueSize(size int) Opt { + return func(s *Server) { + s.queueSize = size + } +} + +// WithRequestsPerInterval parametrizes server rate limit to limit maximum amount of bandwidth +// that this handler can consume. +// +// Defaults to 100 requests per second. +func WithRequestsPerInterval(n int, interval time.Duration) Opt { + return func(s *Server) { + s.requestsPerInterval = n + s.interval = interval } } @@ -73,36 +99,91 @@ type Host interface { // Server for the Handler. type Server struct { - logger log.Log - protocol string - handler Handler - timeout time.Duration - requestLimit int + logger log.Log + protocol string + handler Handler + timeout time.Duration + requestLimit int + queueSize int + requestsPerInterval int + interval time.Duration - h Host + metrics *tracker // metrics can be nil - ctx context.Context + h Host } // New server for the handler. func New(h Host, proto string, handler Handler, opts ...Opt) *Server { srv := &Server{ - ctx: context.Background(), - logger: log.NewNop(), - protocol: proto, - handler: handler, - h: h, - timeout: 10 * time.Second, - requestLimit: 10240, + logger: log.NewNop(), + protocol: proto, + handler: handler, + h: h, + timeout: 10 * time.Second, + requestLimit: 10240, + queueSize: 1000, + requestsPerInterval: 100, + interval: time.Second, } for _, opt := range opts { opt(srv) } - h.SetStreamHandler(protocol.ID(proto), srv.streamHandler) return srv } -func (s *Server) streamHandler(stream network.Stream) { +type request struct { + stream network.Stream + received time.Time +} + +func (s *Server) Run(ctx context.Context) error { + limit := rate.NewLimiter(rate.Every(s.interval/time.Duration(s.requestsPerInterval)), s.requestsPerInterval) + queue := make(chan request, s.queueSize) + if s.metrics != nil { + s.metrics.targetQueue.Set(float64(s.queueSize)) + s.metrics.targetRps.Set(float64(limit.Limit())) + } + s.h.SetStreamHandler(protocol.ID(s.protocol), func(stream network.Stream) { + select { + case queue <- request{stream: stream, received: time.Now()}: + if s.metrics != nil { + s.metrics.queue.Set(float64(len(queue))) + s.metrics.accepted.Inc() + } + default: + if s.metrics != nil { + s.metrics.dropped.Inc() + } + stream.Close() + } + }) + + var eg errgroup.Group + eg.SetLimit(s.queueSize) + for { + select { + case <-ctx.Done(): + eg.Wait() + return nil + case req := <-queue: + if err := limit.Wait(ctx); err != nil { + eg.Wait() + return nil + } + eg.Go(func() error { + s.queueHandler(ctx, req.stream) + if s.metrics != nil { + s.metrics.serverLatency.Observe(time.Since(req.received).Seconds()) + s.metrics.completed.Inc() + } + return nil + }) + } + } +} + +func (s *Server) queueHandler(ctx context.Context, stream network.Stream) { defer stream.Close() _ = stream.SetDeadline(time.Now().Add(s.timeout)) defer stream.SetDeadline(time.Time{}) @@ -112,7 +193,7 @@ func (s *Server) streamHandler(stream network.Stream) { return } if size > uint64(s.requestLimit) { - s.logger.Warning("request limit overflow", + s.logger.With().Warning("request limit overflow", log.Int("limit", s.requestLimit), log.Uint64("request", size), ) @@ -125,7 +206,7 @@ func (s *Server) streamHandler(stream network.Stream) { return } start := time.Now() - buf, err = s.handler(log.WithNewRequestID(s.ctx), buf) + buf, err = s.handler(log.WithNewRequestID(ctx), buf) s.logger.With().Debug("protocol handler execution time", log.String("protocol", s.protocol), log.Duration("duration", time.Since(start)), @@ -149,7 +230,14 @@ func (s *Server) streamHandler(stream network.Stream) { // Request sends a binary request to the peer. Request is executed in the background, one of the callbacks // is guaranteed to be called on success/error. -func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte, resp func([]byte), failure func(error)) error { +func (s *Server) Request( + ctx context.Context, + pid peer.ID, + req []byte, + resp func([]byte), + failure func(error), +) error { + start := time.Now() if len(req) > s.requestLimit { return fmt.Errorf("request length (%d) is longer than limit %d", len(req), s.requestLimit) } @@ -157,53 +245,65 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte, resp func return fmt.Errorf("%w: %s", ErrNotConnected, pid) } go func() { - start := time.Now() - defer func() { - s.logger.WithContext(ctx).With().Debug("request execution time", - log.String("protocol", s.protocol), - log.Duration("duration", time.Since(start)), - ) - }() - ctx, cancel := context.WithTimeout(ctx, s.timeout) - defer cancel() - stream, err := s.h.NewStream(network.WithNoDial(ctx, "existing connection"), pid, protocol.ID(s.protocol)) - if err != nil { - failure(err) - return - } - defer stream.Close() - defer stream.SetDeadline(time.Time{}) - _ = stream.SetDeadline(time.Now().Add(s.timeout)) - - wr := bufio.NewWriter(stream) - sz := make([]byte, binary.MaxVarintLen64) - n := binary.PutUvarint(sz, uint64(len(req))) - _, err = wr.Write(sz[:n]) - if err != nil { - failure(err) - return - } - _, err = wr.Write(req) + data, err := s.request(ctx, pid, req) if err != nil { failure(err) - return - } - if err := wr.Flush(); err != nil { - failure(err) - return + } else if len(data.Error) > 0 { + failure(errors.New(data.Error)) + } else { + resp(data.Data) } - - rd := bufio.NewReader(stream) - var r Response - if _, err := codec.DecodeFrom(rd, &r); err != nil { - failure(err) + s.logger.WithContext(ctx).With().Debug("request execution time", + log.String("protocol", s.protocol), + log.Duration("duration", time.Since(start)), + log.Err(err), + ) + switch { + case s.metrics == nil: return - } - if len(r.Error) > 0 { - failure(errors.New(r.Error)) - } else { - resp(r.Data) + case err != nil: + s.metrics.clientLatencyFailure.Observe(time.Since(start).Seconds()) + case err == nil: + s.metrics.clientLatency.Observe(time.Since(start).Seconds()) } }() return nil } + +func (s *Server) request(ctx context.Context, pid peer.ID, req []byte) (*Response, error) { + ctx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() + + var stream network.Stream + stream, err := s.h.NewStream( + network.WithNoDial(ctx, "existing connection"), + pid, + protocol.ID(s.protocol), + ) + if err != nil { + return nil, err + } + defer stream.Close() + defer stream.SetDeadline(time.Time{}) + _ = stream.SetDeadline(time.Now().Add(s.timeout)) + + wr := bufio.NewWriter(stream) + sz := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(sz, uint64(len(req))) + if _, err := wr.Write(sz[:n]); err != nil { + return nil, err + } + if _, err := wr.Write(req); err != nil { + return nil, err + } + if err := wr.Flush(); err != nil { + return nil, err + } + + rd := bufio.NewReader(stream) + var r Response + if _, err = codec.DecodeFrom(rd, &r); err != nil { + return nil, err + } + return &r, nil +} diff --git a/p2p/server/server_test.go b/p2p/server/server_test.go index ea673bd18c6..49cc02e1652 100644 --- a/p2p/server/server_test.go +++ b/p2p/server/server_test.go @@ -3,18 +3,20 @@ package server import ( "context" "errors" + "sync/atomic" "testing" "time" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/spacemeshos/go-scale/tester" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/log/logtest" ) func TestServer(t *testing.T) { const limit = 1024 - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) mesh, err := mocknet.FullMeshConnected(4) require.NoError(t, err) @@ -32,12 +34,31 @@ func TestServer(t *testing.T) { } opts := []Opt{ WithTimeout(100 * time.Millisecond), - WithContext(ctx), + WithLog(logtest.New(t)), } client := New(mesh.Hosts()[0], proto, handler, append(opts, WithRequestSizeLimit(2*limit))...) - _ = New(mesh.Hosts()[1], proto, handler, append(opts, WithRequestSizeLimit(limit))...) - _ = New(mesh.Hosts()[2], proto, errhandler, append(opts, WithRequestSizeLimit(limit))...) - + srv1 := New(mesh.Hosts()[1], proto, handler, append(opts, WithRequestSizeLimit(limit))...) + srv2 := New(mesh.Hosts()[2], proto, errhandler, append(opts, WithRequestSizeLimit(limit))...) + ctx, cancel := context.WithCancel(context.Background()) + var eg errgroup.Group + eg.Go(func() error { + return srv1.Run(ctx) + }) + eg.Go(func() error { + return srv2.Run(ctx) + }) + require.Eventually(t, func() bool { + for _, h := range mesh.Hosts()[1:] { + if len(h.Mux().Protocols()) == 0 { + return false + } + } + return true + }, time.Second, 10*time.Millisecond) + t.Cleanup(func() { + cancel() + eg.Wait() + }) respHandler := func(msg []byte) { select { case <-ctx.Done(): @@ -51,7 +72,10 @@ func TestServer(t *testing.T) { } } t.Run("ReceiveMessage", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[1].ID(), request, respHandler, respErrHandler)) + require.NoError( + t, + client.Request(ctx, mesh.Hosts()[1].ID(), request, respHandler, respErrHandler), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for message response") @@ -61,7 +85,10 @@ func TestServer(t *testing.T) { } }) t.Run("ReceiveError", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[2].ID(), request, respHandler, respErrHandler)) + require.NoError( + t, + client.Request(ctx, mesh.Hosts()[2].ID(), request, respHandler, respErrHandler), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for error response") @@ -70,7 +97,10 @@ func TestServer(t *testing.T) { } }) t.Run("DialError", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[3].ID(), request, respHandler, respErrHandler)) + require.NoError( + t, + client.Request(ctx, mesh.Hosts()[3].ID(), request, respHandler, respErrHandler), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for dial error") @@ -79,10 +109,23 @@ func TestServer(t *testing.T) { } }) t.Run("NotConnected", func(t *testing.T) { - require.ErrorIs(t, client.Request(ctx, "unknown", request, respHandler, respErrHandler), ErrNotConnected) + require.ErrorIs( + t, + client.Request(ctx, "unknown", request, respHandler, respErrHandler), + ErrNotConnected, + ) }) t.Run("limit overflow", func(t *testing.T) { - require.NoError(t, client.Request(ctx, mesh.Hosts()[2].ID(), make([]byte, limit+1), respHandler, respErrHandler)) + require.NoError( + t, + client.Request( + ctx, + mesh.Hosts()[2].ID(), + make([]byte, limit+1), + respHandler, + respErrHandler, + ), + ) select { case <-time.After(time.Second): require.FailNow(t, "timed out while waiting for error response") @@ -92,6 +135,59 @@ func TestServer(t *testing.T) { }) } +func TestQueued(t *testing.T) { + mesh, err := mocknet.FullMeshConnected(2) + require.NoError(t, err) + + var ( + total = 100 + proto = "test" + success, failure atomic.Int64 + unblock = make(chan struct{}) + wait = make(chan struct{}, total) + ) + + client := New(mesh.Hosts()[0], proto, nil) + srv := New( + mesh.Hosts()[1], + proto, + func(_ context.Context, msg []byte) ([]byte, error) { + return msg, nil + }, + WithQueueSize(total/4), + WithRequestsPerInterval(25, time.Second), + WithMetrics(), + ) + var ( + eg errgroup.Group + ctx, cancel = context.WithCancel(context.Background()) + ) + eg.Go(func() error { + return srv.Run(ctx) + }) + t.Cleanup(func() { + cancel() + eg.Wait() + }) + for i := 0; i < total; i++ { + require.NoError(t, client.Request(ctx, mesh.Hosts()[1].ID(), []byte("ping"), + func(b []byte) { + success.Add(1) + wait <- struct{}{} + }, func(err error) { + failure.Add(1) + wait <- struct{}{} + }, + )) + } + close(unblock) + for i := 0; i < total; i++ { + <-wait + } + require.NotEmpty(t, failure.Load()) + require.Greater(t, int(success.Load()), total/2) +} + func FuzzResponseConsistency(f *testing.F) { tester.FuzzConsistency[Response](f) } diff --git a/systest/parameters/fastnet/smesher.json b/systest/parameters/fastnet/smesher.json index e30df571c07..23e9a12d146 100644 --- a/systest/parameters/fastnet/smesher.json +++ b/systest/parameters/fastnet/smesher.json @@ -10,8 +10,14 @@ }, "api": { "grpc-public-services": [ - "debug", "global", "mesh", "node", - "transaction", "activation", "admin", "smesher" + "debug", + "global", + "mesh", + "node", + "transaction", + "activation", + "admin", + "smesher" ], "grpc-private-services": [ "post" @@ -19,5 +25,8 @@ }, "logging": { "hare": "debug" + }, + "fetch": { + "servers-metrics": true } -} +} \ No newline at end of file