Skip to content

Commit

Permalink
Merge pull request dragonflyoss#1411 from zcc35357949/crosswrite_panic
Browse files Browse the repository at this point in the history
bugfix: fix dfget panic when crossWrite is true
  • Loading branch information
lowzj committed Jul 16, 2020
2 parents 475763b + cc2adc4 commit eaeed52
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error {
break
}

_, err := io.Copy(csw.pipeWriter, p.RawContent(csw.cdnSource == apiTypes.CdnSourceSource))
_, err := p.WriteTo(csw.pipeWriter, csw.cdnSource == apiTypes.CdnSourceSource)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package downloader

import (
"context"
"io"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -220,6 +219,7 @@ func (cw *ClientWriter) write(piece *Piece) error {
}

if cw.acrossWrite {
piece.IncWriter()
cw.targetQueue.Put(piece)
}

Expand All @@ -245,7 +245,7 @@ func writePieceToFile(piece *Piece, file *os.File, cdnSource apiTypes.CdnSource)
}

writer := pool.AcquireWriter(file)
_, err := io.Copy(writer, piece.RawContent(noWrapper))
_, err := piece.WriteTo(writer, noWrapper)
pool.ReleaseWriter(writer)
writer = nil
return err
Expand Down
41 changes: 30 additions & 11 deletions dfget/core/downloader/p2p_downloader/piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package downloader
import (
"bytes"
"encoding/json"
"fmt"
"io"
"sync/atomic"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
Expand Down Expand Up @@ -57,20 +60,32 @@ type Piece struct {
// length the length of the content.
length int64

// autoReset automatically reset content after reading.
autoReset bool
// writerNum record the writer number which will write this piece.
writerNum int32
}

// WriteTo writes piece raw data in content buffer to w.
// If the piece has wrapper, the piece content will remove the head and tail before writing.
func (p *Piece) WriteTo(w io.Writer, noWrapper bool) (n int64, err error) {
defer p.TryResetContent()

content := p.RawContent(noWrapper)
if content != nil {
return content.WriteTo(w)
}
return 0, fmt.Errorf("piece content length less than 5 bytes")
}

// IncWriter increase a writer for the piece.
func (p *Piece) IncWriter() {
atomic.AddInt32(&p.writerNum, 1)
}

// RawContent returns raw contents,
// If the piece has wrapper, and the piece content will remove the head and tail.
func (p *Piece) RawContent(noWrapper bool) *bytes.Buffer {
contents := p.Content.Bytes()
length := len(contents)
defer func() {
if p.autoReset {
p.ResetContent()
}
}()

if noWrapper {
return bytes.NewBuffer(contents[:])
Expand All @@ -97,7 +112,11 @@ func (p *Piece) String() string {
}

// ResetContent reset contents and returns it back to buffer pool.
func (p *Piece) ResetContent() {
func (p *Piece) TryResetContent() {
if atomic.AddInt32(&p.writerNum, -1) > 0 {
return
}

if p.Content == nil {
return
}
Expand All @@ -118,7 +137,7 @@ func NewPiece(taskID, node, dstCid, pieceRange string, result, status int, cdnSo
Result: result,
Status: status,
Content: nil,
autoReset: true,
writerNum: 1,
}
}

Expand All @@ -130,7 +149,7 @@ func NewPieceSimple(taskID string, node string, status int, cdnSource apiTypes.C
Status: status,
Result: constants.ResultInvalid,
Content: nil,
autoReset: true,
writerNum: 1,
}
}

Expand All @@ -146,6 +165,6 @@ func NewPieceContent(taskID, node, dstCid, pieceRange string,
Status: status,
Content: contents,
length: int64(contents.Len()),
autoReset: true,
writerNum: 1,
}
}
31 changes: 31 additions & 0 deletions dfget/core/downloader/p2p_downloader/piece_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,37 @@ func (s *PieceTestSuite) TestRawContent(c *check.C) {
}
}

func (s *PieceTestSuite) TestTryResetContent(c *check.C) {
piece := &Piece{writerNum: 2, Content: pool.NewBufferString("")}
piece.TryResetContent()
c.Assert(piece.writerNum, check.Equals, int32(1))
c.Assert(piece.Content, check.NotNil)

piece.TryResetContent()
c.Assert(piece.writerNum, check.Equals, int32(0))
c.Assert(piece.Content, check.IsNil)
}

func (s *PieceTestSuite) TestWriteTo(c *check.C) {
var cases = []struct {
piece *Piece
noWrapper bool
expected *bytes.Buffer
hasErr bool
}{
{piece: &Piece{Content: pool.NewBufferString("")}, noWrapper: false, expected: &bytes.Buffer{}, hasErr: true},
{piece: &Piece{Content: pool.NewBufferString("000010")}, noWrapper: false, expected: bytes.NewBufferString("1"), hasErr: false},
{piece: &Piece{Content: pool.NewBufferString("000020")}, noWrapper: true, expected: bytes.NewBufferString("000020"), hasErr: false},
}

for _, v := range cases {
result := &bytes.Buffer{}
_, err := v.piece.WriteTo(result, v.noWrapper)
c.Assert(err != nil, check.Equals, v.hasErr)
c.Assert(result, check.DeepEquals, v.expected)
}
}

func (s *PieceTestSuite) TestString(c *check.C) {
var cases = []struct {
piece *Piece
Expand Down

0 comments on commit eaeed52

Please sign in to comment.