diff --git a/dfget/core/downloader/p2p_downloader/client_stream_writer.go b/dfget/core/downloader/p2p_downloader/client_stream_writer.go index 4a42bb887..d0a25b434 100644 --- a/dfget/core/downloader/p2p_downloader/client_stream_writer.go +++ b/dfget/core/downloader/p2p_downloader/client_stream_writer.go @@ -38,6 +38,11 @@ type ClientStreamWriter struct { // The downloader will put the piece into this queue after it downloaded a piece successfully. // And clientWriter will poll values from this queue constantly and write to disk. clientQueue queue.Queue + + // notifyQueue sends a notification when all operation about a piece have + // been completed successfully. + notifyQueue queue.Queue + // finish indicates whether the task written is completed. finish chan struct{} @@ -68,11 +73,12 @@ type ClientStreamWriter struct { } // NewClientStreamWriter creates and initialize a ClientStreamWriter instance. -func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter { +func NewClientStreamWriter(clientQueue, notifyQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter { pr, pw := io.Pipe() limitReader := limitreader.NewLimitReader(pr, int64(cfg.LocalLimit), cfg.Md5 != "") clientWriter := &ClientStreamWriter{ clientQueue: clientQueue, + notifyQueue: notifyQueue, pipeReader: pr, pipeWriter: pw, limitReader: limitReader, @@ -139,7 +145,7 @@ func (csw *ClientStreamWriter) write(piece *Piece) error { err := csw.writePieceToPipe(piece) if err == nil { - go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime)) + go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime), csw.notifyQueue) } return err } diff --git a/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go b/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go index 792f7b4e6..7e5e38179 100644 --- a/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go +++ b/dfget/core/downloader/p2p_downloader/client_stream_writer_test.go @@ -100,7 +100,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) { copy(cases2, cases) cfg := &config.Config{} - csw := NewClientStreamWriter(nil, nil, cfg) + csw := NewClientStreamWriter(nil, nil, nil, cfg) go func() { for _, v := range cases2 { err := csw.writePieceToPipe(v.piece) diff --git a/dfget/core/downloader/p2p_downloader/client_writer.go b/dfget/core/downloader/p2p_downloader/client_writer.go index f4dedaf91..a19bbef57 100644 --- a/dfget/core/downloader/p2p_downloader/client_writer.go +++ b/dfget/core/downloader/p2p_downloader/client_writer.go @@ -58,6 +58,11 @@ type ClientWriter struct { // The downloader will put the piece into this queue after it downloaded a piece successfully. // And clientWriter will poll values from this queue constantly and write to disk. clientQueue queue.Queue + + // notifyQueue sends a notification when all operation about a piece have + // been completed successfully. + notifyQueue queue.Queue + // finish indicates whether the task written is completed. finish chan struct{} @@ -95,9 +100,11 @@ type ClientWriter struct { // NewClientWriter creates and initialize a ClientWriter instance. func NewClientWriter(clientFilePath, serviceFilePath string, - clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter { + clientQueue, notifyQueue queue.Queue, + api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter { clientWriter := &ClientWriter{ clientQueue: clientQueue, + notifyQueue: notifyQueue, clientFilePath: clientFilePath, serviceFilePath: serviceFilePath, api: api, @@ -219,7 +226,7 @@ func (cw *ClientWriter) write(piece *Piece) error { cw.pieceIndex++ err := writePieceToFile(piece, cw.serviceFile, cw.cdnSource) if err == nil { - go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime)) + go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime), cw.notifyQueue) } return err } @@ -247,7 +254,7 @@ func startSyncWriter(q queue.Queue) queue.Queue { return nil } -func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration) { +func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration, notifyQueue queue.Queue) { reportPieceRequest := &types.ReportPieceRequest{ TaskID: piece.TaskID, Cid: cid, @@ -265,6 +272,9 @@ func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time. _, err := api.ReportPiece(piece.SuperNode, reportPieceRequest) if err == nil { + if notifyQueue != nil { + notifyQueue.Put("success") + } if retry > 0 { logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry) } diff --git a/dfget/core/downloader/p2p_downloader/p2p_downloader.go b/dfget/core/downloader/p2p_downloader/p2p_downloader.go index ae7cc5432..dcfbadcf4 100644 --- a/dfget/core/downloader/p2p_downloader/p2p_downloader.go +++ b/dfget/core/downloader/p2p_downloader/p2p_downloader.go @@ -84,6 +84,9 @@ type P2PDownloader struct { // And clientWriter will poll values from this queue constantly and write to disk. clientQueue queue.Queue + // notifyQueue maintains a queue for notifying p2p downloader to pull next download tasks. + notifyQueue queue.Queue + // clientFilePath is the full path of the temp file. clientFilePath string // serviceFilePath is the full path of the temp service file which @@ -150,6 +153,7 @@ func (p2p *P2PDownloader) init() { p2p.queue.Put(NewPieceSimple(p2p.taskID, p2p.node, constants.TaskStatusStart, p2p.RegisterResult.CDNSource)) p2p.clientQueue = queue.NewQueue(p2p.cfg.ClientQueueSize) + p2p.notifyQueue = queue.NewQueue(p2p.cfg.ClientQueueSize) p2p.clientFilePath = helper.GetTaskFile(p2p.taskFileName, p2p.cfg.RV.DataDir) p2p.serviceFilePath = helper.GetServiceFile(p2p.taskFileName, p2p.cfg.RV.DataDir) @@ -165,7 +169,9 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error { if p2p.streamMode { return fmt.Errorf("streamMode enabled, should be disable") } - clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, p2p.clientQueue, p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource) + clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, + p2p.clientQueue, p2p.notifyQueue, + p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource) return p2p.run(ctx, clientWriter) } @@ -174,7 +180,7 @@ func (p2p *P2PDownloader) RunStream(ctx context.Context) (io.Reader, error) { if !p2p.streamMode { return nil, fmt.Errorf("streamMode disable, should be enabled") } - clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.API, p2p.cfg) + clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.notifyQueue, p2p.API, p2p.cfg) go func() { err := p2p.run(ctx, clientStreamWriter) if err != nil { @@ -280,14 +286,10 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) ( break } - sleepTime := time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond - logrus.Infof("pull piece task(%+v) result:%s and sleep %.3fs", item, res, sleepTime.Seconds()) - time.Sleep(sleepTime) - - // gradually increase the sleep time, up to [800-1600] - if p2p.minTimeout < 800 { - p2p.minTimeout *= 2 - p2p.maxTimeout *= 2 + actual, expected := p2p.sleepInterval() + if expected > actual || logrus.IsLevelEnabled(logrus.DebugLevel) { + logrus.Infof("pull piece task(%+v) result:%s and sleep actual:%.3fs expected:%.3fs", + item, res, actual.Seconds(), expected.Seconds()) } } @@ -314,6 +316,23 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) ( return p2p.pullPieceTask(item) } +// sleepInterval sleep for a while to wait for next pulling piece task until +// receiving a notification which indicating that all the previous works have +// been completed. +func (p2p *P2PDownloader) sleepInterval() (actual, expected time.Duration) { + expected = time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond + start := time.Now() + p2p.notifyQueue.PollTimeout(expected) + actual = time.Now().Sub(start) + + // gradually increase the sleep time, up to [800-1600] + if p2p.minTimeout < 800 { + p2p.minTimeout *= 2 + p2p.maxTimeout *= 2 + } + return actual, expected +} + // getPullRate gets download rate limit dynamically. func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueData) { if time.Since(p2p.pullRateTime).Seconds() < 3 {