Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feature: support source cdn pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Feb 11, 2020
1 parent aa99056 commit e4f8492
Show file tree
Hide file tree
Showing 18 changed files with 317 additions and 19 deletions.
7 changes: 7 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,13 @@ definitions:
1. If file's total size is less than 200MB, then the piece size is 4MB by default.
2. Otherwise, it equals to the smaller value between totalSize/100MB + 2 MB and 15MB.
format: int32
cdnSource:
$ref: "#/definitions/CdnSource"

CdnSource:
type: string
description: ""
enum: ["supernode", "source"]

TaskInfo:
type: "object"
Expand Down
63 changes: 63 additions & 0 deletions apis/types/cdn_source.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions apis/types/task_create_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions cmd/supernode/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func setupFlags(cmd *cobra.Command) {
flagSet.String("config", config.DefaultSupernodeConfigFilePath,
"the path of supernode's configuration file")

flagSet.String("cdn-pattern", config.CDNPatternLocal,
"cdn pattern, must be in [\"local\", \"source\"]. Default: local")

flagSet.Int("port", defaultBaseProperties.ListenPort,
"listenPort is the port that supernode server listens on")

Expand Down Expand Up @@ -206,6 +209,10 @@ func bindRootFlags(v *viper.Viper) error {
key: "config",
flag: "config",
},
{
key: "base.CDNPattern",
flag: "cdn-pattern",
},
{
key: "base.listenPort",
flag: "port",
Expand Down
15 changes: 14 additions & 1 deletion dfget/core/api/download_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
Expand All @@ -34,6 +35,7 @@ type DownloadRequest struct {
PieceRange string
PieceNum int
PieceSize int32
Headers map[string]string
}

// DownloadAPI defines the download method between dfget and peer server.
Expand All @@ -59,7 +61,18 @@ func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeou
headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum)
headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize)
headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion
if req.Headers != nil {
for k, v := range req.Headers {
headers[k] = v
}
}

var url string
if strings.Contains(req.Path, "://") {
url = req.Path
} else {
url = fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
}

url := fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
return httputils.HTTPGetTimeout(url, headers, timeout)
}
7 changes: 7 additions & 0 deletions dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strconv"
"time"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/core/downloader"
Expand Down Expand Up @@ -69,6 +70,8 @@ type P2PDownloader struct {
targetFile string
// taskFileName is a string composed of `the last element of RealTarget path + "-" + sign`.
taskFileName string
// headers is the extra HTTP headers when downloading the task.
headers []string

pieceSizeHistory [2]int32
// queue maintains a queue of tasks that to be downloaded.
Expand Down Expand Up @@ -130,6 +133,9 @@ func (p2p *P2PDownloader) init() {
p2p.taskID = p2p.RegisterResult.TaskID
p2p.targetFile = p2p.cfg.RV.RealTarget
p2p.taskFileName = p2p.cfg.RV.TaskFileName
if p2p.RegisterResult.CDNSource == string(apiTypes.CdnSourceSource) {
p2p.headers = p2p.cfg.Header
}

p2p.pieceSizeHistory[0], p2p.pieceSizeHistory[1] =
p2p.RegisterResult.PieceSize, p2p.RegisterResult.PieceSize
Expand Down Expand Up @@ -328,6 +334,7 @@ func (p2p *P2PDownloader) startTask(data *types.PullPieceTaskResponseContinueDat
clientQueue: p2p.clientQueue,
rateLimiter: p2p.rateLimiter,
downloadAPI: api.NewDownloadAPI(),
headers: p2p.headers,
}
if err := powerClient.Run(); err != nil && powerClient.ClientError() != nil {
p2p.API.ReportClientError(p2p.node, powerClient.ClientError())
Expand Down
20 changes: 13 additions & 7 deletions dfget/core/downloader/p2p_downloader/power_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
type PowerClient struct {
// taskID is a string which represents a unique task.
taskID string
// headers is the extra HTTP headers when downloading a piece.
headers []string
// node indicates the IP address of the currently registered supernode.
node string
// pieceTask is the data when successfully pulling piece task
Expand Down Expand Up @@ -108,13 +110,11 @@ func (pc *PowerClient) ClientError() *types.ClientErrorRequest {
}

func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
pieceMetaArr := strings.Split(pc.pieceTask.PieceMd5, ":")
pieceMD5 := pieceMetaArr[0]
dstIP := pc.pieceTask.PeerIP
peerPort := pc.pieceTask.PeerPort

// check that the target download peer is available
if dstIP != pc.node {
if dstIP != "" && dstIP != pc.node {
if _, e = httputils.CheckConnect(dstIP, peerPort, -1); e != nil {
return nil, e
}
Expand All @@ -139,6 +139,9 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
return nil, errortypes.New(resp.StatusCode, pc.readBody(resp.Body))
}

pieceMetaArr := strings.Split(pc.pieceTask.PieceMd5, ":")
pieceMD5 := pieceMetaArr[0]

// start to read data from resp
// use limitReader to limit the download speed
limitReader := limitreader.NewLimitReaderWithLimiter(pc.rateLimiter, resp.Body, pieceMD5 != "")
Expand All @@ -149,10 +152,12 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
pc.readCost = time.Since(startTime)

// Verify md5 code
if realMd5 := limitReader.Md5(); realMd5 != pieceMD5 {
pc.initFileMd5NotMatchError(dstIP, realMd5, pieceMD5)
return nil, fmt.Errorf("piece range:%s md5 not match, expected:%s real:%s",
pc.pieceTask.Range, pieceMD5, realMd5)
if pieceMD5 != "" {
if realMd5 := limitReader.Md5(); realMd5 != pieceMD5 {
pc.initFileMd5NotMatchError(dstIP, realMd5, pieceMD5)
return nil, fmt.Errorf("piece range:%s md5 not match, expected:%s real:%s",
pc.pieceTask.Range, pieceMD5, realMd5)
}
}

if timeDuring := time.Since(startTime); timeDuring > downloadPieceTimeout {
Expand All @@ -168,6 +173,7 @@ func (pc *PowerClient) createDownloadRequest() *api.DownloadRequest {
PieceRange: pc.pieceTask.Range,
PieceNum: pc.pieceTask.PieceNum,
PieceSize: pc.pieceTask.PieceSize,
Headers: netutils.ConvertHeaders(pc.headers),
}
}

Expand Down
6 changes: 4 additions & 2 deletions dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes
}

result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL,
resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize)
resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource)

logrus.Infof("do register result:%s and cost:%.3fs", resp,
time.Since(start).Seconds())
Expand Down Expand Up @@ -188,14 +188,15 @@ func getTaskPath(taskFileName string) string {

// NewRegisterResult creates an instance of RegisterResult.
func NewRegisterResult(node string, remainder []string, url string,
taskID string, fileLen int64, pieceSize int32) *RegisterResult {
taskID string, fileLen int64, pieceSize int32, cdnSource string) *RegisterResult {
return &RegisterResult{
Node: node,
RemainderNodes: remainder,
URL: url,
TaskID: taskID,
FileLength: fileLen,
PieceSize: pieceSize,
CDNSource: cdnSource,
}
}

Expand All @@ -207,6 +208,7 @@ type RegisterResult struct {
TaskID string
FileLength int64
PieceSize int32
CDNSource string
}

func (r *RegisterResult) String() string {
Expand Down
1 change: 1 addition & 0 deletions dfget/types/register_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ type RegisterResponseData struct {
TaskID string `json:"taskId"`
FileLength int64 `json:"fileLength"`
PieceSize int32 `json:"pieceSize"`
CDNSource string `json:"cdnSource"`
}
11 changes: 11 additions & 0 deletions supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,19 @@ func NewBaseProperties() *BaseProperties {
}
}

type CDNPattern string

const (
CDNPatternLocal = "local"
CDNPatternSource = "source"
)

// BaseProperties contains all basic properties of supernode.
type BaseProperties struct {
// CDNPattern cdn pattern which must be in ["local", "source"].
// default: CDNPatternLocal
CDNPattern CDNPattern `yaml:cdnPattern`

// ListenPort is the port supernode server listens on.
// default: 8002
ListenPort int `yaml:"listenPort"`
Expand Down
2 changes: 1 addition & 1 deletion supernode/daemon/mgr/cdn/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func init() {
}

func (s *CDNDownloadTestSuite) TestDownload(c *check.C) {
cm, _ := NewManager(config.NewConfig(), nil, nil, httpclient.NewOriginClient(), prometheus.DefaultRegisterer)
cm, _ := newManager(config.NewConfig(), nil, nil, httpclient.NewOriginClient(), prometheus.DefaultRegisterer)
bytes := []byte("hello world")
bytesLength := int64(len(bytes))

Expand Down
15 changes: 12 additions & 3 deletions supernode/daemon/mgr/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func newMetrics(register prometheus.Registerer) *metrics {
}
}

func init() {
mgr.Register(config.CDNPatternLocal, NewManager)
}

// Manager is an implementation of the interface of CDNMgr.
type Manager struct {
cfg *config.Config
Expand All @@ -86,6 +90,11 @@ type Manager struct {

// NewManager returns a new Manager.
func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr,
originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (mgr.CDNMgr, error) {
return newManager(cfg, cacheStore, progressManager, originClient, register)
}

func newManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr,
originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) {
rateLimiter := ratelimiter.NewRateLimiter(ratelimiter.TransRate(int64(cfg.MaxBandwidth-cfg.SystemReservedBandwidth)), 2)
metaDataManager := newFileMetaDataManager(cacheStore)
Expand Down Expand Up @@ -167,14 +176,14 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types

// GetHTTPPath returns the http download path of taskID.
// The returned path joined the DownloadRaw.Bucket and DownloadRaw.Key.
func (cm *Manager) GetHTTPPath(ctx context.Context, taskID string) (string, error) {
raw := getDownloadRawFunc(taskID)
func (cm *Manager) GetHTTPPath(ctx context.Context, taskInfo *types.TaskInfo) (string, error) {
raw := getDownloadRawFunc(taskInfo.ID)
return path.Join("/", raw.Bucket, raw.Key), nil
}

// GetStatus gets the status of the file.
func (cm *Manager) GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error) {
return "", nil
return types.TaskInfoCdnStatusSUCCESS, nil
}

// GetPieceMD5 gets the piece Md5 accorrding to the specified taskID and pieceNum.
Expand Down
Loading

0 comments on commit e4f8492

Please sign in to comment.