diff --git a/dfget/core/downloader/p2p_downloader/client_stream_writer.go b/dfget/core/downloader/p2p_downloader/client_stream_writer.go index d0a25b434..c55ecc488 100644 --- a/dfget/core/downloader/p2p_downloader/client_stream_writer.go +++ b/dfget/core/downloader/p2p_downloader/client_stream_writer.go @@ -164,7 +164,7 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error { break } - _, err := io.Copy(csw.pipeWriter, p.RawContent(csw.cdnSource == apiTypes.CdnSourceSource)) + _, err := p.WriteTo(csw.pipeWriter, csw.cdnSource == apiTypes.CdnSourceSource) if err != nil { return err } diff --git a/dfget/core/downloader/p2p_downloader/client_writer.go b/dfget/core/downloader/p2p_downloader/client_writer.go index 1833aba10..73e733951 100644 --- a/dfget/core/downloader/p2p_downloader/client_writer.go +++ b/dfget/core/downloader/p2p_downloader/client_writer.go @@ -18,7 +18,6 @@ package downloader import ( "context" - "io" "math/rand" "os" "time" @@ -220,6 +219,7 @@ func (cw *ClientWriter) write(piece *Piece) error { } if cw.acrossWrite { + piece.IncWriter() cw.targetQueue.Put(piece) } @@ -245,7 +245,7 @@ func writePieceToFile(piece *Piece, file *os.File, cdnSource apiTypes.CdnSource) } writer := pool.AcquireWriter(file) - _, err := io.Copy(writer, piece.RawContent(noWrapper)) + _, err := piece.WriteTo(writer, noWrapper) pool.ReleaseWriter(writer) writer = nil return err diff --git a/dfget/core/downloader/p2p_downloader/piece.go b/dfget/core/downloader/p2p_downloader/piece.go index 67a932d19..b54d78f47 100644 --- a/dfget/core/downloader/p2p_downloader/piece.go +++ b/dfget/core/downloader/p2p_downloader/piece.go @@ -19,6 +19,9 @@ package downloader import ( "bytes" "encoding/json" + "fmt" + "io" + "sync/atomic" apiTypes "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" @@ -57,8 +60,25 @@ type Piece struct { // length the length of the content. length int64 - // autoReset automatically reset content after reading. - autoReset bool + // writerNum record the writer number which will write this piece. + writerNum int32 +} + +// WriteTo writes piece raw data in content buffer to w. +// If the piece has wrapper, the piece content will remove the head and tail before writing. +func (p *Piece) WriteTo(w io.Writer, noWrapper bool) (n int64, err error) { + defer p.TryResetContent() + + content := p.RawContent(noWrapper) + if content != nil { + return content.WriteTo(w) + } + return 0, fmt.Errorf("piece content length less than 5 bytes") +} + +// IncWriter increase a writer for the piece. +func (p *Piece) IncWriter() { + atomic.AddInt32(&p.writerNum, 1) } // RawContent returns raw contents, @@ -66,11 +86,6 @@ type Piece struct { func (p *Piece) RawContent(noWrapper bool) *bytes.Buffer { contents := p.Content.Bytes() length := len(contents) - defer func() { - if p.autoReset { - p.ResetContent() - } - }() if noWrapper { return bytes.NewBuffer(contents[:]) @@ -97,7 +112,11 @@ func (p *Piece) String() string { } // ResetContent reset contents and returns it back to buffer pool. -func (p *Piece) ResetContent() { +func (p *Piece) TryResetContent() { + if atomic.AddInt32(&p.writerNum, -1) > 0 { + return + } + if p.Content == nil { return } @@ -118,7 +137,7 @@ func NewPiece(taskID, node, dstCid, pieceRange string, result, status int, cdnSo Result: result, Status: status, Content: nil, - autoReset: true, + writerNum: 1, } } @@ -130,7 +149,7 @@ func NewPieceSimple(taskID string, node string, status int, cdnSource apiTypes.C Status: status, Result: constants.ResultInvalid, Content: nil, - autoReset: true, + writerNum: 1, } } @@ -146,6 +165,6 @@ func NewPieceContent(taskID, node, dstCid, pieceRange string, Status: status, Content: contents, length: int64(contents.Len()), - autoReset: true, + writerNum: 1, } } diff --git a/dfget/core/downloader/p2p_downloader/piece_test.go b/dfget/core/downloader/p2p_downloader/piece_test.go index 6cdea206a..e94123aca 100644 --- a/dfget/core/downloader/p2p_downloader/piece_test.go +++ b/dfget/core/downloader/p2p_downloader/piece_test.go @@ -48,6 +48,37 @@ func (s *PieceTestSuite) TestRawContent(c *check.C) { } } +func (s *PieceTestSuite) TestTryResetContent(c *check.C) { + piece := &Piece{writerNum: 2, Content: pool.NewBufferString("")} + piece.TryResetContent() + c.Assert(piece.writerNum, check.Equals, int32(1)) + c.Assert(piece.Content, check.NotNil) + + piece.TryResetContent() + c.Assert(piece.writerNum, check.Equals, int32(0)) + c.Assert(piece.Content, check.IsNil) +} + +func (s *PieceTestSuite) TestWriteTo(c *check.C) { + var cases = []struct { + piece *Piece + noWrapper bool + expected *bytes.Buffer + hasErr bool + }{ + {piece: &Piece{Content: pool.NewBufferString("")}, noWrapper: false, expected: &bytes.Buffer{}, hasErr: true}, + {piece: &Piece{Content: pool.NewBufferString("000010")}, noWrapper: false, expected: bytes.NewBufferString("1"), hasErr: false}, + {piece: &Piece{Content: pool.NewBufferString("000020")}, noWrapper: true, expected: bytes.NewBufferString("000020"), hasErr: false}, + } + + for _, v := range cases { + result := &bytes.Buffer{} + _, err := v.piece.WriteTo(result, v.noWrapper) + c.Assert(err != nil, check.Equals, v.hasErr) + c.Assert(result, check.DeepEquals, v.expected) + } +} + func (s *PieceTestSuite) TestString(c *check.C) { var cases = []struct { piece *Piece