feat: optimize scheduler log (#1114)
Signed-off-by: Gaius <[email protected]>
gaius-qi authored Mar 2, 2022
1 parent 84b8bcd commit 2bb44c1
Showing 4 changed files with 40 additions and 40 deletions.
5 changes: 3 additions & 2 deletions scheduler/resource/cdn.go
@@ -102,10 +102,9 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler
}
}

peer.Log.Infof("receive piece: %#v %#v", piece, piece.PieceInfo)

// Handle begin of piece
if piece.PieceInfo != nil && piece.PieceInfo.PieceNum == common.BeginOfPiece {
peer.Log.Infof("receive begin of piece from cdn: %#v %#v", piece, piece.PieceInfo)
if err := peer.FSM.Event(PeerEventDownload); err != nil {
return nil, nil, err
}
@@ -114,13 +113,15 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler

// Handle end of piece
if piece.Done {
peer.Log.Infof("receive end of from cdn: %#v %#v", piece, piece.PieceInfo)
return peer, &rpcscheduler.PeerResult{
TotalPieceCount: piece.TotalPieceCount,
ContentLength: piece.ContentLength,
}, nil
}

// Handle piece download successfully
peer.Log.Infof("receive piece from cdn: %#v %#v", piece, piece.PieceInfo)
peer.Pieces.Set(uint(piece.PieceInfo.PieceNum))
// TODO(244372610) CDN should set piece cost
peer.AppendPieceCost(0)
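
The TriggerTask hunk above replaces a single catch-all piece log with phase-specific messages: one for the begin-of-piece sentinel, one for the final piece, and one for ordinary pieces. A minimal sketch of that sentinel dispatch, using a hypothetical pieceResult struct and beginOfPiece constant in place of Dragonfly's rpcscheduler types and common.BeginOfPiece:

package main

import "fmt"

// Hypothetical stand-ins for Dragonfly's common.BeginOfPiece sentinel
// and the rpcscheduler piece-result message.
const beginOfPiece = -1

type pieceResult struct {
	PieceNum int32
	Done     bool
}

// handlePiece mirrors TriggerTask's control flow: the begin-of-piece
// sentinel starts the download, Done marks the last piece, and
// anything else is a successfully downloaded piece.
func handlePiece(p pieceResult) string {
	if p.PieceNum == beginOfPiece {
		return "begin of piece: fire PeerEventDownload"
	}
	if p.Done {
		return "end of piece: return the PeerResult to the caller"
	}
	return fmt.Sprintf("piece %d downloaded: record it and append its cost", p.PieceNum)
}

func main() {
	pieces := []pieceResult{{PieceNum: beginOfPiece}, {PieceNum: 0}, {Done: true}}
	for _, p := range pieces {
		fmt.Println(handlePiece(p))
	}
}
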
8 changes: 4 additions & 4 deletions scheduler/scheduler/evaluator/evaluator_base.go
@@ -178,7 +178,7 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {

func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool {
if peer.FSM.Is(resource.PeerStateFailed) || peer.FSM.Is(resource.PeerStateLeave) || peer.FSM.Is(resource.PeerStatePending) {
peer.Log.Infof("peer is bad node because peer status is %s", peer.FSM.Current())
peer.Log.Debugf("peer is bad node because peer status is %s", peer.FSM.Current())
return true
}

@@ -187,7 +187,7 @@ func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool {
len := len(costs)
// Peer has not finished downloading enough piece
if len < minAvailableCostLen {
logger.Infof("peer %s has not finished downloading enough piece, it can't be bad node", peer.ID)
logger.Debugf("peer %s has not finished downloading enough piece, it can't be bad node", peer.ID)
return false
}

@@ -198,7 +198,7 @@ func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool {
// if the last cost is twenty times more than mean, it is bad node.
if len < normalDistributionLen {
isBadNode := big.NewFloat(lastCost).Cmp(big.NewFloat(mean*20)) > 0
logger.Infof("peer %s mean is %.2f and it is bad node: %t", peer.ID, mean, isBadNode)
logger.Debugf("peer %s mean is %.2f and it is bad node: %t", peer.ID, mean, isBadNode)
return isBadNode
}

@@ -207,7 +207,7 @@ func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool {
// refer to https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule
stdev, _ := stats.StandardDeviation(costs[:len-1]) // nolint: errcheck
isBadNode := big.NewFloat(lastCost).Cmp(big.NewFloat(mean+3*stdev)) > 0
logger.Infof("peer %s meet the normal distribution, costs mean is %.2f and standard deviation is %.2f, peer is bad node: %t",
logger.Debugf("peer %s meet the normal distribution, costs mean is %.2f and standard deviation is %.2f, peer is bad node: %t",
peer.ID, mean, stdev, isBadNode)
return isBadNode
}
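
The IsBadNode hunk only demotes its verdict logs to Debug, but the underlying two-stage outlier test is worth spelling out: with fewer than minAvailableCostLen samples it declines to judge, below normalDistributionLen samples it flags a last cost more than twenty times the mean, and with enough samples it assumes normality and flags anything beyond mean + 3*stdev of the earlier costs, per the 68-95-99.7 rule. A self-contained sketch, with assumed threshold values and hand-rolled mean/stdev in place of the stats package:

package main

import (
	"fmt"
	"math"
)

// Assumed thresholds; the real values are the unexported constants
// minAvailableCostLen and normalDistributionLen in evaluator_base.go.
const (
	minAvailableCostLen   = 5
	normalDistributionLen = 30
)

func mean(xs []float64) float64 {
	sum := 0.0
	for _, x := range xs {
		sum += x
	}
	return sum / float64(len(xs))
}

func stdev(xs []float64) float64 {
	m := mean(xs)
	sum := 0.0
	for _, x := range xs {
		sum += (x - m) * (x - m)
	}
	return math.Sqrt(sum / float64(len(xs)))
}

// isBadCost applies the two-stage rule from IsBadNode: with few samples,
// flag a last cost more than twenty times the mean; with enough samples
// to assume normality, flag a last cost beyond mean + 3*stdev of the
// earlier costs (the 68-95-99.7 rule).
func isBadCost(costs []float64) bool {
	n := len(costs)
	if n < minAvailableCostLen {
		return false // too few downloaded pieces to judge
	}
	last, prev := costs[n-1], costs[:n-1]
	if n < normalDistributionLen {
		return last > mean(prev)*20
	}
	return last > mean(prev)+3*stdev(prev)
}

func main() {
	costs := []float64{10, 11, 9, 12, 400} // one extreme outlier
	fmt.Println(isBadCost(costs))          // true: 400 > 20 * mean(10, 11, 9, 12)
}
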
23 changes: 11 additions & 12 deletions scheduler/scheduler/scheduler.go
@@ -103,8 +103,8 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
peer.Log.Errorf("send packet failed: %v", err)
return
}
peer.Log.Infof("peer scheduling %d times and back-to-source limit %d times, cdn peer is %#v, return code %d",
n, s.config.RetryBackSourceLimit, cdnPeer, base.Code_SchedNeedBackSource)
peer.Log.Infof("peer scheduling %d times and cdn peer is %#v, peer downloads back-to-source %d",
n, cdnPeer, base.Code_SchedNeedBackSource)

if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
@@ -122,7 +122,6 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo

// If the peer downloads back-to-source, its parent needs to be deleted
peer.DeleteParent()
peer.Task.Log.Info("peer back to source successfully")
return
}

@@ -139,7 +138,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
peer.Log.Errorf("send packet failed: %v", err)
return
}
peer.Log.Infof("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError)
peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError)
return
}

@@ -244,45 +243,45 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
}

if blocklist.Contains(parent.ID) {
peer.Log.Infof("parent %s is not selected because it is in blocklist", parent.ID)
peer.Log.Debugf("parent %s is not selected because it is in blocklist", parent.ID)
return true
}

if parent.ID == peer.ID {
peer.Log.Info("parent is not selected because it is same")
peer.Log.Debug("parent is not selected because it is same")
return true
}

if s.evaluator.IsBadNode(parent) {
peer.Log.Infof("parent %s is not selected because it is bad node", parent.ID)
peer.Log.Debugf("parent %s is not selected because it is bad node", parent.ID)
return true
}

peerChildCount := peer.ChildCount.Load()
parentDepth := parent.Depth()
if peerChildCount > 0 && parentDepth > defaultAvailableDepth {
peer.Log.Infof("peer has %d children and parent %s depth is %d", peerChildCount, parent.ID, parentDepth)
peer.Log.Debugf("peer has %d children and parent %s depth is %d", peerChildCount, parent.ID, parentDepth)
return true
}

peerDepth := peer.Depth()
if parentDepth+peerDepth > defaultDepthLimit {
peer.Log.Infof("exceeds the %d depth limit of the tree, peer depth is %d, parent %s is %d", defaultDepthLimit, peerDepth, parent.ID, parentDepth)
peer.Log.Debugf("exceeds the %d depth limit of the tree, peer depth is %d, parent %s is %d", defaultDepthLimit, peerDepth, parent.ID, parentDepth)
return true
}

if parent.IsDescendant(peer) {
peer.Log.Infof("parent %s is not selected because it is descendant", parent.ID)
peer.Log.Debugf("parent %s is not selected because it is descendant", parent.ID)
return true
}

if parent.IsAncestor(peer) {
peer.Log.Infof("parent %s is not selected because it is ancestor", parent.ID)
peer.Log.Debugf("parent %s is not selected because it is ancestor", parent.ID)
return true
}

if parent.Host.FreeUploadLoad() <= 0 {
peer.Log.Infof("parent %s is not selected because its free upload is empty", parent.ID)
peer.Log.Debugf("parent %s is not selected because its free upload is empty", parent.ID)
return true
}

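Every rejection in filterParents now logs at Debug instead of Info, which matters because this predicate chain runs for each candidate on every scheduling pass. A reduced sketch of the chain (blocklist, self, bad node, tree-depth limit, free upload slots), with a hypothetical candidate struct and an assumed depthLimit standing in for resource.Peer and the real constants:

package main

import "fmt"

// candidate is a hypothetical, much-reduced stand-in for resource.Peer,
// carrying only the fields the filter below consults.
type candidate struct {
	ID             string
	Depth          int
	FreeUploadLoad int
	InBlocklist    bool
	Bad            bool
}

// Assumed limit; the real code uses defaultAvailableDepth and
// defaultDepthLimit and also rejects ancestors and descendants.
const depthLimit = 4

// filterParents keeps only candidates that survive every rejection rule
// in the hunk above: blocklisted, the peer itself, a bad node, a tree
// that would grow too deep, or no free upload slots.
func filterParents(peerID string, peerDepth int, cands []candidate) []candidate {
	var parents []candidate
	for _, c := range cands {
		switch {
		case c.InBlocklist, c.ID == peerID, c.Bad:
			continue
		case c.Depth+peerDepth > depthLimit:
			continue // joining this parent would exceed the depth limit
		case c.FreeUploadLoad <= 0:
			continue // parent has no upload slots left
		}
		parents = append(parents, c)
	}
	return parents
}

func main() {
	cands := []candidate{
		{ID: "a", Depth: 1, FreeUploadLoad: 3},
		{ID: "b", Depth: 4, FreeUploadLoad: 3}, // too deep
		{ID: "c", Depth: 1, FreeUploadLoad: 0}, // no free upload
	}
	fmt.Println(filterParents("self", 1, cands)) // only candidate "a" survives
}
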
44 changes: 22 additions & 22 deletions scheduler/service/service.go
@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"io"
"time"

"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
@@ -80,7 +79,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa

// Task has been successful
if task.FSM.Is(resource.TaskStateSucceeded) {
peer.Log.Info("task has been successful")
peer.Log.Info("tasks can be reused")
sizeScope := task.SizeScope()
switch sizeScope {
case base.SizeScope_TINY:
@@ -198,7 +197,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
}

// Task is unsuccessful
peer.Log.Info("task is unsuccessful and needs to be register")
peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current())
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
@@ -253,24 +252,25 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
defer peer.DeleteStream()
}

peer.Log.Infof("receive piece: %#v %#v", piece, piece.PieceInfo)

if piece.PieceInfo != nil {
// Handle begin of piece
if piece.PieceInfo.PieceNum == common.BeginOfPiece {
peer.Log.Infof("receive begin of piece: %#v %#v", piece, piece.PieceInfo)
s.handleBeginOfPiece(ctx, peer)
continue
}

// Handle end of piece
if piece.PieceInfo.PieceNum == common.EndOfPiece {
peer.Log.Infof("receive end of piece: %#v %#v", piece, piece.PieceInfo)
s.handleEndOfPiece(ctx, peer)
continue
}
}

// Handle piece download successfully
if piece.Success {
peer.Log.Infof("receive piece: %#v %#v", piece, piece.PieceInfo)
s.handlePieceSuccess(ctx, peer, piece)

// Collect peer host traffic metrics
Expand All @@ -287,37 +287,39 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes

// Handle piece download code
if piece.Code != base.Code_Success {
// FIXME(244372610) When dfdaemon download peer return empty, retry later.
if piece.Code == base.Code_ClientWaitPieceReady {
peer.Log.Infof("receive piece code %d and wait for dfdaemon piece ready", piece.Code)
peer.Log.Debugf("receive piece code %d and wait for dfdaemon piece ready", piece.Code)
continue
}

// Handle piece download failed
peer.Log.Errorf("receive failed piece: %#v %#v", piece, piece.PieceInfo)
peer.Log.Errorf("receive failed piece: %#v", piece)
s.handlePieceFail(ctx, peer, piece)
continue
}

peer.Log.Warnf("receive unknow piece: %#v %#v", piece, piece.PieceInfo)
}
}

// ReportPeerResult handles peer result reported by dfdaemon
func (s *Service) ReportPeerResult(ctx context.Context, req *rpcscheduler.PeerResult) error {
peer, ok := s.resource.PeerManager().Load(req.PeerId)
if !ok {
logger.Errorf("report peer result: peer %s is not exists", req.PeerId)
logger.Errorf("report peer result and peer %s is not exists", req.PeerId)
return dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", req.PeerId)
}

if !req.Success {
peer.Log.Errorf("report peer result error: %#v", req)
peer.Log.Errorf("report peer failed result: %s %#v", req.Code, req)
if peer.FSM.Is(resource.PeerStateBackToSource) {
s.handleTaskFail(ctx, peer.Task)
}
s.handlePeerFail(ctx, peer)
return nil
}

peer.Log.Infof("report peer result request: %#v", req)
peer.Log.Infof("report peer result: %#v", req)
if peer.FSM.Is(resource.PeerStateBackToSource) {
s.handleTaskSuccess(ctx, peer.Task, req)
}
@@ -329,11 +331,11 @@ func (s *Service) ReportPeerResult(ctx context.Context, req *rpcscheduler.PeerRe
func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
peer, ok := s.resource.PeerManager().Load(req.PeerId)
if !ok {
logger.Errorf("leave task: peer %s is not exists", req.PeerId)
logger.Errorf("leave task and peer %s is not exists", req.PeerId)
return dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", req.PeerId)
}

peer.Log.Infof("leave task request: %#v", req)
peer.Log.Infof("leave task: %#v", req)
if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
return dferrors.Newf(base.Code_SchedTaskStatusError, err.Error())
@@ -364,9 +366,6 @@ func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRe
task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, s.config.Scheduler.BackSourceCount, req.UrlMeta)
task, loaded := s.resource.TaskManager().LoadOrStore(task)
if loaded && task.HasAvailablePeer() && (task.FSM.Is(resource.TaskStateRunning) || task.FSM.Is(resource.TaskStateSucceeded)) {
// Task is healthy and can be reused
task.UpdateAt.Store(time.Now())
task.Log.Infof("reuse task and status is %s", task.FSM.Current())
return task, nil
}

@@ -409,10 +408,10 @@ func (s *Service) registerPeer(ctx context.Context, req *rpcscheduler.PeerTaskRe
peer, loaded := s.resource.PeerManager().LoadOrStore(resource.NewPeer(req.PeerId, task, host))
if !loaded {
peer.Log.Info("create new peer")
} else {
peer.Log.Info("peer already exists")
return peer
}

peer.Log.Info("peer already exists")
return peer
}

@@ -427,6 +426,7 @@ func (s *Service) triggerCDNTask(ctx context.Context, task *resource.Task) {
}

// Update the task status first to help peer scheduling evaluation and scoring
peer.Log.Info("trigger cdn download task successfully")
s.handleTaskSuccess(ctx, task, endOfPiece)
s.handlePeerSuccess(ctx, peer)
}
@@ -436,7 +436,7 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
switch peer.FSM.Current() {
case resource.PeerStateBackToSource:
// Back to the source download process, peer directly returns
peer.Log.Info("peer back to source")
peer.Log.Info("peer downloads back-to-source when receive the begin of piece")
return
case resource.PeerStateReceivedTiny:
// When the task is tiny,
@@ -464,7 +464,8 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
// to help peer to schedule the parent node
blocklist := set.NewSafeSet()
blocklist.Add(peer.ID)
peer.Log.Infof("schedule parent because of peer receive beginOfPiece")

peer.Log.Infof("schedule parent because of peer receive begin of piece")
s.scheduler.ScheduleParent(ctx, peer, blocklist)
default:
peer.Log.Warnf("peer state is %s when receive the begin of piece", peer.FSM.Current())
@@ -491,7 +492,6 @@ func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, p
func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piece *rpcscheduler.PieceResult) {
// Failed to download piece back-to-source
if peer.FSM.Is(resource.PeerStateBackToSource) {
peer.Log.Error("peer back to source finished with fail piece")
return
}

@@ -555,7 +555,7 @@ func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) {
// Tiny file downloaded successfully
peer.Task.DirectPiece = data
} else {
peer.Log.Warnf("download tiny file length is %d, task content length is %d, download is failed: %v", len(data), peer.Task.ContentLength.Load(), err)
peer.Log.Warnf("download tiny file length is %d, task content length is %d, downloading is failed: %v", len(data), peer.Task.ContentLength.Load(), err)
}
}

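Taken together, the commit applies one logging policy across all four files: per-piece and per-candidate chatter moves to Debug, lifecycle milestones stay at Info, and exhausted retries become Error. Dragonfly uses its own zap-based dflog package, but the same policy can be sketched with the standard library's log/slog (Go 1.21+) to show what operators see at the default Info level:

package main

import (
	"log/slog"
	"os"
)

func main() {
	// LevelVar defaults to Info, matching a typical production setting.
	level := &slog.LevelVar{}
	log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))

	log.Debug("receive piece", "pieceNum", 7) // suppressed at the default level
	log.Info("task state needs register", "state", "Pending")
	log.Error("peer scheduling exceeds the retry limit", "limit", 10)

	// Operators opt in to the chatty per-piece stream only when debugging.
	level.Set(slog.LevelDebug)
	log.Debug("receive piece", "pieceNum", 8) // now emitted
}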
