Skip to content

Commit

Permalink
feat: add dynamic parallel count (dragonflyoss#1088)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 23, 2022
1 parent 29c46ff commit 5c8ec61
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 56 deletions.
3 changes: 2 additions & 1 deletion manager/types/scheduler_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type SchedulerClusterConfig struct {
}

type SchedulerClusterClientConfig struct {
LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"`
LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"`
ParallelCount uint32 `yaml:"parallelCount" mapstructure:"parallelCount" json:"parallel_count" binding:"omitempty,gte=1,lte=50"`
}

type SchedulerClusterScopes struct {
Expand Down
10 changes: 5 additions & 5 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize dynconfig client
dynConfig, err := config.NewDynconfig(s.managerClient, d.CacheDir(), cfg)
dynconfig, err := config.NewDynconfig(s.managerClient, d.CacheDir(), cfg)
if err != nil {
return nil, err
}
s.dynconfig = dynConfig
s.dynconfig = dynconfig

// Initialize GC
s.gc = gc.New(gc.WithLogger(logger.GCLogger))
Expand All @@ -122,16 +122,16 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize resource
resource, err := resource.New(cfg, s.gc, dynConfig, dialOptions...)
resource, err := resource.New(cfg, s.gc, dynconfig, dialOptions...)
if err != nil {
return nil, err
}

// Initialize scheduler
scheduler := scheduler.New(cfg.Scheduler, d.PluginDir())
scheduler := scheduler.New(cfg.Scheduler, dynconfig, d.PluginDir())

// Initialize scheduler service
service := service.New(cfg, resource, scheduler, dynConfig)
service := service.New(cfg, resource, scheduler, dynconfig)

// Initialize grpc service
svr := rpcserver.New(service, serverOptions...)
Expand Down
27 changes: 20 additions & 7 deletions scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"d7y.io/dragonfly/v2/scheduler/scheduler/evaluator"
)

const (
// Default number of pieces downloaded in parallel
defaultParallelCount = 4
)

type Scheduler interface {
// ScheduleParent schedule a parent and candidates to a peer
ScheduleParent(context.Context, *resource.Peer, set.SafeSet)
Expand All @@ -48,12 +53,16 @@ type scheduler struct {

// Scheduler configuration
config *config.SchedulerConfig

// Scheduler dynamic configuration
dynconfig config.DynconfigInterface
}

func New(cfg *config.SchedulerConfig, pluginDir string) Scheduler {
func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string) Scheduler {
return &scheduler{
evaluator: evaluator.New(cfg.Algorithm, pluginDir),
config: cfg,
dynconfig: dynconfig,
}
}

Expand Down Expand Up @@ -172,7 +181,7 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
return []*resource.Peer{}, false
}

if err := stream.Send(constructSuccessPeerPacket(peer, parents[0], parents[1:])); err != nil {
if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil {
peer.Log.Error(err)
return []*resource.Peer{}, false
}
Expand Down Expand Up @@ -254,7 +263,12 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
}

// Construct peer successful packet
func constructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *rpcscheduler.PeerPacket {
func constructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *rpcscheduler.PeerPacket {
parallelCount := defaultParallelCount
if client, ok := dynconfig.GetSchedulerClusterClientConfig(); ok && client.ParallelCount > 0 {
parallelCount = int(client.ParallelCount)
}

var stealPeers []*rpcscheduler.PeerPacket_DestPeer
for _, candidateParent := range candidateParents {
stealPeers = append(stealPeers, &rpcscheduler.PeerPacket_DestPeer{
Expand All @@ -265,10 +279,9 @@ func constructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, cand
}

return &rpcscheduler.PeerPacket{
TaskId: peer.Task.ID,
SrcPid: peer.ID,
// TODO(gaius-qi) Configure ParallelCount parameter in manager service
ParallelCount: 1,
TaskId: peer.Task.ID,
SrcPid: peer.ID,
ParallelCount: int32(parallelCount),
MainPeer: &rpcscheduler.PeerPacket_DestPeer{
Ip: parent.Host.IP,
RpcPort: parent.Host.Port,
Expand Down
Loading

0 comments on commit 5c8ec61

Please sign in to comment.