Skip to content

Commit

Permalink
fix: filter parent condition (dragonflyoss#1277)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Apr 24, 2022
1 parent d318041 commit f9ec2d3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 52 deletions.
13 changes: 13 additions & 0 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,19 @@ type Peer struct {
BlockPeers set.SafeSet

// NeedBackToSource indicates the peer needs to be downloaded from the source
//
// When the peer is registering and, at the same time,
// the scheduler needs to create the new corresponding task while the CDN is disabled,
// NeedBackToSource is set to true
NeedBackToSource *atomic.Bool

// IsBackToSource indicates the peer is downloading from the source
//
// When the peer is being scheduled and NeedBackToSource is true,
// the scheduler needs to return Code_SchedNeedBackSource and
// IsBackToSource is set to true
IsBackToSource *atomic.Bool

// CreateAt is peer create time
CreateAt *atomic.Time

Expand Down Expand Up @@ -181,6 +192,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
StealPeers: set.NewSafeSet(),
BlockPeers: set.NewSafeSet(),
NeedBackToSource: atomic.NewBool(false),
IsBackToSource: atomic.NewBool(false),
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
mu: &sync.RWMutex{},
Expand Down Expand Up @@ -226,6 +238,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownloadFromBackToSource: func(e *fsm.Event) {
p.IsBackToSource.Store(true)
p.Task.BackToSourcePeers.Add(p)
p.DeleteParent()
p.Host.DeletePeer(p.ID)
Expand Down
116 changes: 64 additions & 52 deletions scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,19 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
return []*resource.Peer{}, false
}

// Find the parent that can be scheduled
parents := s.filterParents(peer, blocklist)
if len(parents) == 0 {
peer.Log.Info("can not find parents")
// Find the candidate parent that can be scheduled
candidateParents := s.filterCandidateParents(peer, blocklist)
if len(candidateParents) == 0 {
peer.Log.Info("can not find candidate parents")
return []*resource.Peer{}, false
}

// Sort parents by evaluation score
// Sort candidate parents by evaluation score
taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
sort.Slice(
parents,
candidateParents,
func(i, j int) bool {
return s.evaluator.Evaluate(parents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(parents[j], peer, taskTotalPieceCount)
return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount)
},
)

Expand All @@ -182,127 +182,139 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
return []*resource.Peer{}, false
}

if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil {
if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil {
peer.Log.Error(err)
return []*resource.Peer{}, false
}

// Add steal peers to current peer
peer.StealPeers.Clear()
for _, parent := range parents[1:] {
peer.StealPeers.Add(parent.ID)
for _, candidateParent := range candidateParents[1:] {
peer.StealPeers.Add(candidateParent.ID)
}

// Replace peer's parent with scheduled parent
peer.ReplaceParent(parents[0])
peer.ReplaceParent(candidateParents[0])
peer.Log.Infof("schedule parent successful, replace parent to %s and steal peers is %v",
parents[0].ID, peer.StealPeers.Values())
candidateParents[0].ID, peer.StealPeers.Values())
peer.Log.Debugf("peer ancestors is %v", peer.Ancestors())
return parents, true
return candidateParents, true
}

// FindParent finds parent that best matches the evaluation
func (s *scheduler) FindParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet) (*resource.Peer, bool) {
// Filter the parent that can be scheduled
parents := s.filterParents(peer, blocklist)
if len(parents) == 0 {
peer.Log.Info("can not find parents")
// Filter the candidate parent that can be scheduled
candidateParents := s.filterCandidateParents(peer, blocklist)
if len(candidateParents) == 0 {
peer.Log.Info("can not find candidate parents")
return nil, false
}

// Sort parents by evaluation score
// Sort candidate parents by evaluation score
taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
sort.Slice(
parents,
candidateParents,
func(i, j int) bool {
return s.evaluator.Evaluate(parents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(parents[j], peer, taskTotalPieceCount)
return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount)
},
)

peer.Log.Infof("find parent %s successful", parents[0].ID)
return parents[0], true
peer.Log.Infof("find parent %s successful", candidateParents[0].ID)
return candidateParents[0], true
}

// Filter the parent that can be scheduled
func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []*resource.Peer {
// Filter the candidate parent that can be scheduled
func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.SafeSet) []*resource.Peer {
filterParentLimit := config.DefaultSchedulerFilterParentLimit
if config, ok := s.dynconfig.GetSchedulerClusterConfig(); ok && filterParentLimit > 0 {
filterParentLimit = int(config.FilterParentLimit)
}

var parents []*resource.Peer
var parentIDs []string
var candidateParents []*resource.Peer
var candidateParentIDs []string
var n int
peer.Task.Peers.Range(func(_, value interface{}) bool {
if n > filterParentLimit {
return false
}
n++

parent, ok := value.(*resource.Peer)
candidateParent, ok := value.(*resource.Peer)
if !ok {
return true
}

if blocklist.Contains(parent.ID) {
peer.Log.Debugf("parent %s is not selected because it is in blocklist", parent.ID)
// Candidate parent is in blocklist
if blocklist.Contains(candidateParent.ID) {
peer.Log.Debugf("candidate parent %s is not selected because it is in blocklist", candidateParent.ID)
return true
}

if parent.ID == peer.ID {
peer.Log.Debug("parent is not selected because it is same")
// Candidate parent is itself
if candidateParent.ID == peer.ID {
peer.Log.Debug("candidate parent is not selected because it is same")
return true
}

if s.evaluator.IsBadNode(parent) {
peer.Log.Debugf("parent %s is not selected because it is bad node", parent.ID)
// Candidate parent is bad node
if s.evaluator.IsBadNode(candidateParent) {
peer.Log.Debugf("candidate parent %s is not selected because it is bad node", candidateParent.ID)
return true
}

_, ok = parent.LoadParent()
isBackToSource := peer.Task.BackToSourcePeers.Contains(parent)
if !ok && !parent.Host.IsCDN && !isBackToSource {
peer.Log.Debugf("parent %s is not selected, because its download state is %t %t %t",
parent.ID, ok, parent.Host.IsCDN, isBackToSource)
// Conditions for candidate parent to be a parent:
// 1. candidate parent has parent
// 2. candidate parent is CDN
// 3. candidate parent has been back-to-source
_, ok = candidateParent.LoadParent()
isBackToSource := candidateParent.IsBackToSource.Load()
if !ok && !candidateParent.Host.IsCDN && !isBackToSource {
peer.Log.Debugf("candidate parent %s is not selected, because its download state is %t %t %t",
candidateParent.ID, ok, candidateParent.Host.IsCDN, isBackToSource)
return true
}

// Candidate parent's depth exceeds available depth
peerChildCount := peer.ChildCount.Load()
parentDepth := parent.Depth()
parentDepth := candidateParent.Depth()
if peerChildCount > 0 && parentDepth > defaultAvailableDepth {
peer.Log.Debugf("peer has %d children and parent %s depth is %d", peerChildCount, parent.ID, parentDepth)
peer.Log.Debugf("candidate peer has %d children and parent %s depth is %d", peerChildCount, candidateParent.ID, parentDepth)
return true
}

// Peer's depth exceeds limit depth
peerDepth := peer.Depth()
if parentDepth+peerDepth > defaultDepthLimit {
peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, parent %s is %d", defaultDepthLimit, peerDepth, parent.ID, parentDepth)
peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, candidate parent %s is %d", defaultDepthLimit, peerDepth, candidateParent.ID, parentDepth)
return true
}

if parent.IsDescendant(peer) {
peer.Log.Debugf("parent %s is not selected because it is descendant", parent.ID)
// Candidate parent is a descendant of peer
if candidateParent.IsDescendant(peer) {
peer.Log.Debugf("candidate parent %s is not selected because it is descendant", candidateParent.ID)
return true
}

if parent.IsAncestor(peer) {
peer.Log.Debugf("parent %s is not selected because it is ancestor", parent.ID)
// Candidate parent is an ancestor of peer
if candidateParent.IsAncestor(peer) {
peer.Log.Debugf("candidate parent %s is not selected because it is ancestor", candidateParent.ID)
return true
}

if parent.Host.FreeUploadLoad() <= 0 {
peer.Log.Debugf("parent %s is not selected because its free upload is empty, upload limit is %d, upload peer count is %d",
parent.ID, parent.Host.UploadLoadLimit.Load(), parent.Host.UploadPeerCount.Load())
// Candidate parent's free upload is empty
if candidateParent.Host.FreeUploadLoad() <= 0 {
peer.Log.Debugf("candidate parent %s is not selected because its free upload is empty, upload limit is %d, upload peer count is %d",
candidateParent.ID, candidateParent.Host.UploadLoadLimit.Load(), candidateParent.Host.UploadPeerCount.Load())
return true
}

parents = append(parents, parent)
parentIDs = append(parentIDs, parent.ID)
candidateParents = append(candidateParents, candidateParent)
candidateParentIDs = append(candidateParentIDs, candidateParent.ID)
return true
})

peer.Log.Infof("candidate parents include %#v", parentIDs)
return parents
peer.Log.Infof("candidate parents include %#v", candidateParentIDs)
return candidateParents
}

// Construct peer successful packet
Expand Down
7 changes: 7 additions & 0 deletions scheduler/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourcePeers.Add(mockPeer)
mockPeer.IsBackToSource.Store(true)
peer.Task.StorePeer(mockPeer)
mockPeer.Pieces.Set(0)
peer.StoreStream(stream)
Expand Down Expand Up @@ -700,6 +701,8 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
peer.Task.StorePeer(stealPeer)
peer.Task.BackToSourcePeers.Add(mockPeer)
peer.Task.BackToSourcePeers.Add(stealPeer)
mockPeer.IsBackToSource.Store(true)
stealPeer.IsBackToSource.Store(true)
mockPeer.Pieces.Set(0)
peer.StoreStream(stream)
gomock.InOrder(
Expand Down Expand Up @@ -838,6 +841,8 @@ func TestScheduler_FindParent(t *testing.T) {
peer.Task.StorePeer(mockPeers[1])
peer.Task.BackToSourcePeers.Add(mockPeers[0])
peer.Task.BackToSourcePeers.Add(mockPeers[1])
mockPeers[0].IsBackToSource.Store(true)
mockPeers[1].IsBackToSource.Store(true)
mockPeers[0].Pieces.Set(0)
mockPeers[1].Pieces.Set(0)
mockPeers[1].Pieces.Set(1)
Expand Down Expand Up @@ -907,6 +912,8 @@ func TestScheduler_FindParent(t *testing.T) {
peer.Task.StorePeer(mockPeers[1])
peer.Task.BackToSourcePeers.Add(mockPeers[0])
peer.Task.BackToSourcePeers.Add(mockPeers[1])
mockPeers[0].IsBackToSource.Store(true)
mockPeers[1].IsBackToSource.Store(true)
mockPeers[0].Pieces.Set(0)
mockPeers[1].Pieces.Set(0)
mockPeers[1].Pieces.Set(1)
Expand Down

0 comments on commit f9ec2d3

Please sign in to comment.