From 5126ec7216a726d650fb11cb60c4ef128e52c510 Mon Sep 17 00:00:00 2001 From: Starnop Date: Tue, 11 Feb 2020 09:44:20 +0800 Subject: [PATCH] feature: support source cdn pattern Signed-off-by: Starnop --- apis/swagger.yml | 7 ++ apis/types/cdn_source.go | 63 +++++++++++ apis/types/task_create_response.go | 29 +++++ cmd/supernode/app/root.go | 7 ++ cmd/supernode/main.go | 2 + dfget/core/api/download_api.go | 15 ++- .../p2p_downloader/p2p_downloader.go | 7 ++ .../downloader/p2p_downloader/power_client.go | 20 ++-- dfget/core/regist/register.go | 6 +- dfget/core/regist/register_test.go | 3 +- dfget/types/register_response.go | 1 + supernode/config/config.go | 11 ++ supernode/daemon/mgr/cdn/downloader_test.go | 2 +- supernode/daemon/mgr/cdn/manager.go | 15 ++- supernode/daemon/mgr/cdn_mgr.go | 32 +++++- supernode/daemon/mgr/sourcecdn/manager.go | 102 ++++++++++++++++++ supernode/daemon/mgr/task/manager.go | 5 + supernode/daemon/mgr/task/manager_util.go | 17 ++- supernode/server/0.3_bridge.go | 2 + supernode/server/server.go | 3 +- 20 files changed, 329 insertions(+), 20 deletions(-) create mode 100644 apis/types/cdn_source.go create mode 100644 supernode/daemon/mgr/sourcecdn/manager.go diff --git a/apis/swagger.yml b/apis/swagger.yml index 73b1c33fc..193f7f7ff 100644 --- a/apis/swagger.yml +++ b/apis/swagger.yml @@ -973,6 +973,13 @@ definitions: 1. If file's total size is less than 200MB, then the piece size is 4MB by default. 2. Otherwise, it equals to the smaller value between totalSize/100MB + 2 MB and 15MB. format: int32 + cdnSource: + $ref: "#/definitions/CdnSource" + + CdnSource: + type: string + description: "" + enum: ["supernode", "source"] TaskInfo: type: "object" diff --git a/apis/types/cdn_source.go b/apis/types/cdn_source.go new file mode 100644 index 000000000..57cbc3bac --- /dev/null +++ b/apis/types/cdn_source.go @@ -0,0 +1,63 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package types + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "encoding/json" + + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +// CdnSource cdn source +// swagger:model CdnSource +type CdnSource string + +const ( + + // CdnSourceSupernode captures enum value "supernode" + CdnSourceSupernode CdnSource = "supernode" + + // CdnSourceSource captures enum value "source" + CdnSourceSource CdnSource = "source" +) + +// for schema +var cdnSourceEnum []interface{} + +func init() { + var res []CdnSource + if err := json.Unmarshal([]byte(`["supernode","source"]`), &res); err != nil { + panic(err) + } + for _, v := range res { + cdnSourceEnum = append(cdnSourceEnum, v) + } +} + +func (m CdnSource) validateCdnSourceEnum(path, location string, value CdnSource) error { + if err := validate.Enum(path, location, value, cdnSourceEnum); err != nil { + return err + } + return nil +} + +// Validate validates this cdn source +func (m CdnSource) Validate(formats strfmt.Registry) error { + var res []error + + // value enum + if err := m.validateCdnSourceEnum("", "body", m); err != nil { + return err + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} diff --git a/apis/types/task_create_response.go b/apis/types/task_create_response.go index 2c8ebbb1c..528732bb8 100644 --- a/apis/types/task_create_response.go +++ b/apis/types/task_create_response.go @@ -8,6 +8,7 @@ package types import ( strfmt "github.com/go-openapi/strfmt" + "github.com/go-openapi/errors" "github.com/go-openapi/swag" ) @@ -18,6 +19,9 @@ type TaskCreateResponse struct { // ID of the created task. ID string `json:"ID,omitempty"` + // cdn source + CdnSource CdnSource `json:"cdnSource,omitempty"` + // The length of the file dfget requests to download in bytes. // FileLength int64 `json:"fileLength,omitempty"` @@ -31,6 +35,31 @@ type TaskCreateResponse struct { // Validate validates this task create response func (m *TaskCreateResponse) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateCdnSource(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *TaskCreateResponse) validateCdnSource(formats strfmt.Registry) error { + + if swag.IsZero(m.CdnSource) { // not required + return nil + } + + if err := m.CdnSource.Validate(formats); err != nil { + if ve, ok := err.(*errors.Validation); ok { + return ve.ValidateName("cdnSource") + } + return err + } + return nil } diff --git a/cmd/supernode/app/root.go b/cmd/supernode/app/root.go index 0687b8857..8c034489a 100644 --- a/cmd/supernode/app/root.go +++ b/cmd/supernode/app/root.go @@ -145,6 +145,9 @@ func setupFlags(cmd *cobra.Command) { flagSet.String("config", config.DefaultSupernodeConfigFilePath, "the path of supernode's configuration file") + flagSet.String("cdn-pattern", config.CDNPatternLocal, + "cdn pattern, must be in [\"local\", \"source\"]. Default: local") + flagSet.Int("port", defaultBaseProperties.ListenPort, "listenPort is the port that supernode server listens on") @@ -206,6 +209,10 @@ func bindRootFlags(v *viper.Viper) error { key: "config", flag: "config", }, + { + key: "base.CDNPattern", + flag: "cdn-pattern", + }, { key: "base.listenPort", flag: "port", diff --git a/cmd/supernode/main.go b/cmd/supernode/main.go index 806e3567d..81023ab1b 100644 --- a/cmd/supernode/main.go +++ b/cmd/supernode/main.go @@ -18,6 +18,8 @@ package main import ( "github.com/dragonflyoss/Dragonfly/cmd/supernode/app" + _ "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/cdn" + _ "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/sourcecdn" ) func main() { diff --git a/dfget/core/api/download_api.go b/dfget/core/api/download_api.go index 0bb287d99..90ae86469 100644 --- a/dfget/core/api/download_api.go +++ b/dfget/core/api/download_api.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "strconv" + "strings" "time" "github.com/dragonflyoss/Dragonfly/dfget/config" @@ -34,6 +35,7 @@ type DownloadRequest struct { PieceRange string PieceNum int PieceSize int32 + Headers map[string]string } // DownloadAPI defines the download method between dfget and peer server. @@ -59,7 +61,18 @@ func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeou headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum) headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize) headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion + if req.Headers != nil { + for k, v := range req.Headers { + headers[k] = v + } + } + + var url string + if strings.Contains(req.Path, "://") { + url = req.Path + } else { + url = fmt.Sprintf("http://%s:%d%s", ip, port, req.Path) + } - url := fmt.Sprintf("http://%s:%d%s", ip, port, req.Path) return httputils.HTTPGetTimeout(url, headers, timeout) } diff --git a/dfget/core/downloader/p2p_downloader/p2p_downloader.go b/dfget/core/downloader/p2p_downloader/p2p_downloader.go index e40e39ca0..dc7569581 100644 --- a/dfget/core/downloader/p2p_downloader/p2p_downloader.go +++ b/dfget/core/downloader/p2p_downloader/p2p_downloader.go @@ -25,6 +25,7 @@ import ( "strconv" "time" + apiTypes "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/dfget/core/api" "github.com/dragonflyoss/Dragonfly/dfget/core/downloader" @@ -69,6 +70,8 @@ type P2PDownloader struct { targetFile string // taskFileName is a string composed of `the last element of RealTarget path + "-" + sign`. taskFileName string + // headers is the extra HTTP headers when downloading the task. + headers []string pieceSizeHistory [2]int32 // queue maintains a queue of tasks that to be downloaded. @@ -130,6 +133,9 @@ func (p2p *P2PDownloader) init() { p2p.taskID = p2p.RegisterResult.TaskID p2p.targetFile = p2p.cfg.RV.RealTarget p2p.taskFileName = p2p.cfg.RV.TaskFileName + if p2p.RegisterResult.CDNSource == string(apiTypes.CdnSourceSource) { + p2p.headers = p2p.cfg.Header + } p2p.pieceSizeHistory[0], p2p.pieceSizeHistory[1] = p2p.RegisterResult.PieceSize, p2p.RegisterResult.PieceSize @@ -328,6 +334,7 @@ func (p2p *P2PDownloader) startTask(data *types.PullPieceTaskResponseContinueDat clientQueue: p2p.clientQueue, rateLimiter: p2p.rateLimiter, downloadAPI: api.NewDownloadAPI(), + headers: p2p.headers, } if err := powerClient.Run(); err != nil && powerClient.ClientError() != nil { p2p.API.ReportClientError(p2p.node, powerClient.ClientError()) diff --git a/dfget/core/downloader/p2p_downloader/power_client.go b/dfget/core/downloader/p2p_downloader/power_client.go index 3ffffd0fe..d70dcc7a1 100644 --- a/dfget/core/downloader/p2p_downloader/power_client.go +++ b/dfget/core/downloader/p2p_downloader/power_client.go @@ -48,6 +48,8 @@ const ( type PowerClient struct { // taskID is a string which represents a unique task. taskID string + // headers is the extra HTTP headers when downloading a piece. + headers []string // node indicates the IP address of the currently registered supernode. node string // pieceTask is the data when successfully pulling piece task @@ -108,13 +110,11 @@ func (pc *PowerClient) ClientError() *types.ClientErrorRequest { } func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) { - pieceMetaArr := strings.Split(pc.pieceTask.PieceMd5, ":") - pieceMD5 := pieceMetaArr[0] dstIP := pc.pieceTask.PeerIP peerPort := pc.pieceTask.PeerPort // check that the target download peer is available - if dstIP != pc.node { + if dstIP != "" && dstIP != pc.node { if _, e = httputils.CheckConnect(dstIP, peerPort, -1); e != nil { return nil, e } @@ -139,6 +139,9 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) { return nil, errortypes.New(resp.StatusCode, pc.readBody(resp.Body)) } + pieceMetaArr := strings.Split(pc.pieceTask.PieceMd5, ":") + pieceMD5 := pieceMetaArr[0] + // start to read data from resp // use limitReader to limit the download speed limitReader := limitreader.NewLimitReaderWithLimiter(pc.rateLimiter, resp.Body, pieceMD5 != "") @@ -149,10 +152,12 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) { pc.readCost = time.Since(startTime) // Verify md5 code - if realMd5 := limitReader.Md5(); realMd5 != pieceMD5 { - pc.initFileMd5NotMatchError(dstIP, realMd5, pieceMD5) - return nil, fmt.Errorf("piece range:%s md5 not match, expected:%s real:%s", - pc.pieceTask.Range, pieceMD5, realMd5) + if pieceMD5 != "" { + if realMd5 := limitReader.Md5(); realMd5 != pieceMD5 { + pc.initFileMd5NotMatchError(dstIP, realMd5, pieceMD5) + return nil, fmt.Errorf("piece range:%s md5 not match, expected:%s real:%s", + pc.pieceTask.Range, pieceMD5, realMd5) + } } if timeDuring := time.Since(startTime); timeDuring > downloadPieceTimeout { @@ -168,6 +173,7 @@ func (pc *PowerClient) createDownloadRequest() *api.DownloadRequest { PieceRange: pc.pieceTask.Range, PieceNum: pc.pieceTask.PieceNum, PieceSize: pc.pieceTask.PieceSize, + Headers: netutils.ConvertHeaders(pc.headers), } } diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index b5d462c12..970a43180 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -99,7 +99,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes } result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL, - resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize) + resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource) logrus.Infof("do register result:%s and cost:%.3fs", resp, time.Since(start).Seconds()) @@ -188,7 +188,7 @@ func getTaskPath(taskFileName string) string { // NewRegisterResult creates an instance of RegisterResult. func NewRegisterResult(node string, remainder []string, url string, - taskID string, fileLen int64, pieceSize int32) *RegisterResult { + taskID string, fileLen int64, pieceSize int32, cdnSource string) *RegisterResult { return &RegisterResult{ Node: node, RemainderNodes: remainder, @@ -196,6 +196,7 @@ func NewRegisterResult(node string, remainder []string, url string, TaskID: taskID, FileLength: fileLen, PieceSize: pieceSize, + CDNSource: cdnSource, } } @@ -207,6 +208,7 @@ type RegisterResult struct { TaskID string FileLength int64 PieceSize int32 + CDNSource string } func (r *RegisterResult) String() string { diff --git a/dfget/core/regist/register_test.go b/dfget/core/regist/register_test.go index 6196ee26d..611a3a44d 100644 --- a/dfget/core/regist/register_test.go +++ b/dfget/core/regist/register_test.go @@ -58,13 +58,14 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) { func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) { result := NewRegisterResult("node", []string{"1"}, "url", "taskID", - 10, 1) + 10, 1, "supernode") c.Assert(result.Node, check.Equals, "node") c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"}) c.Assert(result.URL, check.Equals, "url") c.Assert(result.TaskID, check.Equals, "taskID") c.Assert(result.FileLength, check.Equals, int64(10)) c.Assert(result.PieceSize, check.Equals, int32(1)) + c.Assert(result.CDNSource, check.Equals, "supernode") str, _ := json.Marshal(result) c.Assert(result.String(), check.Equals, string(str)) diff --git a/dfget/types/register_response.go b/dfget/types/register_response.go index 1c59fa801..ff24b358b 100644 --- a/dfget/types/register_response.go +++ b/dfget/types/register_response.go @@ -38,4 +38,5 @@ type RegisterResponseData struct { TaskID string `json:"taskId"` FileLength int64 `json:"fileLength"` PieceSize int32 `json:"pieceSize"` + CDNSource string `json:"cdnSource"` } diff --git a/supernode/config/config.go b/supernode/config/config.go index 290daf237..cad228e85 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -117,8 +117,19 @@ func NewBaseProperties() *BaseProperties { } } +type CDNPattern string + +const ( + CDNPatternLocal = "local" + CDNPatternSource = "source" +) + // BaseProperties contains all basic properties of supernode. type BaseProperties struct { + // CDNPattern cdn pattern which must be in ["local", "source"]. + // default: CDNPatternLocal + CDNPattern CDNPattern `yaml:"cdnPattern"` + // ListenPort is the port supernode server listens on. // default: 8002 ListenPort int `yaml:"listenPort"` diff --git a/supernode/daemon/mgr/cdn/downloader_test.go b/supernode/daemon/mgr/cdn/downloader_test.go index 61ad3308a..36deb814d 100644 --- a/supernode/daemon/mgr/cdn/downloader_test.go +++ b/supernode/daemon/mgr/cdn/downloader_test.go @@ -47,7 +47,7 @@ func init() { } func (s *CDNDownloadTestSuite) TestDownload(c *check.C) { - cm, _ := NewManager(config.NewConfig(), nil, nil, httpclient.NewOriginClient(), prometheus.DefaultRegisterer) + cm, _ := newManager(config.NewConfig(), nil, nil, httpclient.NewOriginClient(), prometheus.DefaultRegisterer) bytes := []byte("hello world") bytesLength := int64(len(bytes)) diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index 427e272b5..970a5c3bf 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -67,6 +67,10 @@ func newMetrics(register prometheus.Registerer) *metrics { } } +func init() { + mgr.Register(config.CDNPatternLocal, NewManager) +} + // Manager is an implementation of the interface of CDNMgr. type Manager struct { cfg *config.Config @@ -86,6 +90,11 @@ type Manager struct { // NewManager returns a new Manager. func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr, + originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (mgr.CDNMgr, error) { + return newManager(cfg, cacheStore, progressManager, originClient, register) +} + +func newManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr, originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) { rateLimiter := ratelimiter.NewRateLimiter(ratelimiter.TransRate(int64(cfg.MaxBandwidth-cfg.SystemReservedBandwidth)), 2) metaDataManager := newFileMetaDataManager(cacheStore) @@ -167,14 +176,14 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types // GetHTTPPath returns the http download path of taskID. // The returned path joined the DownloadRaw.Bucket and DownloadRaw.Key. -func (cm *Manager) GetHTTPPath(ctx context.Context, taskID string) (string, error) { - raw := getDownloadRawFunc(taskID) +func (cm *Manager) GetHTTPPath(ctx context.Context, taskInfo *types.TaskInfo) (string, error) { + raw := getDownloadRawFunc(taskInfo.ID) return path.Join("/", raw.Bucket, raw.Key), nil } // GetStatus gets the status of the file. func (cm *Manager) GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error) { - return "", nil + return types.TaskInfoCdnStatusSUCCESS, nil } // GetPieceMD5 gets the piece Md5 accorrding to the specified taskID and pieceNum. diff --git a/supernode/daemon/mgr/cdn_mgr.go b/supernode/daemon/mgr/cdn_mgr.go index 5ec304ca5..8d6dc7b72 100644 --- a/supernode/daemon/mgr/cdn_mgr.go +++ b/supernode/daemon/mgr/cdn_mgr.go @@ -18,10 +18,40 @@ package mgr import ( "context" + "fmt" "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/httpclient" + "github.com/dragonflyoss/Dragonfly/supernode/store" + + "github.com/prometheus/client_golang/prometheus" ) +type CDNBuilder func(cfg *config.Config, cacheStore *store.Store, progressManager ProgressMgr, + originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (CDNMgr, error) + +var cdnBuilderMap = make(map[config.CDNPattern]CDNBuilder) + +func Register(name config.CDNPattern, builder CDNBuilder) { + cdnBuilderMap[name] = builder +} + +func GetCDNManager(cfg *config.Config, cacheStore *store.Store, progressManager ProgressMgr, + originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (CDNMgr, error) { + name := cfg.CDNPattern + if name == "" { + name = config.CDNPatternLocal + } + + cdnBuilder, ok := cdnBuilderMap[name] + if !ok { + return nil, fmt.Errorf("unexpected cdn pattern(%s) which must be in [\"local\", \"source\"]", name) + } + + return cdnBuilder(cfg, cacheStore, progressManager, originClient, register) +} + // CDNMgr as an interface defines all operations against CDN and // operates on the underlying files stored on the local disk, etc. type CDNMgr interface { @@ -36,7 +66,7 @@ type CDNMgr interface { TriggerCDN(ctx context.Context, taskInfo *types.TaskInfo) (*types.TaskInfo, error) // GetHTTPPath returns the http download path of taskID. - GetHTTPPath(ctx context.Context, taskID string) (path string, err error) + GetHTTPPath(ctx context.Context, taskInfo *types.TaskInfo) (path string, err error) // GetStatus gets the status of the file. GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error) diff --git a/supernode/daemon/mgr/sourcecdn/manager.go b/supernode/daemon/mgr/sourcecdn/manager.go new file mode 100644 index 000000000..51fdbd98e --- /dev/null +++ b/supernode/daemon/mgr/sourcecdn/manager.go @@ -0,0 +1,102 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cdn + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/httpclient" + "github.com/dragonflyoss/Dragonfly/supernode/store" + "github.com/prometheus/client_golang/prometheus" +) + +var _ mgr.CDNMgr = &Manager{} + +func init() { + mgr.Register(config.CDNPatternSource, NewManager) +} + +// Manager is an implementation of the interface of CDNMgr which use source as CDN node. +type Manager struct { + cfg *config.Config + progressManager mgr.ProgressMgr +} + +// NewManager returns a new Manager. +func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr, + originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (mgr.CDNMgr, error) { + return &Manager{ + cfg: cfg, + progressManager: progressManager, + }, nil +} + +// TriggerCDN will trigger CDN to download the file from sourceUrl. +func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types.TaskInfo, error) { + httpFileLength := task.HTTPFileLength + if httpFileLength == 0 { + httpFileLength = -1 + } + + if httpFileLength > 0 { + pieceTotal := int((httpFileLength + int64(task.PieceSize-1)) / int64(task.PieceSize)) + supernodeCID := cm.cfg.GetSuperCID(task.ID) + supernodePID := cm.cfg.GetSuperPID() + + var pieceNum int + for pieceNum = 0; pieceNum < pieceTotal; pieceNum++ { + cm.progressManager.UpdateProgress(ctx, task.ID, supernodeCID, supernodePID, "", pieceNum, config.PieceSUCCESS) + } + } + + return &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusSUCCESS, + }, nil +} + +// GetHTTPPath returns the http download path of taskID. +func (cm *Manager) GetHTTPPath(ctx context.Context, taskInfo *types.TaskInfo) (string, error) { + return taskInfo.RawURL, nil +} + +// GetStatus gets the status of the file. +func (cm *Manager) GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error) { + return "", nil +} + +// GetPieceMD5 gets the piece Md5 accorrding to the specified taskID and pieceNum. +func (cm *Manager) GetPieceMD5(ctx context.Context, taskID string, pieceNum int, pieceRange, source string) (pieceMd5 string, err error) { + return "", nil +} + +// CheckFile checks the file whether exists. +func (cm *Manager) CheckFile(ctx context.Context, taskID string) bool { + return true +} + +// Delete the cdn meta with specified taskID. +// It will also delete the files on the disk when the force equals true. +func (cm *Manager) Delete(ctx context.Context, taskID string, force bool) error { + return nil +} + +func (cm *Manager) GetGCTaskIDs(ctx context.Context, taskMgr mgr.TaskMgr) ([]string, error) { + return nil, nil +} diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 921dcdb81..9fe065684 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -163,10 +163,15 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( return nil, errors.Wrapf(errortypes.ErrSystemError, "failed to trigger cdn: %v", err) } + cdnSource := types.CdnSourceSupernode + if tm.cfg.CDNPattern == config.CDNPatternSource { + cdnSource = types.CdnSourceSource + } return &types.TaskCreateResponse{ ID: task.ID, FileLength: task.HTTPFileLength, PieceSize: task.PieceSize, + CdnSource: cdnSource, }, nil } diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index b6c65cc3b..41fe3ab35 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -95,6 +95,19 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq return nil, err } } + if tm.cfg.CDNPattern == config.CDNPatternSource { + if fileLength <= 0 { + return nil, fmt.Errorf("failed to get file length and it is required in source CDN pattern") + } + + supportRange, err := tm.originClient.IsSupportRange(task.TaskURL, task.Headers) + if err != nil { + return nil, errors.Wrapf(err, "failed to check whether the task(%s) supports partial requests", task.ID) + } + if !supportRange { + return nil, fmt.Errorf("the task URL should support range request in source CDN pattern: %s", taskID) + } + } task.HTTPFileLength = fileLength logrus.Infof("get file length %d from http client for taskID(%s)", fileLength, taskID) @@ -246,7 +259,7 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.TaskInf func (tm *Manager) initCdnNode(ctx context.Context, task *types.TaskInfo) error { var cid = tm.cfg.GetSuperCID(task.ID) var pid = tm.cfg.GetSuperPID() - path, err := tm.cdnMgr.GetHTTPPath(ctx, task.ID) + path, err := tm.cdnMgr.GetHTTPPath(ctx, task) if err != nil { return err } @@ -321,7 +334,7 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas cdnSuccess := task.CdnStatus == types.TaskInfoCdnStatusSUCCESS pieceSuccess, _ := tm.progressMgr.GetPieceProgressByCID(ctx, task.ID, clientID, "success") logrus.Debugf("taskID(%s) clientID(%s) get successful pieces: %v", task.ID, clientID, pieceSuccess) - if cdnSuccess && (int32(len(pieceSuccess)) == task.PieceTotal) { + if cdnSuccess && (task.PieceTotal != 0 && (int32(len(pieceSuccess)) == task.PieceTotal)) { // update dfget task status to success if err := tm.dfgetTaskMgr.UpdateStatus(ctx, clientID, task.ID, types.DfGetTaskStatusSUCCESS); err != nil { logrus.Errorf("failed to update dfget task status with "+ diff --git a/supernode/server/0.3_bridge.go b/supernode/server/0.3_bridge.go index 28e225548..958c02d30 100644 --- a/supernode/server/0.3_bridge.go +++ b/supernode/server/0.3_bridge.go @@ -39,6 +39,7 @@ type RegisterResponseData struct { TaskID string `json:"taskId"` FileLength int64 `json:"fileLength"` PieceSize int32 `json:"pieceSize"` + CDNSource string `json:"cdnSource"` } // PullPieceTaskResponseContinueData is the data when successfully pulling piece task @@ -135,6 +136,7 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http TaskID: resp.ID, FileLength: resp.FileLength, PieceSize: resp.PieceSize, + CDNSource: string(resp.CdnSource), }, }) } diff --git a/supernode/server/server.go b/supernode/server/server.go index 5db89104a..ac626a2e8 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -25,7 +25,6 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" - "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/cdn" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/dfgettask" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/gc" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" @@ -94,7 +93,7 @@ func New(cfg *config.Config, logger *logrus.Logger, register prometheus.Register return nil, err } - cdnMgr, err := cdn.NewManager(cfg, storeLocal, progressMgr, originClient, register) + cdnMgr, err := mgr.GetCDNManager(cfg, storeLocal, progressMgr, originClient, register) if err != nil { return nil, err }