diff --git a/api/api_net.go b/api/api_net.go index 0a389e5ed6b..7dddb09ac0a 100644 --- a/api/api_net.go +++ b/api/api_net.go @@ -52,7 +52,9 @@ type Net interface { NetBlockList(ctx context.Context) (NetBlockList, error) //perm:read // ResourceManager API - NetStat(ctx context.Context, scope string) (NetStat, error) //perm:read + NetStat(ctx context.Context, scope string) (NetStat, error) //perm:read + NetLimit(ctx context.Context, scope string) (NetLimit, error) //perm:read + NetSetLimit(ctx context.Context, scope string, limit NetLimit) error //perm:admin // ID returns peerID of libp2p node backing this API ID(context.Context) (peer.ID, error) //perm:read diff --git a/api/types.go b/api/types.go index 66fb7985060..c688edf4b5f 100644 --- a/api/types.go +++ b/api/types.go @@ -135,7 +135,21 @@ type NetStat struct { Transient *network.ScopeStat `json:",omitempty"` Services map[string]network.ScopeStat `json:",omitempty"` Protocols map[string]network.ScopeStat `json:",omitempty"` - Peers map[string]network.ScopeStat + Peers map[string]network.ScopeStat `json:",omitempty"` +} + +type NetLimit struct { + Dynamic bool `json:",omitempty"` + // set if Dynamic is false + Memory int64 `json:",omitempty"` + // set if Dynamic is true + MemoryFraction float64 `json:",omitempty"` + MinMemory int64 `json:",omitempty"` + MaxMemory int64 `json:",omitempty"` + + Streams, StreamsInbound, StreamsOutbound int + Conns, ConnsInbound, ConnsOutbound int + FD int } type ExtendedPeerInfo struct { diff --git a/node/impl/net/rcmgr.go b/node/impl/net/rcmgr.go index 2084d3a3576..1b6d57d8e8c 100644 --- a/node/impl/net/rcmgr.go +++ b/node/impl/net/rcmgr.go @@ -85,7 +85,7 @@ func (a *NetAPI) NetStat(ctx context.Context, scope string) (result api.NetStat, p := scope[5:] pid, err := peer.IDFromString(p) if err != nil { - return result, err + return result, xerrors.Errorf("invalid peer ID: %s: %w", p, err) } err = a.ResourceManager.ViewPeer(pid, func(s network.PeerScope) error { stat := s.Stat() @@ -100,3 +100,172 @@ func (a *NetAPI) NetStat(ctx context.Context, scope string) (result api.NetStat, return result, xerrors.Errorf("invalid scope %s", scope) } } + +func (a *NetAPI) NetLimit(ctx context.Context, scope string) (result api.NetLimit, err error) { + getLimit := func(s network.ResourceScope) error { + limiter, ok := s.(rcmgr.ResourceScopeLimiter) + if !ok { + return xerrors.Errorf("resource scope doesn't implement ResourceScopeLimiter interface") + } + + limit := limiter.Limit() + switch l := limit.(type) { + case *rcmgr.StaticLimit: + result.Memory = l.Memory + result.Streams = l.BaseLimit.Streams + result.StreamsInbound = l.BaseLimit.StreamsInbound + result.StreamsOutbound = l.BaseLimit.StreamsOutbound + result.Conns = l.BaseLimit.Conns + result.ConnsInbound = l.BaseLimit.ConnsInbound + result.ConnsOutbound = l.BaseLimit.ConnsOutbound + result.FD = l.BaseLimit.FD + + case *rcmgr.DynamicLimit: + result.Dynamic = true + result.MemoryFraction = l.MemoryLimit.MemoryFraction + result.MinMemory = l.MemoryLimit.MinMemory + result.MaxMemory = l.MemoryLimit.MaxMemory + result.Streams = l.BaseLimit.Streams + result.StreamsInbound = l.BaseLimit.StreamsInbound + result.StreamsOutbound = l.BaseLimit.StreamsOutbound + result.Conns = l.BaseLimit.Conns + result.ConnsInbound = l.BaseLimit.ConnsInbound + result.ConnsOutbound = l.BaseLimit.ConnsOutbound + result.FD = l.BaseLimit.FD + + default: + return xerrors.Errorf("unknown limit type %T", limit) + } + + return nil + } + + switch { + case scope == "system": + err = a.ResourceManager.ViewSystem(func(s network.ResourceScope) error { + return getLimit(s) + }) + return result, err + + case scope == "transient": + err = a.ResourceManager.ViewTransient(func(s network.ResourceScope) error { + return getLimit(s) + }) + return result, err + + case strings.HasPrefix(scope, "svc:"): + svc := scope[4:] + err = a.ResourceManager.ViewService(svc, func(s network.ServiceScope) error { + return getLimit(s) + }) + return result, err + + case strings.HasPrefix(scope, "proto:"): + proto := scope[6:] + err = a.ResourceManager.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { + return getLimit(s) + }) + return result, err + + case strings.HasPrefix(scope, "peer:"): + p := scope[5:] + pid, err := peer.IDFromString(p) + if err != nil { + return result, xerrors.Errorf("invalid peer ID: %s: %w", p, err) + } + err = a.ResourceManager.ViewPeer(pid, func(s network.PeerScope) error { + return getLimit(s) + }) + return result, err + + default: + return result, xerrors.Errorf("invalid scope %s", scope) + } +} + +func (a *NetAPI) NetSetLimit(ctx context.Context, scope string, limit api.NetLimit) error { + setLimit := func(s network.ResourceScope) error { + limiter, ok := s.(rcmgr.ResourceScopeLimiter) + if !ok { + return xerrors.Errorf("resource scope doesn't implement ResourceScopeLimiter interface") + } + + var newLimit rcmgr.Limit + if limit.Dynamic { + newLimit = &rcmgr.DynamicLimit{ + MemoryLimit: rcmgr.MemoryLimit{ + MemoryFraction: limit.MemoryFraction, + MinMemory: limit.MinMemory, + MaxMemory: limit.MaxMemory, + }, + BaseLimit: rcmgr.BaseLimit{ + Streams: limit.Streams, + StreamsInbound: limit.StreamsInbound, + StreamsOutbound: limit.StreamsOutbound, + Conns: limit.Conns, + ConnsInbound: limit.ConnsInbound, + ConnsOutbound: limit.ConnsOutbound, + FD: limit.FD, + }, + } + } else { + newLimit = &rcmgr.StaticLimit{ + Memory: limit.Memory, + BaseLimit: rcmgr.BaseLimit{ + Streams: limit.Streams, + StreamsInbound: limit.StreamsInbound, + StreamsOutbound: limit.StreamsOutbound, + Conns: limit.Conns, + ConnsInbound: limit.ConnsInbound, + ConnsOutbound: limit.ConnsOutbound, + FD: limit.FD, + }, + } + } + + limiter.SetLimit(newLimit) + return nil + } + + switch { + case scope == "system": + err := a.ResourceManager.ViewSystem(func(s network.ResourceScope) error { + return setLimit(s) + }) + return err + + case scope == "transient": + err := a.ResourceManager.ViewTransient(func(s network.ResourceScope) error { + return setLimit(s) + }) + return err + + case strings.HasPrefix(scope, "svc:"): + svc := scope[4:] + err := a.ResourceManager.ViewService(svc, func(s network.ServiceScope) error { + return setLimit(s) + }) + return err + + case strings.HasPrefix(scope, "proto:"): + proto := scope[6:] + err := a.ResourceManager.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { + return setLimit(s) + }) + return err + + case strings.HasPrefix(scope, "peer:"): + p := scope[5:] + pid, err := peer.IDFromString(p) + if err != nil { + return xerrors.Errorf("invalid peer ID: %s: %w", p, err) + } + err = a.ResourceManager.ViewPeer(pid, func(s network.PeerScope) error { + return setLimit(s) + }) + return err + + default: + return xerrors.Errorf("invalid scope %s", scope) + } +}