From 564290c9121c0a54c7cbbbfda8a806e95e49ad44 Mon Sep 17 00:00:00 2001 From: Starnop Date: Thu, 25 Jul 2019 15:26:54 +0800 Subject: [PATCH] feature: add gc mgr for supernode Signed-off-by: Starnop --- cmd/supernode/app/root.go | 2 + .../downloader/p2p_downloader/power_client.go | 4 +- pkg/syncmap/syncmap.go | 57 ++++++++--- supernode/config/config.go | 19 +++- supernode/config/constants.go | 22 +++++ supernode/daemon/mgr/cdn/manager.go | 9 +- supernode/daemon/mgr/cdn/piece_md5_mgr.go | 4 + supernode/daemon/mgr/cdn_mgr.go | 5 +- supernode/daemon/mgr/dfget_task_mgr.go | 6 ++ supernode/daemon/mgr/dfgettask/manager.go | 66 ++++++++++++- supernode/daemon/mgr/gc/gc_dfget_task.go | 43 ++++++++ supernode/daemon/mgr/gc/gc_manager.go | 65 ++++++++++++ supernode/daemon/mgr/gc/gc_peer.go | 88 +++++++++++++++++ supernode/daemon/mgr/gc/gc_task.go | 98 +++++++++++++++++++ supernode/daemon/mgr/gc_mgr.go | 17 ++++ supernode/daemon/mgr/mock/mock_cdn_mgr.go | 8 +- .../daemon/mgr/mock/mock_dfget_task_mgr.go | 30 ++++++ supernode/daemon/mgr/mock/mock_peer_mgr.go | 14 +++ .../daemon/mgr/mock/mock_progress_mgr.go | 70 +++++++++---- supernode/daemon/mgr/peer/manager.go | 10 ++ supernode/daemon/mgr/peer_mgr.go | 3 + .../daemon/mgr/progress/progress_delete.go | 72 ++++++++++++++ .../daemon/mgr/progress/progress_manager.go | 47 +++------ .../daemon/mgr/progress/state_sync_map.go | 35 ++----- supernode/daemon/mgr/progress_mgr.go | 21 ++-- supernode/daemon/mgr/scheduler/manager.go | 2 +- supernode/daemon/mgr/task/manager.go | 25 +++-- supernode/daemon/mgr/task/manager_util.go | 12 +-- supernode/daemon/mgr/task_mgr.go | 1 - supernode/daemon/util/store.go | 34 ++----- supernode/server/0.3_bridge.go | 18 +--- supernode/server/server.go | 11 +++ supernode/util/locker.go | 12 +++ 33 files changed, 756 insertions(+), 174 deletions(-) create mode 100644 supernode/daemon/mgr/gc/gc_dfget_task.go create mode 100644 supernode/daemon/mgr/gc/gc_manager.go create mode 100644 supernode/daemon/mgr/gc/gc_peer.go create mode 100644 supernode/daemon/mgr/gc/gc_task.go create mode 100644 supernode/daemon/mgr/gc_mgr.go create mode 100644 supernode/daemon/mgr/progress/progress_delete.go diff --git a/cmd/supernode/app/root.go b/cmd/supernode/app/root.go index ca8a9cc8a..6dbf823ca 100644 --- a/cmd/supernode/app/root.go +++ b/cmd/supernode/app/root.go @@ -126,6 +126,8 @@ func runSuperNode() error { // set up the CIDPrefix cfg.SetCIDPrefix(cfg.AdvertiseIP) + logrus.Debugf("get supernode config: %+v", cfg) + logrus.Info("start to run supernode") d, err := daemon.New(cfg) diff --git a/dfget/core/downloader/p2p_downloader/power_client.go b/dfget/core/downloader/p2p_downloader/power_client.go index d7f125612..ef0cf2fce 100644 --- a/dfget/core/downloader/p2p_downloader/power_client.go +++ b/dfget/core/downloader/p2p_downloader/power_client.go @@ -81,8 +81,8 @@ func (pc *PowerClient) Run() error { pc.readCost.Seconds(), pc.total) if err != nil { - logrus.Errorf("read piece cont error:%v from dst:%s:%d, wait 20 ms", - err, pc.pieceTask.PeerIP, pc.pieceTask.PeerPort) + logrus.Errorf("failed to read piece cont(%s) from dst:%s:%d, wait 20 ms: %v", + pc.pieceTask.Range, pc.pieceTask.PeerIP, pc.pieceTask.PeerPort, err) time.AfterFunc(time.Millisecond*20, func() { pc.queue.Put(pc.failPiece()) }) diff --git a/pkg/syncmap/syncmap.go b/pkg/syncmap/syncmap.go index 4874e7cea..ad83b832b 100644 --- a/pkg/syncmap/syncmap.go +++ b/pkg/syncmap/syncmap.go @@ -19,6 +19,7 @@ package syncmap import ( "strconv" "sync" + "time" 
"github.com/dragonflyoss/Dragonfly/pkg/atomiccount" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" @@ -60,7 +61,7 @@ func (mmap *SyncMap) Get(key string) (interface{}, error) { return v, nil } - return nil, errors.Wrapf(errortypes.ErrDataNotFound, "key: %s", key) + return nil, errors.Wrapf(errortypes.ErrDataNotFound, "failed to get key %s from map", key) } // GetAsBitset returns result as *bitset.BitSet. @@ -68,13 +69,13 @@ func (mmap *SyncMap) Get(key string) (interface{}, error) { func (mmap *SyncMap) GetAsBitset(key string) (*bitset.BitSet, error) { v, err := mmap.Get(key) if err != nil { - return nil, errors.Wrapf(err, "key: %s", key) + return nil, errors.Wrapf(err, "failed to get key %s from map", key) } if value, ok := v.(*bitset.BitSet); ok { return value, nil } - return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v) } // GetAsMap returns result as SyncMap. @@ -82,13 +83,13 @@ func (mmap *SyncMap) GetAsBitset(key string) (*bitset.BitSet, error) { func (mmap *SyncMap) GetAsMap(key string) (*SyncMap, error) { v, err := mmap.Get(key) if err != nil { - return nil, errors.Wrapf(err, "key: %s", key) + return nil, errors.Wrapf(err, "failed to get key %s from map", key) } if value, ok := v.(*SyncMap); ok { return value, nil } - return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v) } // GetAsInt returns result as int. @@ -96,13 +97,27 @@ func (mmap *SyncMap) GetAsMap(key string) (*SyncMap, error) { func (mmap *SyncMap) GetAsInt(key string) (int, error) { v, err := mmap.Get(key) if err != nil { - return 0, errors.Wrapf(err, "key: %s", key) + return 0, errors.Wrapf(err, "failed to get key %s from map", key) } if value, ok := v.(int); ok { return value, nil } - return 0, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) + return 0, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v) +} + +// GetAsInt64 returns result as int64. +// The ErrConvertFailed error will be returned if the assertion fails. +func (mmap *SyncMap) GetAsInt64(key string) (int64, error) { + v, err := mmap.Get(key) + if err != nil { + return 0, errors.Wrapf(err, "failed to get key %s from map", key) + } + + if value, ok := v.(int64); ok { + return value, nil + } + return 0, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v) } // GetAsString returns result as string. @@ -110,13 +125,13 @@ func (mmap *SyncMap) GetAsInt(key string) (int, error) { func (mmap *SyncMap) GetAsString(key string) (string, error) { v, err := mmap.Get(key) if err != nil { - return "", errors.Wrapf(err, "key: %s", key) + return "", errors.Wrapf(err, "failed to get key %s from map", key) } if value, ok := v.(string); ok { return value, nil } - return "", errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) + return "", errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v) } // GetAsBool returns result as bool. 
@@ -124,13 +139,13 @@ func (mmap *SyncMap) GetAsString(key string) (string, error) {
 func (mmap *SyncMap) GetAsBool(key string) (bool, error) {
 	v, err := mmap.Get(key)
 	if err != nil {
-		return false, errors.Wrapf(err, "key: %s", key)
+		return false, errors.Wrapf(err, "failed to get key %s from map", key)
 	}
 
 	if value, ok := v.(bool); ok {
 		return value, nil
 	}
-	return false, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
+	return false, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
 }
 
 // GetAsAtomicInt returns result as *AtomicInt.
@@ -138,13 +153,27 @@ func (mmap *SyncMap) GetAsBool(key string) (bool, error) {
 func (mmap *SyncMap) GetAsAtomicInt(key string) (*atomiccount.AtomicInt, error) {
 	v, err := mmap.Get(key)
 	if err != nil {
-		return nil, errors.Wrapf(err, "key: %s", key)
+		return nil, errors.Wrapf(err, "failed to get key %s from map", key)
 	}
 
 	if value, ok := v.(*atomiccount.AtomicInt); ok {
 		return value, nil
 	}
-	return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
+	return nil, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
+}
+
+// GetAsTime returns result as Time.
+// The ErrConvertFailed error will be returned if the assertion fails.
+func (mmap *SyncMap) GetAsTime(key string) (time.Time, error) {
+	v, err := mmap.Get(key)
+	if err != nil {
+		return time.Now(), errors.Wrapf(err, "failed to get key %s from map", key)
+	}
+
+	if value, ok := v.(time.Time); ok {
+		return value, nil
+	}
+	return time.Now(), errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
 }
 
 // Remove deletes the key-value pair from the mmap.
@@ -156,7 +185,7 @@ func (mmap *SyncMap) GetAsAtomicInt(key string) (*atomiccount.AtomicInt, error) {
 	}
 
 	if _, ok := mmap.Load(key); !ok {
-		return errors.Wrapf(errortypes.ErrDataNotFound, "key: %s", key)
+		return errors.Wrapf(errortypes.ErrDataNotFound, "failed to get key %s from map", key)
 	}
 
 	mmap.Delete(key)
diff --git a/supernode/config/config.go b/supernode/config/config.go
index 8f9d6dd71..6c3e3f73b 100644
--- a/supernode/config/config.go
+++ b/supernode/config/config.go
@@ -102,7 +102,11 @@ func NewBaseProperties() *BaseProperties {
 		MaxBandwidth:       200,
 		EnableProfiler:     false,
 		Debug:              false,
-		FailAccessInterval: 3,
+		FailAccessInterval: DefaultFailAccessInterval,
+		GCInitialDelay:     DefaultGCInitialDelay,
+		GCMetaInterval:     DefaultGCMetaInterval,
+		TaskExpireTime:     DefaultTaskExpireTime,
+		PeerGCDelay:        DefaultPeerGCDelay,
 	}
 }
 
@@ -190,6 +194,19 @@ type BaseProperties struct {
 	// default: 3
 	FailAccessInterval time.Duration `yaml:"failAccessInterval"`
 
+	// GCInitialDelay is the delay time from the start to the first GC execution.
+	GCInitialDelay time.Duration `yaml:"gcInitialDelay"`
+
+	// GCMetaInterval is the interval time to execute the metadata GC.
+	GCMetaInterval time.Duration `yaml:"gcMetaInterval"`
+
+	// TaskExpireTime: if a task is not accessed within this duration,
+	// it will be treated as expired.
+	TaskExpireTime time.Duration `yaml:"taskExpireTime"`
+
+	// PeerGCDelay is the delay time to execute GC after the peer has reported itself offline.
+	PeerGCDelay time.Duration `yaml:"peerGCDelay"`
+
 	// cIDPrefix s a prefix string used to indicate that the CID is supernode.
 	cIDPrefix string
 
diff --git a/supernode/config/constants.go b/supernode/config/constants.go
index 2b7f4ad09..dd06ade6b 100644
--- a/supernode/config/constants.go
+++ b/supernode/config/constants.go
@@ -16,6 +16,10 @@
 package config
 
+import (
+	"time"
+)
+
 const (
 	// DefaultSupernodeConfigFilePath the default supernode config path.
 	DefaultSupernodeConfigFilePath = "/etc/dragonfly/supernode.yml"
 
@@ -75,3 +79,21 @@ const (
 	// SubsystemDfget represents metrics from dfget
 	SubsystemDfget = "dfget"
 )
+
+const (
+	// DefaultFailAccessInterval is the interval time after failing to access the URL.
+	DefaultFailAccessInterval = 3 * time.Minute
+
+	// DefaultGCInitialDelay is the delay time from the start to the first GC execution.
+	DefaultGCInitialDelay = 6 * time.Second
+
+	// DefaultGCMetaInterval is the interval time to execute the metadata GC.
+	DefaultGCMetaInterval = 2 * time.Minute
+
+	// DefaultTaskExpireTime: if a task is not accessed within this duration,
+	// it will be treated as expired.
+	DefaultTaskExpireTime = 3 * time.Minute
+
+	// DefaultPeerGCDelay is the delay time to execute GC after the peer has reported itself offline.
+	DefaultPeerGCDelay = 3 * time.Minute
+)
diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go
index d955d0104..91cbd68f5 100644
--- a/supernode/daemon/mgr/cdn/manager.go
+++ b/supernode/daemon/mgr/cdn/manager.go
@@ -141,8 +141,13 @@ func (cm *Manager) GetStatus(ctx context.Context, taskID string) (cdnStatus stri
 	return "", nil
 }
 
-// Delete the file from disk with specified taskID.
-func (cm *Manager) Delete(ctx context.Context, taskID string) error {
+// Delete the cdn meta with specified taskID.
+func (cm *Manager) Delete(ctx context.Context, taskID string, force bool) error {
+	if !force {
+		return cm.pieceMD5Manager.removePieceMD5sByTaskID(taskID)
+	}
+
+	// TODO: delete the file from disk.
 	return nil
 }
 
diff --git a/supernode/daemon/mgr/cdn/piece_md5_mgr.go b/supernode/daemon/mgr/cdn/piece_md5_mgr.go
index c13d149f4..89f03f058 100644
--- a/supernode/daemon/mgr/cdn/piece_md5_mgr.go
+++ b/supernode/daemon/mgr/cdn/piece_md5_mgr.go
@@ -77,3 +77,7 @@ func (pmm *pieceMD5Mgr) getPieceMD5sByTaskID(taskID string) (pieceMD5s []string,
 	}
 	return pieceMD5s, nil
 }
+
+func (pmm *pieceMD5Mgr) removePieceMD5sByTaskID(taskID string) error {
+	return pmm.taskPieceMD5s.Remove(taskID)
+}
diff --git a/supernode/daemon/mgr/cdn_mgr.go b/supernode/daemon/mgr/cdn_mgr.go
index 305ae9524..c5461ba33 100644
--- a/supernode/daemon/mgr/cdn_mgr.go
+++ b/supernode/daemon/mgr/cdn_mgr.go
@@ -41,6 +41,7 @@ type CDNMgr interface {
 	// GetStatus get the status of the file.
 	GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error)
 
-	// Delete the file from disk with specified taskID.
-	Delete(ctx context.Context, taskID string) error
+	// Delete the cdn meta with specified taskID.
+	// The file on the disk will be deleted when the force is true.
+	Delete(ctx context.Context, taskID string, force bool) error
 }
diff --git a/supernode/daemon/mgr/dfget_task_mgr.go b/supernode/daemon/mgr/dfget_task_mgr.go
index 6465b03e8..7aafc48a6 100644
--- a/supernode/daemon/mgr/dfget_task_mgr.go
+++ b/supernode/daemon/mgr/dfget_task_mgr.go
@@ -37,6 +37,12 @@ type DfgetTaskMgr interface {
 	// GetCIDByPeerIDAndTaskID returns cid with specified peerID and taskID.
 	GetCIDByPeerIDAndTaskID(ctx context.Context, peerID, taskID string) (string, error)
 
+	// GetCIDsByTaskID returns cids as a string slice with specified taskID.
+ GetCIDsByTaskID(ctx context.Context, taskID string) ([]string, error) + + // GetCIDAndTaskIDsByPeerID returns a cid<->taskID map by specified peerID. + GetCIDAndTaskIDsByPeerID(ctx context.Context, peerID string) (map[string]string, error) + // List returns the list of dfgetTask. List(ctx context.Context, filter map[string]string) (dfgetTaskList []*types.DfGetTask, err error) diff --git a/supernode/daemon/mgr/dfgettask/manager.go b/supernode/daemon/mgr/dfgettask/manager.go index 4807264ff..cea99cff0 100644 --- a/supernode/daemon/mgr/dfgettask/manager.go +++ b/supernode/daemon/mgr/dfgettask/manager.go @@ -19,6 +19,7 @@ package dfgettask import ( "context" "fmt" + "strings" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" @@ -31,10 +32,15 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) var _ mgr.DfgetTaskMgr = &Manager{} +const ( + keyJoinChar = "@" +) + type metrics struct { dfgetTasks *prometheus.GaugeVec dfgetTasksRegisterCount *prometheus.CounterVec @@ -119,6 +125,62 @@ func (dtm *Manager) GetCIDByPeerIDAndTaskID(ctx context.Context, peerID, taskID return dtm.ptoc.GetAsString(generatePeerKey(peerID, taskID)) } +// GetCIDsByTaskID returns cids as a string slice with specified taskID. +func (dtm *Manager) GetCIDsByTaskID(ctx context.Context, taskID string) ([]string, error) { + var result []string + suffixString := keyJoinChar + taskID + rangeFunc := func(k, v interface{}) bool { + key, ok := k.(string) + if !ok { + return true + } + + if !strings.HasSuffix(key, suffixString) { + return true + } + cid, err := dtm.ptoc.GetAsString(key) + if err != nil { + logrus.Warnf("failed to get cid from ptoc with key(%s): %v", key, err) + return true + } + + result = append(result, cid) + return true + } + dtm.ptoc.Range(rangeFunc) + + return result, nil +} + +// GetCIDAndTaskIDsByPeerID returns a cid<->taskID map by specified peerID. +func (dtm *Manager) GetCIDAndTaskIDsByPeerID(ctx context.Context, peerID string) (map[string]string, error) { + var result = make(map[string]string) + prefixStr := peerID + keyJoinChar + rangeFunc := func(k, v interface{}) bool { + key, ok := k.(string) + if !ok { + return true + } + + if !strings.HasPrefix(key, prefixStr) { + return true + } + cid, err := dtm.ptoc.GetAsString(key) + if err != nil { + logrus.Warnf("failed to get cid from ptoc with key(%s): %v", key, err) + return true + } + + // get TaskID from the key + splitResult := strings.Split(key, keyJoinChar) + result[cid] = splitResult[len(splitResult)-1] + return true + } + dtm.ptoc.Range(rangeFunc) + + return result, nil +} + // List returns the list of dfgetTask. 
 func (dtm *Manager) List(ctx context.Context, filter map[string]string) (dfgetTaskList []*types.DfGetTask, err error) {
 	return nil, nil
@@ -189,9 +251,9 @@ func generateKey(cID, taskID string) (string, error) {
 		return "", errors.Wrapf(errortypes.ErrEmptyValue, "taskID")
 	}
 
-	return fmt.Sprintf("%s%s%s", cID, "@", taskID), nil
+	return fmt.Sprintf("%s%s%s", cID, keyJoinChar, taskID), nil
 }
 
 func generatePeerKey(peerID, taskID string) string {
-	return fmt.Sprintf("%s@%s", peerID, taskID)
+	return fmt.Sprintf("%s%s%s", peerID, keyJoinChar, taskID)
 }
diff --git a/supernode/daemon/mgr/gc/gc_dfget_task.go b/supernode/daemon/mgr/gc/gc_dfget_task.go
new file mode 100644
index 000000000..833e1ad3f
--- /dev/null
+++ b/supernode/daemon/mgr/gc/gc_dfget_task.go
@@ -0,0 +1,43 @@
+package gc
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+)
+
+func (gcm *Manager) gcDfgetTasksWithTaskID(ctx context.Context, taskID string, cids []string) []error {
+	var errSlice []error
+	for _, cid := range cids {
+		if err := gcm.progressMgr.DeleteCID(ctx, cid); err != nil {
+			errSlice = append(errSlice, errors.Wrapf(err, "failed to delete dfgetTask(%s) progress info", cid))
+		}
+		if err := gcm.dfgetTaskMgr.Delete(ctx, cid, taskID); err != nil {
+			errSlice = append(errSlice, errors.Wrapf(err, "failed to delete dfgetTask(%s) info", cid))
+		}
+	}
+
+	if len(errSlice) == 0 {
+		return nil
+	}
+
+	return errSlice
+}
+
+func (gcm *Manager) gcDfgetTasks(ctx context.Context, keys map[string]string, cids []string) []error {
+	var errSlice []error
+	for _, cid := range cids {
+		if err := gcm.progressMgr.DeleteCID(ctx, cid); err != nil {
+			errSlice = append(errSlice, errors.Wrapf(err, "failed to delete dfgetTask(%s) progress info", cid))
+		}
+		if err := gcm.dfgetTaskMgr.Delete(ctx, cid, keys[cid]); err != nil {
+			errSlice = append(errSlice, errors.Wrapf(err, "failed to delete dfgetTask(%s) info", cid))
+		}
+	}
+
+	if len(errSlice) == 0 {
+		return nil
+	}
+
+	return errSlice
+}
diff --git a/supernode/daemon/mgr/gc/gc_manager.go b/supernode/daemon/mgr/gc/gc_manager.go
new file mode 100644
index 000000000..9e91ddc15
--- /dev/null
+++ b/supernode/daemon/mgr/gc/gc_manager.go
@@ -0,0 +1,65 @@
+package gc
+
+import (
+	"context"
+	"time"
+
+	"github.com/dragonflyoss/Dragonfly/supernode/config"
+	"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
+
+	"github.com/sirupsen/logrus"
+)
+
+var _ mgr.GCMgr = &Manager{}
+
+// Manager is an implementation of the GCMgr interface.
+type Manager struct {
+	cfg *config.Config
+
+	// mgr objects
+	taskMgr      mgr.TaskMgr
+	peerMgr      mgr.PeerMgr
+	dfgetTaskMgr mgr.DfgetTaskMgr
+	progressMgr  mgr.ProgressMgr
+	cdnMgr       mgr.CDNMgr
+}
+
+// NewManager returns a new Manager.
+func NewManager(cfg *config.Config, taskMgr mgr.TaskMgr, peerMgr mgr.PeerMgr,
+	dfgetTaskMgr mgr.DfgetTaskMgr, progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr) (*Manager, error) {
+	return &Manager{
+		cfg:          cfg,
+		taskMgr:      taskMgr,
+		peerMgr:      peerMgr,
+		dfgetTaskMgr: dfgetTaskMgr,
+		progressMgr:  progressMgr,
+		cdnMgr:       cdnMgr,
+	}, nil
+}
+
+// StartGC starts to do the gc jobs.
+func (gcm *Manager) StartGC(ctx context.Context) {
+	logrus.Debugf("start the gc job")
+
+	go func() {
+		// delay the first GC execution by gcm.cfg.GCInitialDelay
+		time.Sleep(gcm.cfg.GCInitialDelay)
+
+		// execute the GC at a fixed interval
+		ticker := time.NewTicker(gcm.cfg.GCMetaInterval)
+		for range ticker.C {
+			go gcm.gcTasks(ctx)
+			go gcm.gcPeers(ctx)
+		}
+	}()
+}
+
+// GCTask to do the gc job with specified taskID.
+func (gcm *Manager) GCTask(ctx context.Context, taskID string) { + gcm.gcTask(ctx, taskID) +} + +// GCPeer to do the gc job when a peer offline. +func (gcm *Manager) GCPeer(ctx context.Context, peerID string) { + gcm.gcPeer(ctx, peerID) +} diff --git a/supernode/daemon/mgr/gc/gc_peer.go b/supernode/daemon/mgr/gc/gc_peer.go new file mode 100644 index 000000000..44724f8c1 --- /dev/null +++ b/supernode/daemon/mgr/gc/gc_peer.go @@ -0,0 +1,88 @@ +package gc + +import ( + "context" + "sync" + + "github.com/dragonflyoss/Dragonfly/pkg/timeutils" + "github.com/dragonflyoss/Dragonfly/supernode/util" + + "github.com/sirupsen/logrus" +) + +func (gcm *Manager) gcPeers(ctx context.Context) { + var gcPeerCount int + peerIDs := gcm.peerMgr.GetAllPeerIDs(ctx) + + for _, peerID := range peerIDs { + if gcm.cfg.IsSuperPID(peerID) { + continue + } + peerState, err := gcm.progressMgr.GetPeerStateByPeerID(ctx, peerID) + if err != nil { + logrus.Warnf("gc peers: failed to get peerState peerID(%s): %v", peerID, err) + continue + } + + if peerState.ServiceDownTime != 0 && + timeutils.GetCurrentTimeMillis()-peerState.ServiceDownTime < int64(gcm.cfg.PeerGCDelay) { + continue + } + + if !gcm.gcPeer(ctx, peerID) { + logrus.Warnf("gc peers: failed to gc peer peerID(%s): %v", peerID, err) + continue + } + gcPeerCount++ + } + + logrus.Infof("gc peers: success to gc peer count(%d), remainder count(%d)", gcPeerCount, len(peerIDs)-gcPeerCount) +} + +func (gcm *Manager) gcPeer(ctx context.Context, peerID string) bool { + logrus.Infof("start to gc peer: %s", peerID) + + util.GetLock(peerID, false) + defer util.ReleaseLock(peerID, false) + + var wg sync.WaitGroup + wg.Add(2) + + go func(wg *sync.WaitGroup) { + gcm.gcCIDsByPeerID(ctx, peerID) + wg.Done() + }(&wg) + + go func(wg *sync.WaitGroup) { + gcm.gcPeerByPeerID(ctx, peerID) + wg.Done() + }(&wg) + + wg.Wait() + return true +} + +func (gcm *Manager) gcCIDsByPeerID(ctx context.Context, peerID string) { + // get related CIDs + keys, err := gcm.dfgetTaskMgr.GetCIDAndTaskIDsByPeerID(ctx, peerID) + if err != nil { + logrus.Errorf("gc Peer: failed to get cids with corresponding taskID by specified peerID(%s): %v", peerID, err) + } + var cids []string + for key := range keys { + cids = append(cids, key) + } + + if err := gcm.gcDfgetTasks(ctx, keys, cids); err != nil { + logrus.Errorf("gc Peer: failed to gc dfgetTask info peerID(%s): %v", peerID, err) + } +} + +func (gcm *Manager) gcPeerByPeerID(ctx context.Context, peerID string) { + if err := gcm.progressMgr.DeletePeerID(ctx, peerID); err != nil { + logrus.Errorf("gc Peer: failed to gc progress peer info peerID(%s): %v", peerID, err) + } + if err := gcm.peerMgr.DeRegister(ctx, peerID); err != nil { + logrus.Errorf("gc Peer: failed to gc peer info peerID(%s): %v", peerID, err) + } +} diff --git a/supernode/daemon/mgr/gc/gc_task.go b/supernode/daemon/mgr/gc/gc_task.go new file mode 100644 index 000000000..9056aed9f --- /dev/null +++ b/supernode/daemon/mgr/gc/gc_task.go @@ -0,0 +1,98 @@ +package gc + +import ( + "context" + "sync" + "time" + + "github.com/dragonflyoss/Dragonfly/supernode/util" + + "github.com/sirupsen/logrus" +) + +func (gcm *Manager) gcTasks(ctx context.Context) { + var removedTaskCount int + + // get all taskIDs and the corresponding accessTime + taskAccessMap, err := gcm.taskMgr.GetAccessTime(ctx) + if err != nil { + logrus.Errorf("gc tasks: failed to get task accessTime map for GC: %v", err) + return + } + + // range all tasks and determine whether they are expired + taskIDs := 
taskAccessMap.ListKeyAsStringSlice() + totalTaskNums := len(taskIDs) + for _, taskID := range taskIDs { + atime, err := taskAccessMap.GetAsTime(taskID) + if err != nil { + logrus.Errorf("gc tasks: failed to get access time taskID(%s): %v", taskID, err) + continue + } + if time.Since(atime) < gcm.cfg.TaskExpireTime { + continue + } + + if !gcm.gcTask(ctx, taskID) { + continue + } + removedTaskCount++ + } + + logrus.Infof("gc tasks: success to full gc task count(%d), remainder count(%d)", removedTaskCount, totalTaskNums-removedTaskCount) +} + +func (gcm *Manager) gcTask(ctx context.Context, taskID string) bool { + logrus.Infof("start to gc task: %s", taskID) + + util.GetLock(taskID, false) + defer util.ReleaseLock(taskID, false) + + var wg sync.WaitGroup + wg.Add(3) + + go func(wg *sync.WaitGroup) { + gcm.gcCIDsByTaskID(ctx, taskID) + wg.Done() + }(&wg) + + go func(wg *sync.WaitGroup) { + gcm.gcCDNByTaskID(ctx, taskID) + wg.Done() + }(&wg) + + go func(wg *sync.WaitGroup) { + gcm.gcTaskByTaskID(ctx, taskID) + wg.Done() + }(&wg) + + wg.Wait() + return true +} + +func (gcm *Manager) gcCIDsByTaskID(ctx context.Context, taskID string) { + // get CIDs according to the taskID + cids, err := gcm.dfgetTaskMgr.GetCIDsByTaskID(ctx, taskID) + if err != nil { + logrus.Errorf("gc task: failed to get cids taskID(%s): %v", taskID, err) + return + } + if err := gcm.gcDfgetTasksWithTaskID(ctx, taskID, cids); err != nil { + logrus.Errorf("gc task: failed to gc dfgetTasks taskID(%s): %v", taskID, err) + } +} + +func (gcm *Manager) gcCDNByTaskID(ctx context.Context, taskID string) { + if err := gcm.cdnMgr.Delete(ctx, taskID, false); err != nil { + logrus.Errorf("gc task: failed to gc cdn meta taskID(%s): %v", taskID, err) + } +} + +func (gcm *Manager) gcTaskByTaskID(ctx context.Context, taskID string) { + if err := gcm.progressMgr.DeleteTaskID(ctx, taskID); err != nil { + logrus.Errorf("gc task: failed to gc progress info taskID(%s): %v", taskID, err) + } + if err := gcm.taskMgr.Delete(ctx, taskID); err != nil { + logrus.Errorf("gc task: failed to gc task info taskID(%s): %v", taskID, err) + } +} diff --git a/supernode/daemon/mgr/gc_mgr.go b/supernode/daemon/mgr/gc_mgr.go new file mode 100644 index 000000000..71b5f51f4 --- /dev/null +++ b/supernode/daemon/mgr/gc_mgr.go @@ -0,0 +1,17 @@ +package mgr + +import ( + "context" +) + +// GCMgr as an interface defines all operations about gc operation. +type GCMgr interface { + // StartGC start to execute GC with a new goroutine. + StartGC(ctx context.Context) + + // GCTask to do the gc task job with specified taskID. + GCTask(ctx context.Context, taskID string) + + // GCPeer to do the gc peer job when a peer offline. 
+ GCPeer(ctx context.Context, peerID string) +} diff --git a/supernode/daemon/mgr/mock/mock_cdn_mgr.go b/supernode/daemon/mgr/mock/mock_cdn_mgr.go index 208b5922c..deb60a799 100644 --- a/supernode/daemon/mgr/mock/mock_cdn_mgr.go +++ b/supernode/daemon/mgr/mock/mock_cdn_mgr.go @@ -82,15 +82,15 @@ func (mr *MockCDNMgrMockRecorder) GetStatus(ctx, taskID interface{}) *gomock.Cal } // Delete mocks base method -func (m *MockCDNMgr) Delete(ctx context.Context, taskID string) error { +func (m *MockCDNMgr) Delete(ctx context.Context, taskID string, force bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Delete", ctx, taskID) + ret := m.ctrl.Call(m, "Delete", ctx, taskID, force) ret0, _ := ret[0].(error) return ret0 } // Delete indicates an expected call of Delete -func (mr *MockCDNMgrMockRecorder) Delete(ctx, taskID interface{}) *gomock.Call { +func (mr *MockCDNMgrMockRecorder) Delete(ctx, taskID, force interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockCDNMgr)(nil).Delete), ctx, taskID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockCDNMgr)(nil).Delete), ctx, taskID, force) } diff --git a/supernode/daemon/mgr/mock/mock_dfget_task_mgr.go b/supernode/daemon/mgr/mock/mock_dfget_task_mgr.go index 55fefbe88..a76de0c5e 100644 --- a/supernode/daemon/mgr/mock/mock_dfget_task_mgr.go +++ b/supernode/daemon/mgr/mock/mock_dfget_task_mgr.go @@ -80,6 +80,36 @@ func (mr *MockDfgetTaskMgrMockRecorder) GetCIDByPeerIDAndTaskID(ctx, peerID, tas return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCIDByPeerIDAndTaskID", reflect.TypeOf((*MockDfgetTaskMgr)(nil).GetCIDByPeerIDAndTaskID), ctx, peerID, taskID) } +// GetCIDsByTaskID mocks base method +func (m *MockDfgetTaskMgr) GetCIDsByTaskID(ctx context.Context, taskID string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCIDsByTaskID", ctx, taskID) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetCIDsByTaskID indicates an expected call of GetCIDsByTaskID +func (mr *MockDfgetTaskMgrMockRecorder) GetCIDsByTaskID(ctx, taskID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCIDsByTaskID", reflect.TypeOf((*MockDfgetTaskMgr)(nil).GetCIDsByTaskID), ctx, taskID) +} + +// GetCIDAndTaskIDsByPeerID mocks base method +func (m *MockDfgetTaskMgr) GetCIDAndTaskIDsByPeerID(ctx context.Context, peerID string) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCIDAndTaskIDsByPeerID", ctx, peerID) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetCIDAndTaskIDsByPeerID indicates an expected call of GetCIDAndTaskIDsByPeerID +func (mr *MockDfgetTaskMgrMockRecorder) GetCIDAndTaskIDsByPeerID(ctx, peerID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCIDAndTaskIDsByPeerID", reflect.TypeOf((*MockDfgetTaskMgr)(nil).GetCIDAndTaskIDsByPeerID), ctx, peerID) +} + // List mocks base method func (m *MockDfgetTaskMgr) List(ctx context.Context, filter map[string]string) ([]*types.DfGetTask, error) { m.ctrl.T.Helper() diff --git a/supernode/daemon/mgr/mock/mock_peer_mgr.go b/supernode/daemon/mgr/mock/mock_peer_mgr.go index 8203f0f8d..018b14c1d 100644 --- a/supernode/daemon/mgr/mock/mock_peer_mgr.go +++ b/supernode/daemon/mgr/mock/mock_peer_mgr.go @@ -81,6 +81,20 @@ func (mr *MockPeerMgrMockRecorder) 
Get(ctx, peerID interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPeerMgr)(nil).Get), ctx, peerID) } +// GetAllPeerIDs mocks base method +func (m *MockPeerMgr) GetAllPeerIDs(ctx context.Context) []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllPeerIDs", ctx) + ret0, _ := ret[0].([]string) + return ret0 +} + +// GetAllPeerIDs indicates an expected call of GetAllPeerIDs +func (mr *MockPeerMgrMockRecorder) GetAllPeerIDs(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllPeerIDs", reflect.TypeOf((*MockPeerMgr)(nil).GetAllPeerIDs), ctx) +} + // List mocks base method func (m *MockPeerMgr) List(ctx context.Context, filter *util.PageFilter) ([]*types.PeerInfo, error) { m.ctrl.T.Helper() diff --git a/supernode/daemon/mgr/mock/mock_progress_mgr.go b/supernode/daemon/mgr/mock/mock_progress_mgr.go index 4d82c94c2..523de2261 100644 --- a/supernode/daemon/mgr/mock/mock_progress_mgr.go +++ b/supernode/daemon/mgr/mock/mock_progress_mgr.go @@ -11,7 +11,7 @@ import ( gomock "github.com/golang/mock/gomock" types "github.com/dragonflyoss/Dragonfly/apis/types" - "github.com/dragonflyoss/Dragonfly/pkg/syncmap" + syncmap "github.com/dragonflyoss/Dragonfly/pkg/syncmap" mgr "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" ) @@ -95,20 +95,6 @@ func (mr *MockProgressMgrMockRecorder) GetPieceProgressByCID(ctx, taskID, client return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceProgressByCID", reflect.TypeOf((*MockProgressMgr)(nil).GetPieceProgressByCID), ctx, taskID, clientID, filter) } -// DeletePieceProgressByCID mocks base method -func (m *MockProgressMgr) DeletePieceProgressByCID(ctx context.Context, taskID, clientID string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeletePieceProgressByCID", ctx, taskID, clientID) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeletePieceProgressByCID indicates an expected call of DeletePieceProgressByCID -func (mr *MockProgressMgrMockRecorder) DeletePieceProgressByCID(ctx, taskID, clientID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePieceProgressByCID", reflect.TypeOf((*MockProgressMgr)(nil).DeletePieceProgressByCID), ctx, taskID, clientID) -} - // GetPeerIDsByPieceNum mocks base method func (m *MockProgressMgr) GetPeerIDsByPieceNum(ctx context.Context, taskID string, pieceNum int) ([]string, error) { m.ctrl.T.Helper() @@ -153,18 +139,18 @@ func (mr *MockProgressMgrMockRecorder) GetPeerStateByPeerID(ctx, peerID interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeerStateByPeerID", reflect.TypeOf((*MockProgressMgr)(nil).GetPeerStateByPeerID), ctx, peerID) } -// DeletePeerStateByPeerID mocks base method -func (m *MockProgressMgr) DeletePeerStateByPeerID(ctx context.Context, peerID string) error { +// UpdatePeerServiceDown mocks base method +func (m *MockProgressMgr) UpdatePeerServiceDown(ctx context.Context, peerID string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeletePeerStateByPeerID", ctx, peerID) + ret := m.ctrl.Call(m, "UpdatePeerServiceDown", ctx, peerID) ret0, _ := ret[0].(error) return ret0 } -// DeletePeerStateByPeerID indicates an expected call of DeletePeerStateByPeerID -func (mr *MockProgressMgrMockRecorder) DeletePeerStateByPeerID(ctx, peerID interface{}) *gomock.Call { +// UpdatePeerServiceDown indicates an expected call of UpdatePeerServiceDown +func (mr *MockProgressMgrMockRecorder) 
UpdatePeerServiceDown(ctx, peerID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePeerStateByPeerID", reflect.TypeOf((*MockProgressMgr)(nil).DeletePeerStateByPeerID), ctx, peerID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePeerServiceDown", reflect.TypeOf((*MockProgressMgr)(nil).UpdatePeerServiceDown), ctx, peerID) } // GetPeersByTaskID mocks base method @@ -196,3 +182,45 @@ func (mr *MockProgressMgrMockRecorder) GetBlackInfoByPeerID(ctx, peerID interfac mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlackInfoByPeerID", reflect.TypeOf((*MockProgressMgr)(nil).GetBlackInfoByPeerID), ctx, peerID) } + +// DeleteTaskID mocks base method +func (m *MockProgressMgr) DeleteTaskID(ctx context.Context, taskID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTaskID", ctx, taskID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTaskID indicates an expected call of DeleteTaskID +func (mr *MockProgressMgrMockRecorder) DeleteTaskID(ctx, taskID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTaskID", reflect.TypeOf((*MockProgressMgr)(nil).DeleteTaskID), ctx, taskID) +} + +// DeleteCID mocks base method +func (m *MockProgressMgr) DeleteCID(ctx context.Context, clientID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteCID", ctx, clientID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteCID indicates an expected call of DeleteCID +func (mr *MockProgressMgrMockRecorder) DeleteCID(ctx, clientID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteCID", reflect.TypeOf((*MockProgressMgr)(nil).DeleteCID), ctx, clientID) +} + +// DeletePeerID mocks base method +func (m *MockProgressMgr) DeletePeerID(ctx context.Context, peerID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeletePeerID", ctx, peerID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeletePeerID indicates an expected call of DeletePeerID +func (mr *MockProgressMgrMockRecorder) DeletePeerID(ctx, peerID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePeerID", reflect.TypeOf((*MockProgressMgr)(nil).DeletePeerID), ctx, peerID) +} diff --git a/supernode/daemon/mgr/peer/manager.go b/supernode/daemon/mgr/peer/manager.go index 6dffac9bf..c0fadbe27 100644 --- a/supernode/daemon/mgr/peer/manager.go +++ b/supernode/daemon/mgr/peer/manager.go @@ -29,6 +29,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" + "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/go-openapi/strfmt" "github.com/pkg/errors" @@ -98,15 +99,24 @@ func (pm *Manager) DeRegister(ctx context.Context, peerID string) error { } pm.peerStore.Delete(peerID) + // NOTE: DeRegister will be called asynchronously. pm.metrics.peers.WithLabelValues(peerInfo.IP.String()).Dec() return nil } // Get returns the peerInfo of the specified peerID. func (pm *Manager) Get(ctx context.Context, peerID string) (*types.PeerInfo, error) { + util.GetLock(peerID, true) + defer util.ReleaseLock(peerID, true) + return pm.getPeerInfo(peerID) } +// GetAllPeerIDs returns all peerIDs. 
+func (pm *Manager) GetAllPeerIDs(ctx context.Context) (peerIDs []string) { + return pm.peerStore.ListKeyAsStringSlice() +} + // List returns all filtered peerInfo by filter. func (pm *Manager) List(ctx context.Context, filter *dutil.PageFilter) ( peerList []*types.PeerInfo, err error) { diff --git a/supernode/daemon/mgr/peer_mgr.go b/supernode/daemon/mgr/peer_mgr.go index 965fd74cb..6957d4f5a 100644 --- a/supernode/daemon/mgr/peer_mgr.go +++ b/supernode/daemon/mgr/peer_mgr.go @@ -37,6 +37,9 @@ type PeerMgr interface { // Get the peer Info with specified peerID. Get(ctx context.Context, peerID string) (*types.PeerInfo, error) + // GetAllPeerIDs returns all peerIDs. + GetAllPeerIDs(ctx context.Context) (peerIDs []string) + // List return a list of peers info with filter. List(ctx context.Context, filter *util.PageFilter) (peerList []*types.PeerInfo, err error) } diff --git a/supernode/daemon/mgr/progress/progress_delete.go b/supernode/daemon/mgr/progress/progress_delete.go new file mode 100644 index 000000000..44315cec5 --- /dev/null +++ b/supernode/daemon/mgr/progress/progress_delete.go @@ -0,0 +1,72 @@ +package progress + +import ( + "context" + + "github.com/sirupsen/logrus" +) + +// DeleteTaskID deletes the super progress with specified taskID. +func (pm *Manager) DeleteTaskID(ctx context.Context, taskID string) (err error) { + return pm.superProgress.remove(taskID) +} + +// DeleteCID deletes the client progress with specified clientID. +func (pm *Manager) DeleteCID(ctx context.Context, clientID string) (err error) { + return pm.clientProgress.remove(clientID) +} + +// DeletePeerID deletes the related info with specified PeerID. +func (pm *Manager) DeletePeerID(ctx context.Context, peerID string) error { + // NOTE: we should delete the peerID from the pieceProgress. + // However, it will cost a lot of time to find which one refers to this peerID. + // So we leave it to be deleted when scheduled. + pm.deletePeerIDFromPeerProgress(ctx, peerID) + pm.deletePeerIDFromBlackInfo(ctx, peerID) + + return nil +} + +func (pm *Manager) deletePeerIDFromPeerProgress(ctx context.Context, peerID string) bool { + if err := pm.peerProgress.remove(peerID); err != nil { + logrus.Errorf("failed to delete peerID(%s) from peerProgress: %v", peerID, err) + return false + } + return true +} + +func (pm *Manager) deletePeerIDFromBlackInfo(ctx context.Context, peerID string) bool { + result := true + // delete the black info which use peerID as the key + pm.clientBlackInfo.Delete(peerID) + + // TODO: delete the black info which refers to the specified peerID + return result +} + +// DeletePeerIDByPieceNum deletes the peerID which means that +// the peer no longer provides the service for the pieceNum of taskID. +func (pm *Manager) DeletePeerIDByPieceNum(ctx context.Context, taskID string, pieceNum int, peerID string) error { + return pm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerID) +} + +// deletePeerIDByPieceNum deletes the peerID which means that +// the peer no longer provides the service for the pieceNum of taskID. +func (pm *Manager) deletePeerIDByPieceNum(ctx context.Context, taskID string, pieceNum int, peerID string) error { + key, err := generatePieceProgressKey(taskID, pieceNum) + if err != nil { + return err + } + return pm.deletePeerIDByPieceProgressKey(ctx, key, peerID) +} + +// deletePeerIDByPieceProgressKey deletes the peerID which means that +// the peer no longer provides the service for the pieceNum of taskID. 
+func (pm *Manager) deletePeerIDByPieceProgressKey(ctx context.Context, pieceProgressKey string, peerID string) error { + ps, err := pm.pieceProgress.getAsPieceState(pieceProgressKey) + if err != nil { + return err + } + + return ps.delete(peerID) +} diff --git a/supernode/daemon/mgr/progress/progress_manager.go b/supernode/daemon/mgr/progress/progress_manager.go index 9cd9925e5..c7cce2e88 100644 --- a/supernode/daemon/mgr/progress/progress_manager.go +++ b/supernode/daemon/mgr/progress/progress_manager.go @@ -18,11 +18,13 @@ package progress import ( "context" + "fmt" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" + "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" @@ -223,15 +225,6 @@ func (pm *Manager) GetPieceProgressByCID(ctx context.Context, taskID, clientID, return getAvailablePieces(clientBitset, cdnBitset, runningPieces) } -// DeletePieceProgressByCID deletes the pieces progress with specified clientID. -func (pm *Manager) DeletePieceProgressByCID(ctx context.Context, taskID, clientID string) (err error) { - if pm.cfg.IsSuperCID(clientID) { - return pm.superProgress.remove(taskID) - } - - return pm.clientProgress.remove(clientID) -} - // GetPeerIDsByPieceNum gets all peerIDs with specified taskID and pieceNum. // It will return nil when no peers is available. func (pm *Manager) GetPeerIDsByPieceNum(ctx context.Context, taskID string, pieceNum int) (peerIDs []string, err error) { @@ -247,21 +240,6 @@ func (pm *Manager) GetPeerIDsByPieceNum(ctx context.Context, taskID string, piec return ps.getAvailablePeers(), nil } -// DeletePeerIDByPieceNum deletes the peerID which means that -// the peer no longer provides the service for the pieceNum of taskID. -func (pm *Manager) DeletePeerIDByPieceNum(ctx context.Context, taskID string, pieceNum int, peerID string) error { - key, err := generatePieceProgressKey(taskID, pieceNum) - if err != nil { - return err - } - ps, err := pm.pieceProgress.getAsPieceState(key) - if err != nil { - return err - } - - return ps.delete(peerID) -} - // GetPeerStateByPeerID gets peer state with specified peerID. func (pm *Manager) GetPeerStateByPeerID(ctx context.Context, peerID string) (*mgr.PeerState, error) { peerState, err := pm.peerProgress.getAsPeerState(peerID) @@ -271,21 +249,26 @@ func (pm *Manager) GetPeerStateByPeerID(ctx context.Context, peerID string) (*mg return &mgr.PeerState{ PeerID: peerID, - ServiceDownTime: &peerState.serviceDownTime, + ServiceDownTime: peerState.serviceDownTime, ClientErrorCount: peerState.clientErrorCount, ServiceErrorCount: peerState.serviceErrorCount, ProducerLoad: peerState.producerLoad, }, nil } -// DeletePeerStateByPeerID deletes the peerState by PeerID. -func (pm *Manager) DeletePeerStateByPeerID(ctx context.Context, peerID string) error { - // delete client blackinfo - // TODO: delete the blackinfo that refer to peerID - pm.clientBlackInfo.Delete(peerID) +// UpdatePeerServiceDown do update operation when a peer server offline. 
+func (pm *Manager) UpdatePeerServiceDown(ctx context.Context, peerID string) (err error) { + peerState, err := pm.peerProgress.getAsPeerState(peerID) + if err != nil { + return errors.Wrapf(err, "failed to get peer state peerID(%s): %v", peerID, err) + } - // delete peer progress - return pm.peerProgress.remove(peerID) + if peerState.serviceDownTime > 0 { + return fmt.Errorf("failed to update the service down info because this peer(%s) has been offline", peerID) + } + + peerState.serviceDownTime = timeutils.GetCurrentTimeMillis() + return nil } // GetPeersByTaskID gets all peers info with specified taskID. diff --git a/supernode/daemon/mgr/progress/state_sync_map.go b/supernode/daemon/mgr/progress/state_sync_map.go index b065fcb6e..80ef7a8f5 100644 --- a/supernode/daemon/mgr/progress/state_sync_map.go +++ b/supernode/daemon/mgr/progress/state_sync_map.go @@ -17,47 +17,33 @@ package progress import ( - "sync" - "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + "github.com/dragonflyoss/Dragonfly/pkg/syncmap" "github.com/pkg/errors" ) // stateSyncMap is a thread-safe map. type stateSyncMap struct { - *sync.Map + *syncmap.SyncMap } // newStateSyncMap returns a new stateSyncMap. func newStateSyncMap() *stateSyncMap { - return &stateSyncMap{&sync.Map{}} + return &stateSyncMap{syncmap.NewSyncMap()} } // add a key-value pair into the *sync.Map. // The ErrEmptyValue error will be returned if the key is empty. func (mmap *stateSyncMap) add(key string, value interface{}) error { - if stringutils.IsEmptyStr(key) { - return errors.Wrap(errortypes.ErrEmptyValue, "key") - } - mmap.Store(key, value) - return nil + return mmap.Add(key, value) } // get returns result as interface{} according to the key. // The ErrEmptyValue error will be returned if the key is empty. // And the ErrDataNotFound error will be returned if the key cannot be found. func (mmap *stateSyncMap) get(key string) (interface{}, error) { - if stringutils.IsEmptyStr(key) { - return nil, errors.Wrap(errortypes.ErrEmptyValue, "key") - } - - if v, ok := mmap.Load(key); ok { - return v, nil - } - - return nil, errors.Wrapf(errortypes.ErrDataNotFound, "key: %s", key) + return mmap.Get(key) } // getAsSuperState returns result as *superState. @@ -120,14 +106,5 @@ func (mmap *stateSyncMap) getAsPieceState(key string) (*pieceState, error) { // The ErrEmptyValue error will be returned if the key is empty. // And the ErrDataNotFound error will be returned if the key cannot be found. func (mmap *stateSyncMap) remove(key string) error { - if stringutils.IsEmptyStr(key) { - return errors.Wrap(errortypes.ErrEmptyValue, "key") - } - - if _, ok := mmap.Load(key); !ok { - return errors.Wrapf(errortypes.ErrDataNotFound, "key: %s", key) - } - - mmap.Delete(key) - return nil + return mmap.Remove(key) } diff --git a/supernode/daemon/mgr/progress_mgr.go b/supernode/daemon/mgr/progress_mgr.go index e9506c658..e9f93382c 100644 --- a/supernode/daemon/mgr/progress_mgr.go +++ b/supernode/daemon/mgr/progress_mgr.go @@ -39,7 +39,7 @@ type PeerState struct { ServiceErrorCount *atomiccount.AtomicInt // ServiceDownTime the down time of the peer service. - ServiceDownTime *int64 + ServiceDownTime int64 } // ProgressMgr is responsible for maintaining the correspondence between peer and pieces. @@ -60,9 +60,6 @@ type ProgressMgr interface { // The filter parameter depends on the specific implementation. 
 	GetPieceProgressByCID(ctx context.Context, taskID, clientID, filter string) (pieceNums []int, err error)
 
-	// DeletePieceProgressByCID deletes the pieces progress with specified clientID.
-	DeletePieceProgressByCID(ctx context.Context, taskID, clientID string) (err error)
-
 	// GetPeerIDsByPieceNum gets all peerIDs with specified taskID and pieceNum.
 	GetPeerIDsByPieceNum(ctx context.Context, taskID string, pieceNum int) (peerIDs []string, err error)
 
@@ -73,12 +70,24 @@ type ProgressMgr interface {
 	// GetPeerStateByPeerID gets peer state with specified peerID.
 	GetPeerStateByPeerID(ctx context.Context, peerID string) (peerState *PeerState, err error)
 
-	// DeletePeerStateByPeerID deletes the peerState by PeerID.
-	DeletePeerStateByPeerID(ctx context.Context, peerID string) error
+	// UpdatePeerServiceDown does the update operation when a peer server goes offline.
+	//
+	// This function will update the service down time for the peerID.
+	// And the supernode will not dispatch tasks to this peer.
+	UpdatePeerServiceDown(ctx context.Context, peerID string) (err error)
 
 	// GetPeersByTaskID gets all peers info with specified taskID.
 	GetPeersByTaskID(ctx context.Context, taskID string) (peersInfo []*types.PeerInfo, err error)
 
 	// GetBlackInfoByPeerID gets black info with specified peerID.
 	GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *syncmap.SyncMap, err error)
+
+	// DeleteTaskID deletes the super progress with specified taskID.
+	DeleteTaskID(ctx context.Context, taskID string) (err error)
+
+	// DeleteCID deletes the client progress with specified clientID.
+	DeleteCID(ctx context.Context, clientID string) (err error)
+
+	// DeletePeerID deletes the peerState by PeerID.
+	DeletePeerID(ctx context.Context, peerID string) (err error)
 }
diff --git a/supernode/daemon/mgr/scheduler/manager.go b/supernode/daemon/mgr/scheduler/manager.go
index dd0d23662..3c152a0b9 100644
--- a/supernode/daemon/mgr/scheduler/manager.go
+++ b/supernode/daemon/mgr/scheduler/manager.go
@@ -208,7 +208,7 @@ func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, p
 		}
 
 		// if the service has been down, and then it should not be needed.
