Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relay client should limit max concurrency #1140

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
82 changes: 82 additions & 0 deletions api/clients/v2/limited_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package clients

import (
"context"
"fmt"
relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"google.golang.org/grpc"
)

var _ relaygrpc.RelayClient = (*limitedRelayClient)(nil)

// TODO (cody-littley / litt3): when randomly selecting a relay client to use, we should avoid selecting a
// client that currently has exhausted its concurrency permits.

// limitedRelayClient encapsulates a gRPC client and a channel for limiting concurrent requests to that client.
type limitedRelayClient struct {
// client is the underlying gRPC client.
client relaygrpc.RelayClient

// relayID is the ID of the relay that this client is connected to.
relayKey corev2.RelayKey

// permits is a channel for limiting the number of concurrent requests to the client.
// when a request is initiated, a value is sent to the channel, and when the request is completed,
// the value is received. The channel has a buffer size of `MaxConcurrentRequests`, and will block
// if the number of concurrent requests exceeds this limit.
permits chan struct{}
}

// newLimitedRelayClient creates a new limitedRelayClient.
func newLimitedRelayClient(
client relaygrpc.RelayClient,
relayKey corev2.RelayKey,
maxConcurrentRequests uint) (*limitedRelayClient, error) {

if maxConcurrentRequests == 0 {
return nil, fmt.Errorf("maxConcurrentRequests must be greater than 0")
}

return &limitedRelayClient{
client: client,
relayKey: relayKey,
permits: make(chan struct{}, maxConcurrentRequests),
}, nil
}

func (l *limitedRelayClient) GetBlob(
ctx context.Context,
in *relaygrpc.GetBlobRequest,
opts ...grpc.CallOption) (*relaygrpc.GetBlobReply, error) {

select {
case l.permits <- struct{}{}:
// permit acquired
case <-ctx.Done():
return nil,
fmt.Errorf("context cancelled while waiting for permit to get blob from relay %d", l.relayKey)
}
defer func() {
<-l.permits
}()
return l.client.GetBlob(ctx, in, opts...)
}

func (l *limitedRelayClient) GetChunks(
ctx context.Context,
in *relaygrpc.GetChunksRequest,
opts ...grpc.CallOption) (*relaygrpc.GetChunksReply, error) {

select {
case l.permits <- struct{}{}:
// permit acquired
case <-ctx.Done():
return nil,
fmt.Errorf("context cancelled while waiting for permit to get chunks from relay %d", l.relayKey)
}
defer func() {
<-l.permits
}()
return l.client.GetChunks(ctx, in, opts...)
}
30 changes: 22 additions & 8 deletions api/clients/v2/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type RelayClientConfig struct {
UseSecureGrpcFlag bool
OperatorID *core.OperatorID
MessageSigner MessageSigner
// MaxConcurrentRequests is the maximum number of concurrent requests to a particular relay server.
MaxConcurrentRequests uint
}

type ChunkRequestByRange struct {
Expand Down Expand Up @@ -52,6 +54,8 @@ type RelayClient interface {
Close() error
}

var _ RelayClient = (*relayClient)(nil)

type relayClient struct {
config *RelayClientConfig

Expand All @@ -62,17 +66,21 @@ type relayClient struct {
conns sync.Map
logger logging.Logger

// grpcClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient`
// grpcClients maps relay key to the gRPC client: `map[corev2.RelayKey]*limitedRelayClient`
grpcClients sync.Map
}

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (RelayClient, error) {
if config == nil || len(config.Sockets) <= 0 {
return nil, fmt.Errorf("invalid config: %v", config)
if config == nil {
return nil, fmt.Errorf("config is nil")
}
if len(config.Sockets) <= 0 {
return nil, errors.New("no relay sockets provided")
}
if config.MaxConcurrentRequests == 0 {
return nil, errors.New("maxConcurrentRequests must be greater than 0")
}

logger.Info("creating relay client", "urls", config.Sockets)
Expand Down Expand Up @@ -214,15 +222,15 @@ func (c *relayClient) GetChunksByIndex(
return res.GetData(), nil
}

func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, error) {
func (c *relayClient) getClient(key corev2.RelayKey) (*limitedRelayClient, error) {
if err := c.initOnceGrpcConnection(key); err != nil {
return nil, err
}
maybeClient, ok := c.grpcClients.Load(key)
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", key)
}
client, ok := maybeClient.(relaygrpc.RelayClient)
client, ok := maybeClient.(*limitedRelayClient)
if !ok {
return nil, fmt.Errorf("invalid grpc client for relay key: %v", key)
}
Expand All @@ -248,7 +256,13 @@ func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
return
}
c.conns.Store(key, conn)
c.grpcClients.Store(key, relaygrpc.NewRelayClient(conn))

limitedClient, err := newLimitedRelayClient(relaygrpc.NewRelayClient(conn), key, c.config.MaxConcurrentRequests)
if err != nil {
initErr = err
return
}
c.grpcClients.Store(key, limitedClient)
})
return initErr
}
Expand Down
3 changes: 2 additions & 1 deletion inabox/tests/integration_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ var _ = Describe("Inabox v2 Integration", func() {

// Test retrieval from relay
relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relays,
Sockets: relays,
MaxConcurrentRequests: 8,
}, logger)
Expect(err).To(BeNil())

