Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
implement limit rate for stream mode
Browse files Browse the repository at this point in the history
Signed-off-by: 楚贤 <[email protected]>
  • Loading branch information
jim3ma committed Jan 21, 2020
1 parent c9472bb commit b40524a
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 74 deletions.
1 change: 0 additions & 1 deletion cmd/dfget/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func initFlags() {
"caching duration for which cached file keeps no accessed by any process, after this period cache file will be deleted")
flagSet.DurationVar(&cfg.RV.ServerAliveTime, "alivetime", config.ServerAliveTime,
"alive duration for which uploader keeps no accessing by any uploading requests, after this period uploader will automatically exit")
flagSet.BoolVar(&cfg.RV.StreamMode, "stream", false, "enable stream mode")

flagSet.MarkDeprecated("exceed", "please use '--timeout' or '-e' instead")
}
Expand Down
44 changes: 30 additions & 14 deletions dfget/core/downloader/back_downloader/back_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/printer"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -127,6 +128,7 @@ func (bd *BackDownloader) Run(ctx context.Context) error {
return err
}

// RunStream returns an io.Reader that streams the download without any disk IO.
func (bd *BackDownloader) RunStream(ctx context.Context) (io.Reader, error) {
var (
resp *http.Response
Expand All @@ -147,20 +149,8 @@ func (bd *BackDownloader) RunStream(ctx context.Context) (io.Reader, error) {
return nil, fmt.Errorf("failed to download from source, response code:%d", resp.StatusCode)
}

return &autoCloseReader{rc: resp.Body}, nil
}

type autoCloseReader struct {
rc io.ReadCloser
}

func (a *autoCloseReader) Read(p []byte) (n int, err error) {
n, err = a.rc.Read(p)
if err != nil {
// TODO wrap close error into err
a.rc.Close()
}
return n, err
limitReader := limitreader.NewLimitReader(resp.Body, int64(bd.cfg.LocalLimit), bd.Md5 != "")
return &autoCloseLimitReader{closer: resp.Body, limitReader: limitReader, md5: bd.Md5}, nil
}

// Cleanup clean all temporary resources generated by executing Run.
Expand All @@ -178,3 +168,29 @@ func (bd *BackDownloader) Cleanup() {
// isSuccessStatus reports whether the HTTP status code denotes a
// non-error response (informational, success, or redirect class).
func (bd *BackDownloader) isSuccessStatus(code int) bool {
	return code < http.StatusBadRequest
}

// autoCloseLimitReader automatically closes the underlying closer when the
// wrapped reader returns an error (including io.EOF).
// It is necessary when handing out an http.Response.Body as a plain
// io.Reader, because the caller has no handle to close the body itself.
type autoCloseLimitReader struct {
	// closer is closed as soon as a read returns any error; typically the
	// http.Response.Body that backs limitReader.
	closer io.Closer
	// md5 is the expected digest; an empty string disables verification.
	md5 string
	// limitReader rate-limits reads and accumulates the real md5 on the fly.
	limitReader *limitreader.LimitReader
}

// Read reads from the rate-limited reader. On the first error (including
// io.EOF) it closes the underlying closer exactly once. At io.EOF, if an
// expected md5 was configured, the accumulated digest is verified and a
// mismatch is reported as the returned error.
func (a *autoCloseLimitReader) Read(p []byte) (n int, err error) {
	n, err = a.limitReader.Read(p)
	if err == nil {
		return n, nil
	}
	// Verify the digest BEFORE wrapping err with any close failure:
	// previously a failed Close turned io.EOF into a wrapped error, so the
	// `err == io.EOF` test below never fired and the md5 check was skipped.
	if err == io.EOF && a.md5 != "" {
		if realMd5 := a.limitReader.Md5(); realMd5 != a.md5 {
			err = fmt.Errorf("md5 not match, expected: %s real: %s", a.md5, realMd5)
		}
	}
	// When a read returns an error, always release the underlying body.
	if closeError := a.closer.Close(); closeError != nil {
		err = errors.Wrapf(err, "close error: %s", closeError)
	}
	return n, err
}
47 changes: 47 additions & 0 deletions dfget/core/downloader/back_downloader/back_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package downloader
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
Expand Down Expand Up @@ -102,6 +103,52 @@ func (s *BackDownloaderTestSuite) TestBackDownloader_Run(c *check.C) {
c.Assert(bd.Run(context.TODO()), check.IsNil)
}

// TestBackDownloader_RunStream exercises stream mode: it must fail when
// back-source is disabled or refused, surface an md5 mismatch while the
// stream is drained, and succeed when the expected digest matches.
func (s *BackDownloaderTestSuite) TestBackDownloader_RunStream(c *check.C) {
	testFileMd5 := helper.CreateTestFileWithMD5(filepath.Join(s.workHome, "download.test"), "test downloader")
	dst := filepath.Join(s.workHome, "back.test")

	cfg := helper.CreateConfig(nil, s.workHome)
	bd := &BackDownloader{
		cfg:    cfg,
		URL:    "http://" + s.host + "/download.test",
		Target: dst,
	}

	// Back-source disabled: RunStream must refuse to download.
	cfg.Notbs = true
	_, err := bd.RunStream(context.TODO())
	c.Assert(err, check.NotNil)

	// Back-source allowed again, but the recorded reason is "no space":
	// still an error and no reader is handed out.
	cfg.Notbs = false
	bd.cleaned = false
	cfg.BackSourceReason = config.BackSourceReasonNoSpace
	stream, err := bd.RunStream(context.TODO())
	c.Assert(stream, check.IsNil)
	c.Assert(err, check.NotNil)

	// Expected digest deliberately wrong: draining the stream must fail.
	bd.Md5 = "x"
	stream, err = bd.RunStream(context.TODO())
	c.Assert(stream, check.NotNil)
	if stream != nil {
		_, err = ioutil.ReadAll(stream)
	}
	c.Assert(err, check.NotNil)

	// Expected digest correct: draining the stream must succeed.
	bd.cleaned = false
	bd.Md5 = testFileMd5
	stream, err = bd.RunStream(context.TODO())
	c.Assert(stream, check.NotNil)
	if stream != nil {
		_, err = ioutil.ReadAll(stream)
	}
	c.Assert(err, check.IsNil)
}

func (s *BackDownloaderTestSuite) TestBackDownloader_Run_NotExist(c *check.C) {
dst := filepath.Join(s.workHome, "back.test")

Expand Down
56 changes: 15 additions & 41 deletions dfget/core/downloader/p2p_downloader/client_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"context"
"fmt"
"io"
"math/rand"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -55,6 +54,9 @@ type ClientStreamWriter struct {
// pipeReader is the read half of a pipe
pipeReader *io.PipeReader

// limitReader supports limit rate and calculates md5
limitReader *limitreader.LimitReader

cache map[int]*Piece

// api holds an instance of SupernodeAPI to interact with supernode.
Expand All @@ -65,10 +67,12 @@ type ClientStreamWriter struct {
// NewClientStreamWriter creates and initialize a ClientStreamWriter instance.
func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
pr, pw := io.Pipe()
limitReader := limitreader.NewLimitReader(pr, int64(cfg.LocalLimit), cfg.Md5 != "")
clientWriter := &ClientStreamWriter{
clientQueue: clientQueue,
pipeReader: pr,
pipeWriter: pw,
limitReader: limitReader,
api: api,
cfg: cfg,
cache: make(map[int]*Piece),
Expand Down Expand Up @@ -133,7 +137,7 @@ func (csw *ClientStreamWriter) write(piece *Piece) error {

err := csw.writePieceToPipe(piece)
if err == nil {
go csw.sendSuccessPiece(piece, time.Since(startTime))
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime))
}
return err
}
Expand Down Expand Up @@ -171,44 +175,14 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error {
return nil
}

// TODO this function is very similar ClientWriter.sendSuccessPiece, refactor later
func (csw *ClientStreamWriter) sendSuccessPiece(piece *Piece, cost time.Duration) {
reportPieceRequest := &types.ReportPieceRequest{
TaskID: piece.TaskID,
Cid: csw.cfg.RV.Cid,
DstCid: piece.DstCid,
PieceRange: piece.Range,
}

var retry = 0
var maxRetryTime = 3
for {
if retry >= maxRetryTime {
logrus.Errorf("failed to report piece to supernode with request(%+v) even after retrying max retry time", reportPieceRequest)
break
}

_, err := csw.api.ReportPiece(piece.SuperNode, reportPieceRequest)
if err == nil {
if retry > 0 {
logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry)
}
break
func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) {
n, err = csw.limitReader.Read(p)
// all data received, calculate md5
if err == io.EOF && csw.cfg.Md5 != "" {
realMd5 := csw.limitReader.Md5()
if realMd5 != csw.cfg.Md5 {
return n, fmt.Errorf("md5 not match, expected: %s real: %s", csw.cfg.Md5, realMd5)
}

sleepTime := time.Duration(rand.Intn(500)+50) * time.Millisecond
logrus.Warnf("failed to report piece to supernode with request(%+v) for (%d) times and will retry after sleep %.3fs", reportPieceRequest, retry, sleepTime.Seconds())
time.Sleep(sleepTime)
retry++
}

if cost.Seconds() > 2.0 {
logrus.Infof(
"async writer and report suc from dst:%s... cost:%.3f for range:%s",
piece.DstCid[:25], cost.Seconds(), piece.Range)
}
}

func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) {
return csw.pipeReader.Read(p)
return n, err
}
23 changes: 9 additions & 14 deletions dfget/core/downloader/p2p_downloader/client_stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,16 @@ import (
"io"
"sort"

"github.com/dragonflyoss/Dragonfly/dfget/config"

"github.com/go-check/check"
)

type ClientStreamWriterTestSuite struct {
csw *ClientStreamWriter
}

func init() {
pr, pw := io.Pipe()
clientWriter := &ClientStreamWriter{
pipeReader: pr,
pipeWriter: pw,
cache: make(map[int]*Piece),
}
check.Suite(&ClientStreamWriterTestSuite{
csw: clientWriter,
})
check.Suite(&ClientStreamWriterTestSuite{})
}

func (s *ClientStreamWriterTestSuite) SetUpSuite(*check.C) {
Expand Down Expand Up @@ -99,23 +92,25 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
}, len(cases))
copy(cases2, cases)

cfg := &config.Config{}
csw := NewClientStreamWriter(nil, nil, cfg)
go func() {
for _, v := range cases2 {
err := s.csw.writePieceToPipe(v.piece)
err := csw.writePieceToPipe(v.piece)
c.Check(err, check.IsNil)
}
}()
sort.Slice(cases, func(i, j int) bool {
return cases[i].piece.PieceNum < cases[j].piece.PieceNum
})
for _, v := range cases {
content := s.getString(v.piece.RawContent().Len())
content := s.getString(csw, v.piece.RawContent().Len())
c.Check(content, check.Equals, v.expected)
}
}

func (s *ClientStreamWriterTestSuite) getString(length int) string {
func (s *ClientStreamWriterTestSuite) getString(reader io.Reader, length int) string {
b := make([]byte, length)
s.csw.pipeReader.Read(b)
reader.Read(b)
return string(b)
}
8 changes: 4 additions & 4 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (cw *ClientWriter) write(piece *Piece) error {
cw.pieceIndex++
err := writePieceToFile(piece, cw.serviceFile)
if err == nil {
go cw.sendSuccessPiece(piece, time.Since(startTime))
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime))
}
return err
}
Expand All @@ -236,10 +236,10 @@ func startSyncWriter(q queue.Queue) queue.Queue {
return nil
}

func (cw *ClientWriter) sendSuccessPiece(piece *Piece, cost time.Duration) {
func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration) {
reportPieceRequest := &types.ReportPieceRequest{
TaskID: piece.TaskID,
Cid: cw.cfg.RV.Cid,
Cid: cid,
DstCid: piece.DstCid,
PieceRange: piece.Range,
}
Expand All @@ -252,7 +252,7 @@ func (cw *ClientWriter) sendSuccessPiece(piece *Piece, cost time.Duration) {
break
}

_, err := cw.api.ReportPiece(piece.SuperNode, reportPieceRequest)
_, err := api.ReportPiece(piece.SuperNode, reportPieceRequest)
if err == nil {
if retry > 0 {
logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry)
Expand Down

0 comments on commit b40524a

Please sign in to comment.