Merge pull request #499 from Starnop/scheduer-mgr
feature: implement the scheduler mgr
lowzj authored Apr 22, 2019
2 parents 7bd00c4 + cf3c601 commit 477738f
Showing 5 changed files with 386 additions and 3 deletions.
2 changes: 2 additions & 0 deletions common/errors/errors.go
@@ -56,6 +56,8 @@ const (
codeCDNFail
codeCDNWait
codePeerWait
codeUnknowError
codePeerContinue
)

// DfError represents a Dragonfly error.
17 changes: 17 additions & 0 deletions common/errors/supernode_errors.go
@@ -29,6 +29,13 @@ var (

// ErrPeerWait represents the peer should wait.
ErrPeerWait = DfError{codePeerWait, "peer should wait"}

// ErrUnknowError represents an error that should not happen
// and whose cause is unknown.
ErrUnknowError = DfError{codeUnknowError, "unknow error"}

// PeerContinue represents that the peer should continue downloading.
PeerContinue = DfError{codePeerContinue, "peer continue"}
)

// IsSystemError checks whether the error is a system error or not.
@@ -50,3 +57,13 @@ func IsCDNWait(err error) bool {
func IsPeerWait(err error) bool {
return checkError(err, codePeerWait)
}

// IsUnknowError checks whether the error is an UnknowError or not.
func IsUnknowError(err error) bool {
return checkError(err, codeUnknowError)
}

// IsPeerContinue checks whether the error is a PeerContinue error or not.
func IsPeerContinue(err error) bool {
return checkError(err, codePeerContinue)
}
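
For orientation only, here is a minimal sketch of how a caller might branch on the new predicates; dispatchScheduleError, its callbacks, and the package name are hypothetical, not part of this commit.

package scheduling // hypothetical package, sketch only

import (
	errorType "github.com/dragonflyoss/Dragonfly/common/errors"
)

// dispatchScheduleError routes a scheduling error to the matching handling
// path. onWait and onContinue are hypothetical callbacks, assumed here
// purely for illustration.
func dispatchScheduleError(err error, onWait, onContinue func()) error {
	switch {
	case errorType.IsPeerWait(err):
		// no piece is available yet; the peer should retry later
		onWait()
		return nil
	case errorType.IsPeerContinue(err):
		// the peer already has enough running downloads; let it carry on
		onContinue()
		return nil
	case errorType.IsUnknowError(err):
		// unexpected failure; surface it to the caller
		return err
	default:
		return err
	}
}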
3 changes: 3 additions & 0 deletions supernode/config/constants.go
@@ -42,4 +42,7 @@ const (

// PeerUpLimit indicates the limit of the load count as a server.
PeerUpLimit = 5

// PeerDownLimit indicates the limit of the download task count as a client.
PeerDownLimit = 4
)
226 changes: 223 additions & 3 deletions supernode/daemon/mgr/scheduler/manager.go
@@ -2,10 +2,23 @@ package scheduler

import (
"context"
"math/rand"
"sort"
"time"

errorType "github.com/dragonflyoss/Dragonfly/common/errors"
cutil "github.com/dragonflyoss/Dragonfly/common/util"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

var _ mgr.SchedulerMgr = &Manager{}

// Manager is an implement of the interface of SchedulerMgr.
@@ -14,13 +27,220 @@ type Manager struct {
}

// NewManager returns a new Manager.
func NewManager(progressMgr mgr.ProgressMgr) *Manager {
func NewManager(progressMgr mgr.ProgressMgr) (*Manager, error) {
return &Manager{
progressMgr: progressMgr,
}
}, nil
}

// Schedule gets the scheduling result for the specified taskID, clientID and peerID according to the scheduling rules.
func (sm *Manager) Schedule(ctx context.Context, taskID, clientID, peerID string) ([]*mgr.PieceResult, error) {
return nil, nil
// get available pieces
pieceAvailable, err := sm.progressMgr.GetPieceProgressByCID(ctx, taskID, clientID, "available")
if err != nil {
return nil, err
}
if len(pieceAvailable) == 0 {
return nil, errors.Wrapf(errorType.ErrPeerWait, "taskID: %s", taskID)
}

// get running pieces
pieceRunning, err := sm.progressMgr.GetPieceProgressByCID(ctx, taskID, clientID, "running")
if err != nil {
return nil, err
}
runningCount := len(pieceRunning)
if runningCount > config.PeerDownLimit {
return nil, errors.Wrapf(errorType.PeerContinue, "taskID: %s, clientID: %s", taskID, clientID)
}

// prioritize pieces
pieceNums, err := sm.sort(ctx, pieceAvailable, pieceRunning, taskID)
if err != nil {
return nil, err
}

return sm.getPieceResults(ctx, taskID, peerID, pieceNums, runningCount)
}

func (sm *Manager) sort(ctx context.Context, pieceNums, runningPieces []int, taskID string) ([]int, error) {
pieceCountMap, err := sm.getPieceCountMap(ctx, pieceNums, taskID)
if err != nil {
return nil, err
}

sm.sortExecutor(ctx, pieceNums, getCenterNum(runningPieces), pieceCountMap)
return pieceNums, nil
}

func (sm *Manager) getPieceCountMap(ctx context.Context, pieceNums []int, taskID string) (map[int]int, error) {
pieceCountMap := make(map[int]int)
for i := 0; i < len(pieceNums); i++ {
// NOTE: should we return an error here or just log it?
peerIDs, err := sm.progressMgr.GetPeerIDsByPieceNum(ctx, taskID, pieceNums[i])
if err != nil {
return nil, err
}
pieceCountMap[pieceNums[i]] = len(peerIDs)
}
return pieceCountMap, nil
}

// sortExecutor sorts the pieces by distributedCount and by the distance to the center value of the running piece nums.
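// For example, with centerNum 10 and pieceCountMap {3: 1, 9: 1, 12: 2},
// the resulting order is [9, 3, 12]: pieces 9 and 3 share the lowest
// distributedCount and 9 is closer to the center, while piece 12 is
// already the most distributed. (Illustrative values, not from this commit.)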
func (sm *Manager) sortExecutor(ctx context.Context, pieceNums []int, centerNum int, pieceCountMap map[int]int) {
if len(pieceNums) == 0 || len(pieceCountMap) == 0 {
return
}

sort.Slice(pieceNums, func(i, j int) bool {
// sort by distributedCount to ensure that
// the least distributed pieces in the network are prioritized
if pieceCountMap[pieceNums[i]] < pieceCountMap[pieceNums[j]] {
return true
}

if pieceCountMap[pieceNums[i]] > pieceCountMap[pieceNums[j]] {
return false
}

// sort by piece distance when multiple pieces have the same distributedCount
if abs(pieceNums[i]-centerNum) < abs(pieceNums[j]-centerNum) {
return true
}

// randomly choose whether to swap the two pieces when their distances to the center value are equal
if abs(pieceNums[i]-centerNum) == abs(pieceNums[j]-centerNum) {
randNum := rand.Intn(2)
if randNum == 0 {
return true
}
}
return false
})
}

func (sm *Manager) getPieceResults(ctx context.Context, taskID, peerID string, pieceNums []int, runningCount int) ([]*mgr.PieceResult, error) {
// validate ClientErrorCount
var useSupernode bool
srcPeerState, err := sm.progressMgr.GetPeerStateByPeerID(ctx, peerID)
if err != nil {
return nil, err
}
if srcPeerState.ClientErrorCount > config.FailCountLimit {
logrus.Warnf("peerID: %s got errors for %d times which reaches error limit: %d", peerID, srcPeerState.ClientErrorCount, config.FailCountLimit)
useSupernode = true
}

pieceResults := make([]*mgr.PieceResult, 0)
for i := 0; i < len(pieceNums); i++ {
var dstPID string
if useSupernode {
dstPID = getSupernodePID()
} else {
// get peerIDs by pieceNum
peerIDs, err := sm.progressMgr.GetPeerIDsByPieceNum(ctx, taskID, pieceNums[i])
if err != nil {
return nil, errors.Wrapf(errorType.ErrUnknowError, "failed to get peerIDs for pieceNum: %d of taskID: %s", pieceNums[i], taskID)
}
dstPID = sm.tryGetPID(ctx, taskID, pieceNums[i], peerIDs)
}

if dstPID == "" {
continue
}

pieceResults = append(pieceResults, &mgr.PieceResult{
TaskID: taskID,
PieceNum: pieceNums[i],
DstPID: dstPID,
})

runningCount++
if runningCount >= config.PeerDownLimit {
break
}
}

return pieceResults, nil
}

// tryGetPID returns an available dstPID from ps.pieceContainer.
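// A candidate is skipped when its peerState cannot be fetched, its service
// has gone down, it has reached the EliminationLimit, or it is blacklisted;
// the first remaining peer whose ProducerLoad is below PeerUpLimit is chosen.
// If no candidate qualifies, the supernode PID is returned as a fallback.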
func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, peerIDs []string) (dstPID string) {
defer func() {
if dstPID == "" {
dstPID = getSupernodePID()
}
}()

for i := 0; i < len(peerIDs); i++ {
// if we fail to get the peerState, this peer should not be used.
peerState, err := sm.progressMgr.GetPeerStateByPeerID(ctx, peerIDs[i])
if err != nil {
sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
continue
}

// if the service has been down, this peer should not be used.
if peerState.ServiceDownTime > 0 {
sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
continue
}

// if the service has failed EliminationLimit times, this peer should not be used.
if peerState.ServiceErrorCount >= config.EliminationLimit {
sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i])
continue
}

// if the peer is in the blackList, try the next one.
blackInfo, err := sm.progressMgr.GetBlackInfoByPeerID(ctx, peerIDs[i])
if blackInfo != nil && isExistInMap(blackInfo, peerIDs[i]) {
continue
}

if peerState.ProducerLoad < config.PeerUpLimit {
return peerIDs[i]
}
}
return
}

func (sm *Manager) deletePeerIDByPieceNum(ctx context.Context, taskID string, pieceNum int, peerID string) {
if err := sm.progressMgr.DeletePeerIDByPieceNum(ctx, taskID, pieceNum, peerID); err != nil {
logrus.Warnf("failed to delete the peerID %s for pieceNum %d of taskID: %s", peerID, pieceNum, taskID)
}
}

// isExistInMap returns whether the key exists in the mmap
func isExistInMap(mmap *cutil.SyncMap, key string) bool {
if mmap == nil {
return false
}
_, err := mmap.Get(key)
return err == nil
}

// getCenterNum gets the center value of the piece nums being downloaded.
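// For example, for running pieces [4, 8, 15] the center value is
// (4+8+15)/3 = 9 (integer division); illustrative values only.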
func getCenterNum(runningPieces []int) int {
if len(runningPieces) == 0 {
return 0
}

totalDistance := 0
for i := 0; i < len(runningPieces); i++ {
totalDistance += runningPieces[i]
}
return totalDistance / (len(runningPieces))
}

// TODO: return supernode peerID
func getSupernodePID() string {
return ""
}

func abs(i int) int {
if i < 0 {
return -i
}
return i
}
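
And, for context, a rough sketch of how a supernode component might wire up and drive the new Manager; the dispatchPieces helper, the sendPiece callback, and the package name are assumptions for illustration, not part of this commit.

package scheduling // hypothetical package, sketch only

import (
	"context"

	"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
	"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/scheduler"
)

// dispatchPieces builds a scheduler Manager, asks it for piece results and
// hands each result to a caller-supplied sender. sendPiece is hypothetical.
func dispatchPieces(ctx context.Context, progressMgr mgr.ProgressMgr,
	taskID, clientID, peerID string,
	sendPiece func(*mgr.PieceResult) error) error {
	sm, err := scheduler.NewManager(progressMgr)
	if err != nil {
		return err
	}

	// Each result is already bound to a destination peer chosen by the
	// scheduler (pieces with no usable destination are skipped).
	results, err := sm.Schedule(ctx, taskID, clientID, peerID)
	if err != nil {
		return err
	}

	for _, result := range results {
		if err := sendPiece(result); err != nil {
			return err
		}
	}
	return nil
}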