- if peerState.ServiceDownTime != nil && *(peerState.ServiceDownTime) > 0 { + if peerState.ServiceDownTime > 0 { sm.deletePeerIDByPieceNum(ctx, taskID, pieceNum, peerIDs[i]) continue } diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index e55dc44be..859ca9646 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -25,7 +25,6 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" - "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" @@ -76,7 +75,6 @@ type Manager struct { cfg *config.Config taskStore *dutil.Store - taskLocker *util.LockerPool accessTimeMap *syncmap.SyncMap taskURLUnReachableStore *syncmap.SyncMap @@ -96,7 +94,6 @@ func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetT return &Manager{ cfg: cfg, taskStore: dutil.NewStore(), - taskLocker: util.NewLockerPool(), peerMgr: peerMgr, dfgetTaskMgr: dfgetTaskMgr, progressMgr: progressMgr, @@ -117,7 +114,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( } // Step2: add a new Task or update the exist task - failAccessInterval := tm.cfg.FailAccessInterval * time.Minute + failAccessInterval := tm.cfg.FailAccessInterval task, err := tm.addOrUpdateTask(ctx, req, failAccessInterval) if err != nil { logrus.Infof("failed to add or update task with req %+v: %v", req, err) @@ -127,8 +124,11 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( logrus.Debugf("success to get task info: %+v", task) // TODO: defer rollback the task update + util.GetLock(task.ID, true) + defer util.ReleaseLock(task.ID, true) + // update accessTime for taskID - if err := tm.accessTimeMap.Add(task.ID, timeutils.GetCurrentTimeMillis()); err != nil { + if err := tm.accessTimeMap.Add(task.ID, time.Now()); err != nil { logrus.Warnf("failed to update accessTime for taskID(%s): %v", task.ID, err) } @@ -170,6 +170,8 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( // Get a task info according to specified taskID. func (tm *Manager) Get(ctx context.Context, taskID string) (*types.TaskInfo, error) { + util.GetLock(taskID, true) + defer util.ReleaseLock(taskID, true) return tm.getTask(taskID) } @@ -186,6 +188,9 @@ func (tm *Manager) List(ctx context.Context, filter map[string]string) ([]*types // CheckTaskStatus check the task status. func (tm *Manager) CheckTaskStatus(ctx context.Context, taskID string) (bool, error) { + util.GetLock(taskID, true) + defer util.ReleaseLock(taskID, true) + task, err := tm.getTask(taskID) if err != nil { return false, err @@ -201,12 +206,17 @@ func (tm *Manager) CheckTaskStatus(ctx context.Context, taskID string) (bool, er // Delete deletes a task. func (tm *Manager) Delete(ctx context.Context, taskID string) error { + tm.accessTimeMap.Delete(taskID) + tm.taskURLUnReachableStore.Delete(taskID) tm.taskStore.Delete(taskID) return nil } // Update the info of task. 
func (tm *Manager) Update(ctx context.Context, taskID string, taskInfo *types.TaskInfo) error { + util.GetLock(taskID, false) + defer util.ReleaseLock(taskID, false) + return tm.updateTask(taskID, taskInfo) } @@ -214,6 +224,9 @@ func (tm *Manager) Update(ctx context.Context, taskID string, taskInfo *types.Ta func (tm *Manager) GetPieces(ctx context.Context, taskID, clientID string, req *types.PiecePullRequest) (bool, interface{}, error) { logrus.Debugf("get pieces request: %+v with taskID(%s) and clientID(%s)", req, taskID, clientID) + util.GetLock(taskID, true) + defer util.ReleaseLock(taskID, true) + // convert piece result and dfgetTask status to dfgetTask status code dfgetTaskStatus := convertToDfgetTaskStatus(req.PieceResult, req.DfgetTaskStatus) if stringutils.IsEmptyStr(dfgetTaskStatus) { @@ -233,7 +246,7 @@ func (tm *Manager) GetPieces(ctx context.Context, taskID, clientID string, req * logrus.Debugf("success to get task: %+v", task) // update accessTime for taskID - if err := tm.accessTimeMap.Add(task.ID, timeutils.GetCurrentTimeMillis()); err != nil { + if err := tm.accessTimeMap.Add(task.ID, time.Now()); err != nil { logrus.Warnf("failed to update accessTime for taskID(%s): %v", task.ID, err) } diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index b7fa0b7ab..cfb000bea 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -44,6 +44,9 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq } taskID := generateTaskID(taskURL, req.Md5, req.Identifier) + util.GetLock(taskID, true) + defer util.ReleaseLock(taskID, true) + if key, err := tm.taskURLUnReachableStore.Get(taskID); err == nil { if unReachableStartTime, ok := key.(time.Time); ok && time.Since(unReachableStartTime) < failAccessInterval { @@ -75,9 +78,6 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq task = newTask } - tm.taskLocker.GetLock(taskID, false) - defer tm.taskLocker.ReleaseLock(taskID, false) - if task.FileLength != 0 { return task, nil } @@ -146,9 +146,6 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err return errors.Wrapf(errortypes.ErrEmptyValue, "CDNStatus of TaskInfo: %+v", updateTaskInfo) } - tm.taskLocker.GetLock(taskID, false) - defer tm.taskLocker.ReleaseLock(taskID, false) - task, err := tm.getTask(taskID) if err != nil { return err @@ -225,6 +222,7 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.TaskInf } logrus.Infof("success to init cdn node or taskID %s", task.ID) } + if err := tm.updateTask(task.ID, &types.TaskInfo{ CdnStatus: types.TaskInfoCdnStatusRUNNING, }); err != nil { @@ -322,7 +320,7 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas // Step3. 
whether success cdnSuccess := task.CdnStatus == types.TaskInfoCdnStatusSUCCESS pieceSuccess, _ := tm.progressMgr.GetPieceProgressByCID(ctx, task.ID, clientID, "success") - logrus.Debugf("taskID: %s, get successful pieces: %v", task.ID, pieceSuccess) + logrus.Debugf("taskID(%s) clientID(%s) get successful pieces: %v", task.ID, clientID, pieceSuccess) if cdnSuccess && (int32(len(pieceSuccess)) == task.PieceTotal) { // update dfget task status to success if err := tm.dfgetTaskMgr.UpdateStatus(ctx, clientID, task.ID, types.DfGetTaskStatusSUCCESS); err != nil { diff --git a/supernode/daemon/mgr/task_mgr.go b/supernode/daemon/mgr/task_mgr.go index 1fd3308b1..515289d0c 100644 --- a/supernode/daemon/mgr/task_mgr.go +++ b/supernode/daemon/mgr/task_mgr.go @@ -53,7 +53,6 @@ type TaskMgr interface { CheckTaskStatus(ctx context.Context, taskID string) (bool, error) // Delete deletes a task - // NOTE: delete the related peers and dfgetTask info is necessary. Delete(ctx context.Context, taskID string) error // Update updates the task info with specified info. diff --git a/supernode/daemon/util/store.go b/supernode/daemon/util/store.go index 093a965da..f66df9f93 100644 --- a/supernode/daemon/util/store.go +++ b/supernode/daemon/util/store.go @@ -17,49 +17,27 @@ package util import ( - "sync" - - "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - - "github.com/pkg/errors" + "github.com/dragonflyoss/Dragonfly/pkg/syncmap" ) // Store maintains some metadata information in memory. type Store struct { - metaMap sync.Map + *syncmap.SyncMap } // NewStore returns a new Store. func NewStore() *Store { - return &Store{} + return &Store{syncmap.NewSyncMap()} } // Put a key-value pair into the store. func (s *Store) Put(key string, value interface{}) error { - s.metaMap.Store(key, value) - return nil -} - -// Get a key-value pair from the store. -func (s *Store) Get(key string) (interface{}, error) { - v, ok := s.metaMap.Load(key) - if !ok { - return nil, errors.Wrapf(errortypes.ErrDataNotFound, "key (%s)", key) - } - - return v, nil + return s.Add(key, value) } // Delete a key-value pair from the store with specified key. func (s *Store) Delete(key string) error { - _, ok := s.metaMap.Load(key) - if !ok { - return errors.Wrapf(errortypes.ErrDataNotFound, "key (%s)", key) - } - - s.metaMap.Delete(key) - - return nil + return s.Remove(key) } // List returns all key-value pairs in the store. 
@@ -70,7 +48,7 @@ func (s *Store) List() []interface{} { metaSlice = append(metaSlice, value) return true } - s.metaMap.Range(rangeFunc) + s.Range(rangeFunc) return metaSlice } diff --git a/supernode/server/0.3_bridge.go b/supernode/server/0.3_bridge.go index aaf37e5db..bc3724a03 100644 --- a/supernode/server/0.3_bridge.go +++ b/supernode/server/0.3_bridge.go @@ -230,26 +230,12 @@ func (s *Server) reportServiceDown(ctx context.Context, rw http.ResponseWriter, taskID := params.Get("taskId") cID := params.Get("cid") + // get peerID according to the CID and taskID dfgetTask, err := s.DfgetTaskMgr.Get(ctx, cID, taskID) if err != nil { return err } - - if err := s.ProgressMgr.DeletePieceProgressByCID(ctx, taskID, cID); err != nil { - return err - } - - if err := s.ProgressMgr.DeletePeerStateByPeerID(ctx, dfgetTask.PeerID); err != nil { - return err - } - - if err := s.PeerMgr.DeRegister(ctx, dfgetTask.PeerID); err != nil { - return err - } - - if err := s.DfgetTaskMgr.Delete(ctx, cID, taskID); err != nil { - return err - } + s.ProgressMgr.UpdatePeerServiceDown(ctx, dfgetTask.PeerID) return EncodeResponse(rw, http.StatusOK, &types.ResultInfo{ Code: constants.CodeGetPeerDown, diff --git a/supernode/server/server.go b/supernode/server/server.go index 486b71457..7a313f5a0 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -17,6 +17,7 @@ package server import ( + "context" "fmt" "net" "net/http" @@ -26,6 +27,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/cdn" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/dfgettask" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/gc" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/progress" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/scheduler" @@ -45,6 +47,8 @@ type Server struct { TaskMgr mgr.TaskMgr DfgetTaskMgr mgr.DfgetTaskMgr ProgressMgr mgr.ProgressMgr + GCMgr mgr.GCMgr + OriginClient httpclient.OriginHTTPClient } @@ -94,12 +98,18 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) { return nil, err } + GCMgr, err := gc.NewManager(cfg, taskMgr, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr) + if err != nil { + return nil, err + } + return &Server{ Config: cfg, PeerMgr: peerMgr, TaskMgr: taskMgr, DfgetTaskMgr: dfgetTaskMgr, ProgressMgr: progressMgr, + GCMgr: GCMgr, OriginClient: originClient, }, nil } @@ -116,6 +126,7 @@ func (s *Server) Start() error { return err } + s.GCMgr.StartGC(context.Background()) server := &http.Server{ Handler: router, ReadTimeout: time.Minute * 10, diff --git a/supernode/util/locker.go b/supernode/util/locker.go index 37fe2a537..46ed2ee97 100644 --- a/supernode/util/locker.go +++ b/supernode/util/locker.go @@ -20,6 +20,18 @@ import ( "sync" ) +var defaultLocker = NewLockerPool() + +// GetLock locks key with defaultLocker. +func GetLock(key string, ro bool) { + defaultLocker.GetLock(key, ro) +} + +// ReleaseLock unlocks key with defaultLocker. +func ReleaseLock(key string, ro bool) { + defaultLocker.ReleaseLock(key, ro) +} + // LockerPool is a set of reader/writer mutual exclusion locks. type LockerPool struct { // use syncPool to cache allocated but unused *countRWMutex items for later reuse