From d1802a9aa352641bb0dc455399ad28c0acfa1b1e Mon Sep 17 00:00:00 2001 From: Starnop Date: Tue, 23 Apr 2019 11:45:58 +0800 Subject: [PATCH] feature: implement the cdn manager framework Signed-off-by: Starnop --- supernode/config/constants.go | 8 ++++ supernode/daemon/mgr/cdn/cache_detector.go | 24 +++++++++++ supernode/daemon/mgr/cdn/downloader.go | 15 +++++++ supernode/daemon/mgr/cdn/manager.go | 47 ++++++++++++++++++++-- supernode/daemon/mgr/cdn/super_writer.go | 15 +++++++ supernode/daemon/mgr/cdn_mgr.go | 23 ----------- 6 files changed, 105 insertions(+), 27 deletions(-) create mode 100644 supernode/daemon/mgr/cdn/cache_detector.go create mode 100644 supernode/daemon/mgr/cdn/downloader.go create mode 100644 supernode/daemon/mgr/cdn/super_writer.go diff --git a/supernode/config/constants.go b/supernode/config/constants.go index 5e51fc71d..fe0e3e9a3 100644 --- a/supernode/config/constants.go +++ b/supernode/config/constants.go @@ -46,3 +46,11 @@ const ( // PeerDownLimit indicates the limit of the download task count as a client. PeerDownLimit = 4 ) + +const ( + // PieceHeadSize 4 bytes + PieceHeadSize = 4 + + // PieceWrapSize 4 bytes head and 1 byte tail + PieceWrapSize = PieceHeadSize + 1 +) diff --git a/supernode/daemon/mgr/cdn/cache_detector.go b/supernode/daemon/mgr/cdn/cache_detector.go new file mode 100644 index 000000000..e4623e9e4 --- /dev/null +++ b/supernode/daemon/mgr/cdn/cache_detector.go @@ -0,0 +1,24 @@ +package cdn + +import ( + "context" + "hash" + + "github.com/dragonflyoss/Dragonfly/apis/types" +) + +type cacheResult struct { + startPieceNum int + pieceMd5s []string + fileMD5 hash.Hash +} + +// detectCache detects whether there is a corresponding file in the local. +// If any, check whether the entire file has been completely downloaded. +// +// If so, return the md5 of task file and return startPieceNum as -1. +// And if not, return the lastest piece num that has been downloaded. +func (cm *Manager) detectCache(ctx context.Context, task *types.TaskInfo) (*cacheResult, error) { + // TODO: implement it. + return nil, nil +} diff --git a/supernode/daemon/mgr/cdn/downloader.go b/supernode/daemon/mgr/cdn/downloader.go new file mode 100644 index 000000000..af139c1f8 --- /dev/null +++ b/supernode/daemon/mgr/cdn/downloader.go @@ -0,0 +1,15 @@ +package cdn + +import ( + "context" + "net/http" + + "github.com/sirupsen/logrus" +) + +// download the file from the original address and +// set the "Range" header to the undownloaded file range. +func (cm *Manager) download(ctx context.Context, taskID, url string, headers map[string]string, startPieceNum int, httpFileLength int64, pieceContSize int32) (*http.Response, error) { + logrus.Infof("start to download for taskId:%s, fileUrl:%s", taskID, url) + return nil, nil +} diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index e67a20bbb..7fa006f1b 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -4,22 +4,61 @@ import ( "context" "github.com/dragonflyoss/Dragonfly/apis/types" + cutil "github.com/dragonflyoss/Dragonfly/common/util" + "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/store" + + "github.com/sirupsen/logrus" ) var _ mgr.CDNMgr = &Manager{} // Manager is an implementation of the interface of CDNMgr. -type Manager struct{} +type Manager struct { + cfg *config.Config + cacheStore *store.Store +} // NewManager returns a new Manager. -func NewManager() (*Manager, error) { - return &Manager{}, nil +func NewManager(cfg *config.Config, cacheStore *store.Store) (*Manager, error) { + return &Manager{ + cfg: cfg, + cacheStore: cacheStore, + }, nil } // TriggerCDN will trigger CDN to download the file from sourceUrl. func (cm *Manager) TriggerCDN(ctx context.Context, taskInfo *types.TaskInfo) error { - return nil + httpFileLength := taskInfo.HTTPFileLength + if httpFileLength == 0 { + httpFileLength = -1 + } + + // detect Cache + cacheResult, err := cm.detectCache(ctx, taskInfo) + if err != nil { + return err + } + if cacheResult.startPieceNum == -1 { + logrus.Infof("cache full hit for taskId:%s on local", taskInfo.ID) + return nil + } + + // get piece content size which not including the piece header and trailer + pieceContSize := taskInfo.PieceSize - config.PieceWrapSize + + // start to download the source file + resp, err := cm.download(ctx, taskInfo.ID, taskInfo.TaskURL, taskInfo.Headers, cacheResult.startPieceNum, httpFileLength, pieceContSize) + if err != nil { + return err + } + defer resp.Body.Close() + + // TODO: update the LastModified And ETag for taskID. + reader := cutil.NewLimitReader(resp.Body, cm.cfg.LinkLimit, true) + + return cm.startWriter(ctx, cm.cfg, reader, taskInfo, cacheResult.startPieceNum, httpFileLength, pieceContSize) } // GetStatus get the status of the file. diff --git a/supernode/daemon/mgr/cdn/super_writer.go b/supernode/daemon/mgr/cdn/super_writer.go new file mode 100644 index 000000000..ab74af469 --- /dev/null +++ b/supernode/daemon/mgr/cdn/super_writer.go @@ -0,0 +1,15 @@ +package cdn + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" + cutil "github.com/dragonflyoss/Dragonfly/common/util" + "github.com/dragonflyoss/Dragonfly/supernode/config" +) + +// startWriter writes the stream data from the reader to the underlying storage. +func (cm *Manager) startWriter(ctx context.Context, cfg *config.Config, reader *cutil.LimitReader, + task *types.TaskInfo, startPieceNum int, httpFileLength int64, pieceContSize int32) error { + return nil +} diff --git a/supernode/daemon/mgr/cdn_mgr.go b/supernode/daemon/mgr/cdn_mgr.go index 6cc6ac977..eff9d4e56 100644 --- a/supernode/daemon/mgr/cdn_mgr.go +++ b/supernode/daemon/mgr/cdn_mgr.go @@ -22,26 +22,3 @@ type CDNMgr interface { // Delete the file from disk with specified taskID. Delete(ctx context.Context, taskID string) error } - -// CDNManager is an implementation of the interface of CDNMgr. -type CDNManager struct{} - -// NewCDNManager returns a new CDNManager. -func NewCDNManager() (*CDNManager, error) { - return &CDNManager{}, nil -} - -// TriggerCDN will trigger CDN to download the file from sourceUrl. -func (cm *CDNManager) TriggerCDN(ctx context.Context, taskInfo *types.TaskInfo) error { - return nil -} - -// GetStatus get the status of the file. -func (cm *CDNManager) GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error) { - return "", nil -} - -// Delete the file from disk with specified taskID. -func (cm *CDNManager) Delete(ctx context.Context, taskID string) error { - return nil -}