diff --git a/dfget/core/core.go b/dfget/core/core.go index 952298708..aaf82d5e3 100644 --- a/dfget/core/core.go +++ b/dfget/core/core.go @@ -213,9 +213,9 @@ func doDownload(cfg *config.Config, supernodeAPI api.SupernodeAPI, } err := downloader.DoDownloadTimeout(getter, timeout) + // report finished task to uploader regardless of the result of downloading from dragonfly + reportFinishedTask(cfg, getter) if err == nil { - // report finished task to uploader when success to download by dragonfly - reportFinishedTask(cfg, getter) return nil } diff --git a/supernode/daemon/mgr/gc/gc_task.go b/supernode/daemon/mgr/gc/gc_task.go index 0b126a4b8..5f5de16d2 100644 --- a/supernode/daemon/mgr/gc/gc_task.go +++ b/supernode/daemon/mgr/gc/gc_task.go @@ -116,7 +116,16 @@ func (gcm *Manager) gcCDNByTaskID(ctx context.Context, taskID string, full bool) } func (gcm *Manager) gcTaskByTaskID(ctx context.Context, taskID string) { - if err := gcm.progressMgr.DeleteTaskID(ctx, taskID); err != nil { + taskInfo, err := gcm.taskMgr.Get(ctx, taskID) + if err != nil { + logrus.Errorf("gc task: failed to get task info(%s): %v", taskID, err) + } + + var pieceTotal int + if taskInfo != nil { + pieceTotal = int(taskInfo.PieceTotal) + } + if err := gcm.progressMgr.DeleteTaskID(ctx, taskID, pieceTotal); err != nil { logrus.Errorf("gc task: failed to gc progress info taskID(%s): %v", taskID, err) } if err := gcm.taskMgr.Delete(ctx, taskID); err != nil { diff --git a/supernode/daemon/mgr/mock/mock_progress_mgr.go b/supernode/daemon/mgr/mock/mock_progress_mgr.go index 7f69fabcb..1683a735e 100644 --- a/supernode/daemon/mgr/mock/mock_progress_mgr.go +++ b/supernode/daemon/mgr/mock/mock_progress_mgr.go @@ -199,17 +199,17 @@ func (mr *MockProgressMgrMockRecorder) UpdateSuperLoad(ctx, taskID, delta, limit } // DeleteTaskID mocks base method -func (m *MockProgressMgr) DeleteTaskID(ctx context.Context, taskID string) error { +func (m *MockProgressMgr) DeleteTaskID(ctx context.Context, taskID string, pieceTotal int) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteTaskID", ctx, taskID) + ret := m.ctrl.Call(m, "DeleteTaskID", ctx, taskID, pieceTotal) ret0, _ := ret[0].(error) return ret0 } // DeleteTaskID indicates an expected call of DeleteTaskID -func (mr *MockProgressMgrMockRecorder) DeleteTaskID(ctx, taskID interface{}) *gomock.Call { +func (mr *MockProgressMgrMockRecorder) DeleteTaskID(ctx, taskID, pieceTotal interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTaskID", reflect.TypeOf((*MockProgressMgr)(nil).DeleteTaskID), ctx, taskID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTaskID", reflect.TypeOf((*MockProgressMgr)(nil).DeleteTaskID), ctx, taskID, pieceTotal) } // DeleteCID mocks base method diff --git a/supernode/daemon/mgr/progress/progress_delete.go b/supernode/daemon/mgr/progress/progress_delete.go index 8cbc1cc8c..fb4d21480 100644 --- a/supernode/daemon/mgr/progress/progress_delete.go +++ b/supernode/daemon/mgr/progress/progress_delete.go @@ -19,13 +19,24 @@ package progress import ( "context" + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/sirupsen/logrus" ) // DeleteTaskID deletes the super progress with specified taskID. -func (pm *Manager) DeleteTaskID(ctx context.Context, taskID string) (err error) { +func (pm *Manager) DeleteTaskID(ctx context.Context, taskID string, pieceTotal int) (err error) { pm.superLoad.remove(taskID) - return pm.superProgress.remove(taskID) + pm.superProgress.remove(taskID) + + for i := 0; i < pieceTotal; i++ { + key, err := generatePieceProgressKey(taskID, i) + if err != nil { + return err + } + pm.pieceProgress.remove(key) + } + return nil } // DeleteCID deletes the client progress with specified clientID. @@ -81,9 +92,10 @@ func (pm *Manager) deletePeerIDByPieceNum(ctx context.Context, taskID string, pi // the peer no longer provides the service for the pieceNum of taskID. func (pm *Manager) deletePeerIDByPieceProgressKey(ctx context.Context, pieceProgressKey string, peerID string) error { ps, err := pm.pieceProgress.getAsPieceState(pieceProgressKey) - if err != nil { + if err != nil && !errortypes.IsDataNotFound(err) { return err } - return ps.delete(peerID) + ps.delete(peerID) + return nil } diff --git a/supernode/daemon/mgr/progress_mgr.go b/supernode/daemon/mgr/progress_mgr.go index 851aee05d..a805e0f69 100644 --- a/supernode/daemon/mgr/progress_mgr.go +++ b/supernode/daemon/mgr/progress_mgr.go @@ -88,7 +88,7 @@ type ProgressMgr interface { UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error) // DeleteTaskID deletes the super progress with specified taskID. - DeleteTaskID(ctx context.Context, taskID string) (err error) + DeleteTaskID(ctx context.Context, taskID string, pieceTotal int) (err error) // DeleteCID deletes the super progress with specified clientID. DeleteCID(ctx context.Context, clientID string) (err error) diff --git a/supernode/daemon/mgr/scheduler/manager.go b/supernode/daemon/mgr/scheduler/manager.go index 03fea922f..50c0ece55 100644 --- a/supernode/daemon/mgr/scheduler/manager.go +++ b/supernode/daemon/mgr/scheduler/manager.go @@ -260,7 +260,7 @@ func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, s func (sm *Manager) deletePeerIDByPieceNum(ctx context.Context, taskID string, pieceNum int, peerID string) { if err := sm.progressMgr.DeletePeerIDByPieceNum(ctx, taskID, pieceNum, peerID); err != nil { - logrus.Warnf("scheduler: failed to delete the peerID %s for pieceNum %d of taskID: %s", peerID, pieceNum, taskID) + logrus.Warnf("scheduler: failed to delete the peerID %s for pieceNum %d of taskID: %s: %v", peerID, pieceNum, taskID, err) } }