diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index d97b6f8a5..b06d1f870 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -3,6 +3,7 @@ package cdn import ( "context" "crypto/md5" + "path" "strconv" "github.com/dragonflyoss/Dragonfly/apis/types" @@ -32,7 +33,7 @@ type Manager struct { // NewManager returns a new Manager. func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr) (*Manager, error) { - rateLimiter := cutil.NewRateLimiter(cutil.TransRate(cfg.MaxBandwidth-cfg.SystemReservedBandwidth), 2) + rateLimiter := cutil.NewRateLimiter(cutil.TransRate(config.TransLimit(cfg.MaxBandwidth-cfg.SystemReservedBandwidth)), 2) metaDataManager := newFileMetaDataManager(cacheStore) pieceMD5Manager := newpieceMD5Mgr() cdnReporter := newReporter(cfg, cacheStore, progressManager, metaDataManager, pieceMD5Manager) @@ -87,14 +88,14 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types cm.updateLastModifiedAndETag(ctx, task.ID, resp.Header.Get("Last-Modified"), resp.Header.Get("Etag")) reader := cutil.NewLimitReaderWithLimiterAndMD5Sum(resp.Body, cm.limiter, fileMD5) - realFileLength, err := cm.writer.startWriter(ctx, cm.cfg, reader, task, startPieceNum, httpFileLength, pieceContSize) + downloadMetadata, err := cm.writer.startWriter(ctx, cm.cfg, reader, task, startPieceNum, httpFileLength, pieceContSize) if err != nil { logrus.Errorf("failed to write for task %s: %v", task.ID, err) return nil, err } realMD5 := reader.Md5() - success, err := cm.handleCDNResult(ctx, task, realMD5, httpFileLength, realFileLength) + success, err := cm.handleCDNResult(ctx, task, realMD5, httpFileLength, downloadMetadata.realHTTPFileLength) if err != nil || success == false { return &types.TaskInfo{ CdnStatus: types.TaskInfoCdnStatusFAILED, @@ -103,18 +104,16 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types return &types.TaskInfo{ CdnStatus: types.TaskInfoCdnStatusSUCCESS, - FileLength: realFileLength, + FileLength: downloadMetadata.realFileLength, RealMd5: realMD5, }, nil } // GetHTTPPath returns the http download path of taskID. +// The returned path joined the DownloadRaw.Bucket and DownloadRaw.Key. func (cm *Manager) GetHTTPPath(ctx context.Context, taskID string) (string, error) { - info, err := cm.cacheStore.Stat(ctx, getDownloadRawFunc(taskID)) - if err != nil { - return "", err - } - return info.Path, nil + raw := getDownloadRawFunc(taskID) + return path.Join("/", raw.Bucket, raw.Key), nil } // GetStatus get the status of the file. diff --git a/supernode/daemon/mgr/cdn/super_writer.go b/supernode/daemon/mgr/cdn/super_writer.go index 813cb667e..2ecb5fa7e 100644 --- a/supernode/daemon/mgr/cdn/super_writer.go +++ b/supernode/daemon/mgr/cdn/super_writer.go @@ -21,6 +21,12 @@ type protocolContent struct { pieceContent *bytes.Buffer } +type downloadMetadata struct { + realFileLength int64 + realHTTPFileLength int64 + pieceCount int +} + type superWriter struct { cdnStore *store.Store cdnReporter *reporter @@ -35,9 +41,11 @@ func newSuperWriter(cdnStore *store.Store, cdnReporter *reporter) *superWriter { // startWriter writes the stream data from the reader to the underlying storage. func (cw *superWriter) startWriter(ctx context.Context, cfg *config.Config, reader io.Reader, - task *types.TaskInfo, startPieceNum int, httpFileLength int64, pieceContSize int32) (int64, error) { + task *types.TaskInfo, startPieceNum int, httpFileLength int64, pieceContSize int32) (*downloadMetadata, error) { // realFileLength is used to caculate the file Length dynamically - realFileLength := int64(startPieceNum) * int64(pieceContSize) + realFileLength := int64(startPieceNum) * int64(task.PieceSize) + // realHTTPFileLength is used to caculate the http file Length dynamically + realHTTPFileLength := int64(startPieceNum) * int64(pieceContSize) // the left size of data for a complete piece pieceContLeft := pieceContSize // the pieceNum currently processed @@ -57,6 +65,7 @@ func (cw *superWriter) startWriter(ctx context.Context, cfg *config.Config, read if n > 0 { logrus.Debugf("success to read content with length: %d", n) realFileLength += int64(n) + realHTTPFileLength += int64(n) if int(pieceContLeft) <= n { bb.Write(buf[:pieceContLeft]) pc := &protocolContent{ @@ -104,11 +113,15 @@ func (cw *superWriter) startWriter(ctx context.Context, cfg *config.Config, read } if e != nil { close(jobCh) - return 0, e + return nil, e } } close(jobCh) wg.Wait() - return realFileLength, nil + return &downloadMetadata{ + realFileLength: realFileLength, + realHTTPFileLength: realHTTPFileLength, + pieceCount: curPieceNum, + }, nil } diff --git a/supernode/daemon/mgr/cdn/super_writer_test.go b/supernode/daemon/mgr/cdn/super_writer_test.go index 3d8edf2f5..b56e6ba2a 100644 --- a/supernode/daemon/mgr/cdn/super_writer_test.go +++ b/supernode/daemon/mgr/cdn/super_writer_test.go @@ -73,9 +73,9 @@ func (s *SuperWriterTestSuite) TestStartWriter(c *check.C) { pieceCount := (httpFileLen + int64(pieceContSize-1)) / int64(pieceContSize) expectedSize := httpFileLen + pieceCount*int64(config.PieceWrapSize) - realFileLength, err := s.writer.startWriter(context.TODO(), nil, f, task, 0, httpFileLen, pieceContSize) + downloadMetadata, err := s.writer.startWriter(context.TODO(), nil, f, task, 0, httpFileLen, pieceContSize) c.Check(err, check.IsNil) - c.Check(realFileLength, check.Equals, expectedSize) + c.Check(downloadMetadata.realFileLength, check.Equals, expectedSize) checkFileSize(s.writer.cdnStore, task.ID, expectedSize, c) } diff --git a/supernode/daemon/mgr/progress/progress_util.go b/supernode/daemon/mgr/progress/progress_util.go index 6170045f7..59cff9f55 100644 --- a/supernode/daemon/mgr/progress/progress_util.go +++ b/supernode/daemon/mgr/progress/progress_util.go @@ -74,7 +74,10 @@ func updateRunningPiece(dstPIDMap *cutil.SyncMap, srcCID, dstPID string, pieceNu return dstPIDMap.Add(pieceNumString, dstPID) } - if _, err := dstPIDMap.Get(pieceNumString); err != nil && errorType.IsDataNotFound(err) { + if _, err := dstPIDMap.Get(pieceNumString); err != nil { + if errorType.IsDataNotFound(err) { + return nil + } return err }