diff --git a/dfget/core/api/supernode_api.go b/dfget/core/api/supernode_api.go index 55dcd192f..361a8244b 100644 --- a/dfget/core/api/supernode_api.go +++ b/dfget/core/api/supernode_api.go @@ -25,6 +25,7 @@ import ( "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/httputils" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -114,10 +115,11 @@ func (api *supernodeAPI) ReportPiece(node string, req *types.ReportPieceRequest) resp = new(types.BaseResponse) if e = api.get(url, resp); e != nil { logrus.Errorf("failed to report piece{taskid:%s,range:%s},err: %v", req.TaskID, req.PieceRange, e) - return nil, e + return nil, errors.Wrapf(e, "failed to report piece{taskid:%s,range:%s}", req.TaskID, req.PieceRange) } if resp.Code != constants.CodeGetPieceReport { logrus.Errorf("failed to report piece{taskid:%s,range:%s} to supernode: api response code is %d not equal to %d", req.TaskID, req.PieceRange, resp.Code, constants.CodeGetPieceReport) + return nil, errors.Wrapf(e, "failed to report piece{taskid:%s,range:%s} to supernode: api response code is %d not equal to %d", req.TaskID, req.PieceRange, resp.Code, constants.CodeGetPieceReport) } return } diff --git a/dfget/core/api/supernode_api_test.go b/dfget/core/api/supernode_api_test.go index 126df9b6b..6bd661b19 100644 --- a/dfget/core/api/supernode_api_test.go +++ b/dfget/core/api/supernode_api_test.go @@ -103,10 +103,10 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_ReportPiece(c *check.C) { TaskID: "sssss", PieceRange: "0-11", } - s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":700}`), nil) + s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":611}`), nil) r, e := s.api.ReportPiece(localhost, req) c.Check(e, check.IsNil) - c.Check(r.Code, check.Equals, 700) + c.Check(r.Code, check.Equals, 611) } func (s *SupernodeAPITestSuite) TestSupernodeAPI_ServiceDown(c *check.C) { diff --git a/dfget/core/downloader/p2p_downloader/client_writer.go b/dfget/core/downloader/p2p_downloader/client_writer.go index acf3dda4c..716ef163f 100644 --- a/dfget/core/downloader/p2p_downloader/client_writer.go +++ b/dfget/core/downloader/p2p_downloader/client_writer.go @@ -20,6 +20,7 @@ import ( "bufio" "context" "io" + "math/rand" "os" "time" @@ -202,12 +203,35 @@ func startSyncWriter(q queue.Queue) queue.Queue { } func (cw *ClientWriter) sendSuccessPiece(piece *Piece, cost time.Duration) { - cw.api.ReportPiece(piece.SuperNode, &types.ReportPieceRequest{ + reportPieceRequest := &types.ReportPieceRequest{ TaskID: piece.TaskID, Cid: cw.cfg.RV.Cid, DstCid: piece.DstCid, PieceRange: piece.Range, - }) + } + + var retry = 0 + var maxRetryTime = 3 + for { + if retry >= maxRetryTime { + logrus.Errorf("failed to report piece to supernode with request(%+v) even after retrying max retry time", reportPieceRequest) + break + } + + _, err := cw.api.ReportPiece(piece.SuperNode, reportPieceRequest) + if err == nil { + if retry > 0 { + logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry) + } + break + } + + sleepTime := time.Duration(rand.Intn(500)+50) * time.Millisecond + logrus.Warnf("failed to report piece to supernode with request(%+v) for (%d) times and will retry after sleep %.3fs", reportPieceRequest, retry, sleepTime.Seconds()) + time.Sleep(sleepTime) + retry++ + } + if cost.Seconds() > 2.0 { logrus.Infof( "async writer and report suc from dst:%s... cost:%.3f for range:%s",