Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feature: support source cdn pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Mar 23, 2020
1 parent bae7528 commit 9a3ced9
Show file tree
Hide file tree
Showing 34 changed files with 526 additions and 78 deletions.
7 changes: 7 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,13 @@ definitions:
1. If file's total size is less than 200MB, then the piece size is 4MB by default.
2. Otherwise, it equals to the smaller value between totalSize/100MB + 2 MB and 15MB.
format: int32
cdnSource:
$ref: "#/definitions/CdnSource"

CdnSource:
type: string
description: ""
enum: ["supernode", "source"]

TaskInfo:
type: "object"
Expand Down
63 changes: 63 additions & 0 deletions apis/types/cdn_source.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions apis/types/task_create_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions cmd/supernode/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func setupFlags(cmd *cobra.Command) {
flagSet.String("config", config.DefaultSupernodeConfigFilePath,
"the path of supernode's configuration file")

flagSet.String("cdn-pattern", config.CDNPatternLocal,
"cdn pattern, must be in [\"local\", \"source\"]. Default: local")

flagSet.Int("port", defaultBaseProperties.ListenPort,
"listenPort is the port that supernode server listens on")

Expand Down Expand Up @@ -206,6 +209,10 @@ func bindRootFlags(v *viper.Viper) error {
key: "config",
flag: "config",
},
{
key: "base.CDNPattern",
flag: "cdn-pattern",
},
{
key: "base.listenPort",
flag: "port",
Expand Down
2 changes: 2 additions & 0 deletions cmd/supernode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package main

import (
"github.com/dragonflyoss/Dragonfly/cmd/supernode/app"
_ "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/cdn"
_ "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/sourcecdn"
)

func main() {
Expand Down
4 changes: 4 additions & 0 deletions dfget/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,7 @@ const (
// CodeDownloadError represents failed to download file.
CodeDownloadError
)

const (
RangeSeparator = "-"
)
15 changes: 14 additions & 1 deletion dfget/core/api/download_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
Expand All @@ -34,6 +35,7 @@ type DownloadRequest struct {
PieceRange string
PieceNum int
PieceSize int32
Headers map[string]string
}

// DownloadAPI defines the download method between dfget and peer server.
Expand All @@ -59,7 +61,18 @@ func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeou
headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum)
headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize)
headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion
if req.Headers != nil {
for k, v := range req.Headers {
headers[k] = v
}
}

var url string
if strings.Contains(req.Path, "://") {
url = req.Path
} else {
url = fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
}

url := fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
return httputils.HTTPGetTimeout(url, headers, timeout)
}
5 changes: 4 additions & 1 deletion dfget/core/downloader/p2p_downloader/client_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"time"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
Expand Down Expand Up @@ -62,6 +63,8 @@ type ClientStreamWriter struct {
// api holds an instance of SupernodeAPI to interact with supernode.
api api.SupernodeAPI
cfg *config.Config

cdnSource apiTypes.CdnSource
}

// NewClientStreamWriter creates and initialize a ClientStreamWriter instance.
Expand Down Expand Up @@ -155,7 +158,7 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error {
break
}

_, err := io.Copy(csw.pipeWriter, p.RawContent())
_, err := io.Copy(csw.pipeWriter, p.RawContent(csw.cdnSource == apiTypes.CdnSourceSource))
if err != nil {
return err
}
Expand Down
27 changes: 17 additions & 10 deletions dfget/core/downloader/p2p_downloader/client_stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,54 +41,61 @@ func (s *ClientStreamWriterTestSuite) TearDownSuite(*check.C) {

func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
var cases = []struct {
piece *Piece
expected string
piece *Piece
noWrapper bool
expected string
}{
{
piece: &Piece{
PieceNum: 0,
PieceSize: 6,
Content: bytes.NewBufferString("000010"),
},
expected: "1",
noWrapper: false,
expected: "1",
},
{
piece: &Piece{
PieceNum: 1,
PieceSize: 6,
Content: bytes.NewBufferString("000020"),
},
expected: "2",
noWrapper: false,
expected: "2",
},
{
piece: &Piece{
PieceNum: 3,
PieceSize: 6,
Content: bytes.NewBufferString("000040"),
},
expected: "4",
noWrapper: false,
expected: "4",
},
{
piece: &Piece{
PieceNum: 4,
PieceSize: 6,
Content: bytes.NewBufferString("000050"),
},
expected: "5",
noWrapper: false,
expected: "5",
},
{
piece: &Piece{
PieceNum: 2,
PieceSize: 6,
Content: bytes.NewBufferString("000030"),
},
expected: "3",
noWrapper: false,
expected: "3",
},
}

cases2 := make([]struct {
piece *Piece
expected string
piece *Piece
noWrapper bool
expected string
}, len(cases))
copy(cases2, cases)

Expand All @@ -104,7 +111,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
return cases[i].piece.PieceNum < cases[j].piece.PieceNum
})
for _, v := range cases {
content := s.getString(csw, v.piece.RawContent().Len())
content := s.getString(csw, v.piece.RawContent(v.noWrapper).Len())
c.Check(content, check.Equals, v.expected)
}
}
Expand Down
23 changes: 17 additions & 6 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"time"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/core/downloader"
Expand Down Expand Up @@ -88,17 +89,20 @@ type ClientWriter struct {
// api holds an instance of SupernodeAPI to interact with supernode.
api api.SupernodeAPI
cfg *config.Config

cdnSource apiTypes.CdnSource
}

// NewClientWriter creates and initialize a ClientWriter instance.
func NewClientWriter(clientFilePath, serviceFilePath string,
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) PieceWriter {
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
clientWriter := &ClientWriter{
clientQueue: clientQueue,
clientFilePath: clientFilePath,
serviceFilePath: serviceFilePath,
api: api,
cfg: cfg,
cdnSource: cdnSource,
}
return clientWriter
}
Expand All @@ -119,7 +123,7 @@ func (cw *ClientWriter) PreRun(ctx context.Context) (err error) {

cw.result = true
cw.targetQueue = queue.NewQueue(0)
cw.targetWriter, err = NewTargetWriter(cw.cfg.RV.TempTarget, cw.targetQueue, cw.cfg)
cw.targetWriter, err = NewTargetWriter(cw.cfg.RV.TempTarget, cw.targetQueue, cw.cfg, cw.cdnSource)
if err != nil {
return
}
Expand Down Expand Up @@ -213,21 +217,28 @@ func (cw *ClientWriter) write(piece *Piece) error {
}

cw.pieceIndex++
err := writePieceToFile(piece, cw.serviceFile)
err := writePieceToFile(piece, cw.serviceFile, cw.cdnSource)
if err == nil {
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime))
}
return err
}

func writePieceToFile(piece *Piece, file *os.File) error {
start := int64(piece.PieceNum) * (int64(piece.PieceSize) - 5)
func writePieceToFile(piece *Piece, file *os.File, cdnSource apiTypes.CdnSource) error {
var pieceHeader = 5
// the piece is not wrapped with source cdn type
noWrapper := (cdnSource == apiTypes.CdnSourceSource)
if noWrapper {
pieceHeader = 0
}

start := int64(piece.PieceNum) * (int64(piece.PieceSize) - int64(pieceHeader))
if _, err := file.Seek(start, 0); err != nil {
return err
}

buf := bufio.NewWriterSize(file, 4*1024*1024)
_, err := io.Copy(buf, piece.RawContent())
_, err := io.Copy(buf, piece.RawContent(noWrapper))
buf.Flush()
return err
}
Expand Down
Loading

0 comments on commit 9a3ced9

Please sign in to comment.