Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AnnounceTask and StatTask tests #1254

Merged
merged 1 commit into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,7 @@ func (s *Server) ReportPeerResult(ctx context.Context, req *scheduler.PeerResult
return new(empty.Empty), s.service.ReportPeerResult(ctx, req)
}

// LeaveTask makes the peer unschedulable
func (s *Server) LeaveTask(ctx context.Context, req *scheduler.PeerTarget) (*empty.Empty, error) {
return new(empty.Empty), s.service.LeaveTask(ctx, req)
}

// StatTask checks if the given task exists in P2P network
// StatTask checks if the given task exists
func (s *Server) StatTask(ctx context.Context, req *scheduler.StatTaskRequest) (*scheduler.Task, error) {
// TODO: add metrics
return s.service.StatTask(ctx, req)
Expand All @@ -99,3 +94,8 @@ func (s *Server) AnnounceTask(ctx context.Context, req *scheduler.AnnounceTaskRe
// TODO: add metrics
return new(empty.Empty), s.service.AnnounceTask(ctx, req)
}

// LeaveTask makes the peer unschedulable
func (s *Server) LeaveTask(ctx context.Context, req *scheduler.PeerTarget) (*empty.Empty, error) {
return new(empty.Empty), s.service.LeaveTask(ctx, req)
}
139 changes: 71 additions & 68 deletions scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
// Register task and trigger cdn download task
task, err := s.registerTask(ctx, req)
if err != nil {
dferr := dferrors.New(base.Code_SchedTaskStatusError, "register task is fail")
logger.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
logger.Error(msg)
return nil, dferrors.New(base.Code_SchedTaskStatusError, msg)
}
host := s.registerHost(ctx, req.PeerHost)
peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag)
Expand All @@ -96,9 +96,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
peer.Log.Info("task size scope is tiny and return piece content directly")
if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() {
if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

return &rpcscheduler.RegisterResult{
Expand All @@ -121,9 +121,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
if !ok {
peer.Log.Warn("task size scope is small and it can not select parent")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

return &rpcscheduler.RegisterResult{
Expand All @@ -137,9 +137,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
if !parent.FSM.Is(resource.PeerStateSucceeded) {
peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current())
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

return &rpcscheduler.RegisterResult{
Expand All @@ -152,9 +152,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
if !ok {
peer.Log.Warn("task size scope is small and it can not get first piece")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

return &rpcscheduler.RegisterResult{
Expand All @@ -165,9 +165,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa

peer.ReplaceParent(parent)
if err := peer.FSM.Event(resource.PeerEventRegisterSmall); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

singlePiece := &rpcscheduler.SinglePiece{
Expand All @@ -194,9 +194,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
default:
peer.Log.Info("task size scope is normal and needs to be register")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

return &rpcscheduler.RegisterResult{
Expand All @@ -209,9 +209,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
// Task is unsuccessful
peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current())
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg)
}

return &rpcscheduler.RegisterResult{
Expand Down Expand Up @@ -252,9 +252,9 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
// Get peer from peer manager
peer, ok = s.resource.PeerManager().Load(piece.SrcPid)
if !ok {
dferr := dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", piece.SrcPid)
logger.Errorf("peer %s not found", piece.SrcPid)
return dferr
msg := fmt.Sprintf("peer %s not found", piece.SrcPid)
logger.Error(msg)
return dferrors.New(base.Code_SchedPeerNotFound, msg)
}

// Peer setting stream
Expand Down Expand Up @@ -323,8 +323,9 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
func (s *Service) ReportPeerResult(ctx context.Context, req *rpcscheduler.PeerResult) error {
peer, ok := s.resource.PeerManager().Load(req.PeerId)
if !ok {
logger.Errorf("report peer result and peer %s is not exists", req.PeerId)
return dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", req.PeerId)
msg := fmt.Sprintf("report peer result and peer %s is not exists", req.PeerId)
logger.Error(msg)
return dferrors.New(base.Code_SchedPeerNotFound, msg)
}

metrics.DownloadCount.WithLabelValues(peer.BizTag).Inc()
Expand Down Expand Up @@ -361,40 +362,6 @@ func (s *Service) ReportPeerResult(ctx context.Context, req *rpcscheduler.PeerRe
return nil
}

// LeaveTask makes the peer unschedulable
func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
peer, ok := s.resource.PeerManager().Load(req.PeerId)
if !ok {
logger.Errorf("leave task and peer %s is not exists", req.PeerId)
return dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", req.PeerId)
}

metrics.LeaveTaskCount.WithLabelValues(peer.BizTag).Inc()

peer.Log.Infof("leave task: %#v", req)
if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)

metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc()
return dferrors.Newf(base.Code_SchedTaskStatusError, err.Error())
}

peer.Children.Range(func(_, value interface{}) bool {
child, ok := value.(*resource.Peer)
if !ok {
return true
}

// Reschedule a new parent to children of peer to exclude the current leave peer
child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
return true
})

s.resource.PeerManager().Delete(peer.ID)
return nil
}

// StatTask checks the current state of the task
func (s *Service) StatTask(ctx context.Context, req *rpcscheduler.StatTaskRequest) (*rpcscheduler.Task, error) {
task, loaded := s.resource.TaskManager().Load(req.TaskId)
Expand All @@ -404,7 +371,7 @@ func (s *Service) StatTask(ctx context.Context, req *rpcscheduler.StatTaskReques
return nil, dferrors.New(base.Code_PeerTaskNotFound, msg)
}

task.Log.Debug("task has been found in P2P network")
task.Log.Debug("task has been found")
return &rpcscheduler.Task{
Id: task.ID,
Type: int32(task.Type),
Expand Down Expand Up @@ -439,15 +406,15 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
if !task.FSM.Is(resource.TaskStateSucceeded) {
if task.FSM.Is(resource.TaskStatePending) {
if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
msg := fmt.Sprintf("task fsm Pending -> Download event failed: %s", err)
msg := fmt.Sprintf("task fsm event failed: %v", err)
task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg)
}
}

if task.FSM.Is(resource.TaskStateFailed) {
if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
msg := fmt.Sprintf("task fsm Failed -> Download event failed: %s", err)
msg := fmt.Sprintf("task fsm event failed: %v", err)
task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg)
}
Expand Down Expand Up @@ -477,7 +444,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
if !peer.FSM.Is(resource.PeerStateSucceeded) {
if peer.FSM.Is(resource.PeerStatePending) {
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer fsm Pending -> Normal event failed: %s", err)
msg := fmt.Sprintf("peer fsm event failed: %v", err)
task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg)
}
Expand All @@ -487,7 +454,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
peer.FSM.Is(resource.PeerStateReceivedSmall) ||
peer.FSM.Is(resource.PeerStateReceivedNormal) {
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
msg := fmt.Sprintf("peer fsm Normal -> Download event failed: %s", err)
msg := fmt.Sprintf("peer fsm event failed: %v", err)
task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg)
}
Expand All @@ -499,6 +466,42 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
return nil
}

// LeaveTask makes the peer unschedulable
func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
peer, ok := s.resource.PeerManager().Load(req.PeerId)
if !ok {
msg := fmt.Sprintf("leave task and peer %s is not exists", req.PeerId)
logger.Error(msg)
return dferrors.New(base.Code_SchedPeerNotFound, msg)
}

metrics.LeaveTaskCount.WithLabelValues(peer.BizTag).Inc()

peer.Log.Infof("leave task: %#v", req)
if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc()

msg := fmt.Sprintf("peer fsm event failed: %v", err)
peer.Log.Error(msg)
return dferrors.New(base.Code_SchedTaskStatusError, msg)
}

peer.Children.Range(func(_, value interface{}) bool {
child, ok := value.(*resource.Peer)
if !ok {
return true
}

// Reschedule a new parent to children of peer to exclude the current leave peer
child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
return true
})

s.resource.PeerManager().Delete(peer.ID)
return nil
}

// registerTask creates a new task or reuses a previous task
func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) {
task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, resource.TaskTypeNormal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackSourceCount)))
Expand Down
Loading