Expand Down
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type Config struct {
UseSecureGrpc bool
ReachabilityPollIntervalSec uint64
DisableNodeInfoResources bool
// The maximum number of concurrent requests to any particular relay server.
RelayConcurrency uint

BlsSignerConfig blssignerTypes.SignerConfig

Expand Down Expand Up @@ -292,6 +294,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name),
UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name),
RelayConcurrency: ctx.GlobalUint(flags.RelayConcurrencyFlag.Name),
DisableNodeInfoResources: ctx.GlobalBool(flags.DisableNodeInfoResourcesFlag.Name),
BlsSignerConfig: blsSignerConfig,
EnableV2: v2Enabled,
Expand Down
9 changes: 9 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ var (
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "CHURNER_USE_SECURE_GRPC"),
}
RelayConcurrencyFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "relay-concurrency"),
Usage: "The maximum number of concurrent relay requests to any particular relay",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "RELAY_CONCURRENCY"),
// default value should stay in sync with default value in relay.limiter.Config.MaxConcurrentGetChunkOpsClient
Value: 8,
}
PubIPProviderFlag = cli.StringSliceFlag{
Name: common.PrefixFlag(FlagPrefix, "public-ip-provider"),
Usage: "The ip provider service(s) used to obtain a node's public IP. Valid options: 'seeip', 'ipify'",
Expand Down Expand Up @@ -439,6 +447,7 @@ var optionalFlags = []cli.Flag{
DispersalAuthenticationKeyCacheSizeFlag,
DisperserKeyTimeoutFlag,
DispersalAuthenticationTimeoutFlag,
RelayConcurrencyFlag,
}

func init() {
Expand Down
18 changes: 10 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,11 @@ func NewNode(

logger.Info("Creating relay client", "relayURLs", relayURLs)
relayClient, err = clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
UseSecureGrpcFlag: config.UseSecureGrpc,
OperatorID: &config.ID,
MessageSigner: n.SignMessage,
Sockets: relayURLs,
UseSecureGrpcFlag: config.UseSecureGrpc,
OperatorID: &config.ID,
MessageSigner: n.SignMessage,
MaxConcurrentRequests: config.RelayConcurrency,
}, logger)

if err != nil {
Expand Down Expand Up @@ -423,10 +424,11 @@ func (n *Node) RefreshOnchainState(ctx context.Context) error {
}

relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
UseSecureGrpcFlag: n.Config.UseSecureGrpc,
OperatorID: &n.Config.ID,
MessageSigner: n.SignMessage,
Sockets: relayURLs,
UseSecureGrpcFlag: n.Config.UseSecureGrpc,
OperatorID: &n.Config.ID,
MessageSigner: n.SignMessage,
MaxConcurrentRequests: n.Config.RelayConcurrency,
}, n.Logger)
if err != nil {
n.Logger.Error("error creating relay client", "err", err)
Expand Down
7 changes: 4 additions & 3 deletions node/node_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,10 @@ func TestRefreshOnchainStateSuccess(t *testing.T) {
}

relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
OperatorID: &c.node.Config.ID,
MessageSigner: messageSigner,
Sockets: relayURLs,
OperatorID: &c.node.Config.ID,
MessageSigner: messageSigner,
MaxConcurrentRequests: 1,
}, c.node.Logger)
require.NoError(t, err)
// set up non-mock client
Expand Down
2 changes: 1 addition & 1 deletion relay/cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ var (
Usage: "Max number of concurrent GetChunk operations per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_CHUNK_OPS_CLIENT"),
Value: 1,
Value: 8, // default value should stay in sync with the default value of node.Config.RelayConcurrency
}
BlsOperatorStateRetrieverAddrFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "bls-operator-state-retriever-addr"),
Expand Down
2 changes: 1 addition & 1 deletion relay/limiter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ type Config struct {
GetChunkBytesBurstinessClient int

// MaxConcurrentGetChunkOpsClient is the maximum number of concurrent GetChunk operations that are permitted.
// Default is 1.
// Default is 2.
MaxConcurrentGetChunkOpsClient int
}
Loading