diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 9b5cfbdb2..cf9a79be6 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -150,8 +150,19 @@ type Peer struct { BlockPeers set.SafeSet // NeedBackToSource needs downloaded from source + // + // When peer is registering, at the same time, + // scheduler needs to create the new corresponding task and the cdn is disabled, + // NeedBackToSource is set to true NeedBackToSource *atomic.Bool + // IsBackToSource is downloaded from source + // + // When peer is scheduling and NeedBackToSource is true, + // scheduler needs to return Code_SchedNeedBackSource and + // IsBackToSource is set to true + IsBackToSource *atomic.Bool + // CreateAt is peer create time CreateAt *atomic.Time @@ -181,6 +192,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { StealPeers: set.NewSafeSet(), BlockPeers: set.NewSafeSet(), NeedBackToSource: atomic.NewBool(false), + IsBackToSource: atomic.NewBool(false), CreateAt: atomic.NewTime(time.Now()), UpdateAt: atomic.NewTime(time.Now()), mu: &sync.RWMutex{}, @@ -226,6 +238,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventDownloadFromBackToSource: func(e *fsm.Event) { + p.IsBackToSource.Store(true) p.Task.BackToSourcePeers.Add(p) p.DeleteParent() p.Host.DeletePeer(p.ID) diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 7be79dbc3..8f0daa2e4 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -159,19 +159,19 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer return []*resource.Peer{}, false } - // Find the parent that can be scheduled - parents := s.filterParents(peer, blocklist) - if len(parents) == 0 { - peer.Log.Info("can not find parents") + // Find the candidate parent that can be scheduled + candidateParents := s.filterCandidateParents(peer, blocklist) + if len(candidateParents) == 0 { + peer.Log.Info("can not find candidate parents") return []*resource.Peer{}, false } - // Sort parents by evaluation score + // Sort candidate parents by evaluation score taskTotalPieceCount := peer.Task.TotalPieceCount.Load() sort.Slice( - parents, + candidateParents, func(i, j int) bool { - return s.evaluator.Evaluate(parents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(parents[j], peer, taskTotalPieceCount) + return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount) }, ) @@ -182,56 +182,56 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer return []*resource.Peer{}, false } - if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil { + if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil { peer.Log.Error(err) return []*resource.Peer{}, false } // Add steal peers to current peer peer.StealPeers.Clear() - for _, parent := range parents[1:] { - peer.StealPeers.Add(parent.ID) + for _, candidateParent := range candidateParents[1:] { + peer.StealPeers.Add(candidateParent.ID) } // Replace peer's parent with scheduled parent - peer.ReplaceParent(parents[0]) + peer.ReplaceParent(candidateParents[0]) peer.Log.Infof("schedule parent successful, replace parent to %s and steal peers is %v", - parents[0].ID, peer.StealPeers.Values()) + candidateParents[0].ID, peer.StealPeers.Values()) peer.Log.Debugf("peer ancestors is %v", peer.Ancestors()) - return parents, true + return candidateParents, true } // FindParent finds parent that best matches the evaluation func (s *scheduler) FindParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet) (*resource.Peer, bool) { - // Filter the parent that can be scheduled - parents := s.filterParents(peer, blocklist) - if len(parents) == 0 { - peer.Log.Info("can not find parents") + // Filter the candidate parent that can be scheduled + candidateParents := s.filterCandidateParents(peer, blocklist) + if len(candidateParents) == 0 { + peer.Log.Info("can not find candidate parents") return nil, false } - // Sort parents by evaluation score + // Sort candidate parents by evaluation score taskTotalPieceCount := peer.Task.TotalPieceCount.Load() sort.Slice( - parents, + candidateParents, func(i, j int) bool { - return s.evaluator.Evaluate(parents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(parents[j], peer, taskTotalPieceCount) + return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount) }, ) - peer.Log.Infof("find parent %s successful", parents[0].ID) - return parents[0], true + peer.Log.Infof("find parent %s successful", candidateParents[0].ID) + return candidateParents[0], true } -// Filter the parent that can be scheduled -func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []*resource.Peer { +// Filter the candidate parent that can be scheduled +func (s *scheduler) filterCandidateParents(peer *resource.Peer, blocklist set.SafeSet) []*resource.Peer { filterParentLimit := config.DefaultSchedulerFilterParentLimit if config, ok := s.dynconfig.GetSchedulerClusterConfig(); ok && filterParentLimit > 0 { filterParentLimit = int(config.FilterParentLimit) } - var parents []*resource.Peer - var parentIDs []string + var candidateParents []*resource.Peer + var candidateParentIDs []string var n int peer.Task.Peers.Range(func(_, value interface{}) bool { if n > filterParentLimit { @@ -239,70 +239,82 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) [] } n++ - parent, ok := value.(*resource.Peer) + candidateParent, ok := value.(*resource.Peer) if !ok { return true } - if blocklist.Contains(parent.ID) { - peer.Log.Debugf("parent %s is not selected because it is in blocklist", parent.ID) + // Candidate parent is in blocklist + if blocklist.Contains(candidateParent.ID) { + peer.Log.Debugf("candidate parent %s is not selected because it is in blocklist", candidateParent.ID) return true } - if parent.ID == peer.ID { - peer.Log.Debug("parent is not selected because it is same") + // Candidate parent is itself + if candidateParent.ID == peer.ID { + peer.Log.Debug("candidate parent is not selected because it is same") return true } - if s.evaluator.IsBadNode(parent) { - peer.Log.Debugf("parent %s is not selected because it is bad node", parent.ID) + // Candidate parent is bad node + if s.evaluator.IsBadNode(candidateParent) { + peer.Log.Debugf("candidate parent %s is not selected because it is bad node", candidateParent.ID) return true } - _, ok = parent.LoadParent() - isBackToSource := peer.Task.BackToSourcePeers.Contains(parent) - if !ok && !parent.Host.IsCDN && !isBackToSource { - peer.Log.Debugf("parent %s is not selected, because its download state is %t %t %t", - parent.ID, ok, parent.Host.IsCDN, isBackToSource) + // Conditions for candidate parent to be a parent: + // 1. candidate parent has parent + // 2. candidate parent is CDN + // 3. candidate parent has been back-to-source + _, ok = candidateParent.LoadParent() + isBackToSource := candidateParent.IsBackToSource.Load() + if !ok && !candidateParent.Host.IsCDN && !isBackToSource { + peer.Log.Debugf("candidate parent %s is not selected, because its download state is %t %t %t", + candidateParent.ID, ok, candidateParent.Host.IsCDN, isBackToSource) return true } + // Candidate parent's depth exceeds available depth peerChildCount := peer.ChildCount.Load() - parentDepth := parent.Depth() + parentDepth := candidateParent.Depth() if peerChildCount > 0 && parentDepth > defaultAvailableDepth { - peer.Log.Debugf("peer has %d children and parent %s depth is %d", peerChildCount, parent.ID, parentDepth) + peer.Log.Debugf("candidate peer has %d children and parent %s depth is %d", peerChildCount, candidateParent.ID, parentDepth) return true } + // Peer's depth exceeds limit depth peerDepth := peer.Depth() if parentDepth+peerDepth > defaultDepthLimit { - peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, parent %s is %d", defaultDepthLimit, peerDepth, parent.ID, parentDepth) + peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, candidate parent %s is %d", defaultDepthLimit, peerDepth, candidateParent.ID, parentDepth) return true } - if parent.IsDescendant(peer) { - peer.Log.Debugf("parent %s is not selected because it is descendant", parent.ID) + // Candidate parent is an descendant of peer + if candidateParent.IsDescendant(peer) { + peer.Log.Debugf("candidate parent %s is not selected because it is descendant", candidateParent.ID) return true } - if parent.IsAncestor(peer) { - peer.Log.Debugf("parent %s is not selected because it is ancestor", parent.ID) + // Candidate parent is an ancestor of peer + if candidateParent.IsAncestor(peer) { + peer.Log.Debugf("candidate parent %s is not selected because it is ancestor", candidateParent.ID) return true } - if parent.Host.FreeUploadLoad() <= 0 { - peer.Log.Debugf("parent %s is not selected because its free upload is empty, upload limit is %d, upload peer count is %d", - parent.ID, parent.Host.UploadLoadLimit.Load(), parent.Host.UploadPeerCount.Load()) + // Candidate parent's free upload is empty + if candidateParent.Host.FreeUploadLoad() <= 0 { + peer.Log.Debugf("candidate parent %s is not selected because its free upload is empty, upload limit is %d, upload peer count is %d", + candidateParent.ID, candidateParent.Host.UploadLoadLimit.Load(), candidateParent.Host.UploadPeerCount.Load()) return true } - parents = append(parents, parent) - parentIDs = append(parentIDs, parent.ID) + candidateParents = append(candidateParents, candidateParent) + candidateParentIDs = append(candidateParentIDs, candidateParent.ID) return true }) - peer.Log.Infof("candidate parents include %#v", parentIDs) - return parents + peer.Log.Infof("candidate parents include %#v", candidateParentIDs) + return candidateParents } // Construct peer successful packet diff --git a/scheduler/scheduler/scheduler_test.go b/scheduler/scheduler/scheduler_test.go index 494645cf1..cc1684aba 100644 --- a/scheduler/scheduler/scheduler_test.go +++ b/scheduler/scheduler/scheduler_test.go @@ -671,6 +671,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) peer.Task.BackToSourcePeers.Add(mockPeer) + mockPeer.IsBackToSource.Store(true) peer.Task.StorePeer(mockPeer) mockPeer.Pieces.Set(0) peer.StoreStream(stream) @@ -700,6 +701,8 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { peer.Task.StorePeer(stealPeer) peer.Task.BackToSourcePeers.Add(mockPeer) peer.Task.BackToSourcePeers.Add(stealPeer) + mockPeer.IsBackToSource.Store(true) + stealPeer.IsBackToSource.Store(true) mockPeer.Pieces.Set(0) peer.StoreStream(stream) gomock.InOrder( @@ -838,6 +841,8 @@ func TestScheduler_FindParent(t *testing.T) { peer.Task.StorePeer(mockPeers[1]) peer.Task.BackToSourcePeers.Add(mockPeers[0]) peer.Task.BackToSourcePeers.Add(mockPeers[1]) + mockPeers[0].IsBackToSource.Store(true) + mockPeers[1].IsBackToSource.Store(true) mockPeers[0].Pieces.Set(0) mockPeers[1].Pieces.Set(0) mockPeers[1].Pieces.Set(1) @@ -907,6 +912,8 @@ func TestScheduler_FindParent(t *testing.T) { peer.Task.StorePeer(mockPeers[1]) peer.Task.BackToSourcePeers.Add(mockPeers[0]) peer.Task.BackToSourcePeers.Add(mockPeers[1]) + mockPeers[0].IsBackToSource.Store(true) + mockPeers[1].IsBackToSource.Store(true) mockPeers[0].Pieces.Set(0) mockPeers[1].Pieces.Set(0) mockPeers[1].Pieces.Set(1)