From 1da7fba91f94574cd64af696da5ee961c1cbb147 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Thu, 23 Apr 2020 10:43:23 +0800 Subject: [PATCH] add seed downloader Signed-off-by: allen.wq --- dfdaemon/seed/downloader.go | 130 ++++++++++++++++++++ dfdaemon/seed/downloader_test.go | 102 ++++++++++++++++ dfdaemon/seed/utils.go | 48 ++++++++ dfdaemon/seed/utils_test.go | 204 +++++++++++++++++++++++++++++++ pkg/errortypes/errortypes.go | 23 ++++ 5 files changed, 507 insertions(+) create mode 100644 dfdaemon/seed/downloader.go create mode 100644 dfdaemon/seed/downloader_test.go create mode 100644 dfdaemon/seed/utils.go create mode 100644 dfdaemon/seed/utils_test.go diff --git a/dfdaemon/seed/downloader.go b/dfdaemon/seed/downloader.go new file mode 100644 index 000000000..c7501d7e7 --- /dev/null +++ b/dfdaemon/seed/downloader.go @@ -0,0 +1,130 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seed + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/dragonflyoss/Dragonfly/dfget/config" + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" + "github.com/dragonflyoss/Dragonfly/pkg/limitreader" + "github.com/dragonflyoss/Dragonfly/pkg/ratelimiter" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// downloader manage the downloading of seed file. +type downloader interface { + DownloadToWriterAt(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration, writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) +} + +func newLocalDownloader(url string, header map[string][]string, rate *ratelimiter.RateLimiter, copyCache bool) downloader { + return &localDownloader{ + url: url, + header: header, + rate: rate, + copyCache: copyCache, + } +} + +type localDownloader struct { + url string + header map[string][]string + + // downloader will limit the rate. + rate *ratelimiter.RateLimiter + + // if copyCache sets, the response body will store to memory cache and transfer to writer + copyCache bool +} + +func (ld *localDownloader) DownloadToWriterAt(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration, + writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) { + return ld.download(ctx, rangeStruct, timeout, writeOff, writerAt, rateLimit) +} + +func (ld *localDownloader) download(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration, + writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) { + var ( + written int64 + n int + rd io.Reader + ) + + header := map[string]string{} + for k, v := range ld.header { + header[k] = v[0] + } + + header[config.StrRange] = fmt.Sprintf("bytes=%d-%d", rangeStruct.StartIndex, rangeStruct.EndIndex) + resp, err := httputils.HTTPWithHeaders("GET", ld.url, header, timeout, nil) + if err != nil { + return 0, err + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + return 0, errortypes.NewHTTPError(resp.StatusCode, "resp code is not 200 or 206") + } + + expectedLen := rangeStruct.EndIndex - rangeStruct.StartIndex + 1 + + defer resp.Body.Close() + rd = resp.Body + if rateLimit { + rd = limitreader.NewLimitReaderWithLimiter(ld.rate, resp.Body, false) + } + + // in copyCache pattern, the bytes buffer will be transferred to io.WriterAt, and will be held by io.WriterAt. + if ld.copyCache { + buf := bytes.NewBuffer(nil) + buf.Grow(int(expectedLen)) + written, err = io.CopyN(buf, rd, expectedLen) + if err != nil && err != io.EOF { + logrus.Errorf("failed to read data [%d, %d] from resp.body: %v", rangeStruct.StartIndex, rangeStruct.EndIndex, err) + } + + if written < expectedLen { + return 0, errors.Wrap(io.ErrShortWrite, fmt.Sprintf("download from [%d,%d], expecte read %d, but got %d", rangeStruct.StartIndex, rangeStruct.EndIndex, expectedLen, written)) + } + + n, err = writerAt.WriteAt(buf.Bytes(), writeOff) + written = int64(n) + } else { + written, err = CopyBufferToWriterAt(writeOff, writerAt, rd) + } + + if err == io.EOF { + err = nil + } + + if err != nil { + return 0, errors.Wrap(err, fmt.Sprintf("failed to download from [%d,%d]", rangeStruct.StartIndex, rangeStruct.EndIndex)) + } + + if written < expectedLen { + return 0, errors.Wrap(io.ErrShortWrite, fmt.Sprintf("download from [%d,%d], expecte read %d, but got %d", rangeStruct.StartIndex, rangeStruct.EndIndex, expectedLen, written)) + } + + return written, err +} diff --git a/dfdaemon/seed/downloader_test.go b/dfdaemon/seed/downloader_test.go new file mode 100644 index 000000000..d7d72c7ad --- /dev/null +++ b/dfdaemon/seed/downloader_test.go @@ -0,0 +1,102 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seed + +import ( + "bytes" + "context" + "fmt" + + "github.com/dragonflyoss/Dragonfly/pkg/httputils" + "github.com/dragonflyoss/Dragonfly/pkg/ratelimiter" + + "github.com/go-check/check" +) + +type mockBufferWriterAt struct { + buf *bytes.Buffer +} + +func newMockBufferWriterAt() *mockBufferWriterAt { + return &mockBufferWriterAt{ + buf: bytes.NewBuffer(nil), + } +} + +func (mb *mockBufferWriterAt) WriteAt(p []byte, off int64) (n int, err error) { + if off != int64(mb.buf.Len()) { + return 0, fmt.Errorf("failed to seek to %d", off) + } + + return mb.buf.Write(p) +} + +func (mb *mockBufferWriterAt) Bytes() []byte { + return mb.buf.Bytes() +} + +func (suite *SeedTestSuite) checkLocalDownloadDataFromFileServer(c *check.C, path string, off int64, size int64) { + buf := newMockBufferWriterAt() + + ld := newLocalDownloader(fmt.Sprintf("http://%s/%s", suite.host, path), nil, ratelimiter.NewRateLimiter(0, 0), false) + + length, err := ld.DownloadToWriterAt(context.Background(), httputils.RangeStruct{StartIndex: off, EndIndex: off + size - 1}, 0, 0, buf, true) + c.Check(err, check.IsNil) + c.Check(size, check.Equals, length) + + expectData, err := suite.readFromFileServer(path, off, size) + c.Check(err, check.IsNil) + c.Check(string(buf.Bytes()), check.Equals, string(expectData)) + + buf2 := newMockBufferWriterAt() + ld2 := newLocalDownloader(fmt.Sprintf("http://%s/%s", suite.host, path), nil, ratelimiter.NewRateLimiter(0, 0), true) + + length2, err := ld2.DownloadToWriterAt(context.Background(), httputils.RangeStruct{StartIndex: off, EndIndex: off + size - 1}, 0, 0, buf2, false) + c.Check(err, check.IsNil) + c.Check(size, check.Equals, length2) + c.Check(string(buf2.Bytes()), check.Equals, string(expectData)) +} + +func (suite *SeedTestSuite) TestLocalDownload(c *check.C) { + // test read fileA + suite.checkLocalDownloadDataFromFileServer(c, "fileA", 0, 500*1024) + suite.checkLocalDownloadDataFromFileServer(c, "fileA", 0, 100*1024) + for i := 0; i < 5; i++ { + suite.checkLocalDownloadDataFromFileServer(c, "fileA", int64(i*100*1024), 100*1024) + } + + // test read fileB + suite.checkLocalDownloadDataFromFileServer(c, "fileB", 0, 1024*1024) + suite.checkLocalDownloadDataFromFileServer(c, "fileB", 0, 100*1024) + for i := 0; i < 20; i++ { + suite.checkLocalDownloadDataFromFileServer(c, "fileB", int64(i*50*1024), 50*1024) + } + suite.checkLocalDownloadDataFromFileServer(c, "fileB", 1000*1024, 24*1024) + + // test read fileC + suite.checkLocalDownloadDataFromFileServer(c, "fileC", 0, 1500*1024) + suite.checkLocalDownloadDataFromFileServer(c, "fileC", 0, 100*1024) + suite.checkLocalDownloadDataFromFileServer(c, "fileC", 1400*1024, 100*1024) + for i := 0; i < 75; i++ { + suite.checkLocalDownloadDataFromFileServer(c, "fileC", int64(i*20*1024), 20*1024) + } + + //test read fileF + for i := 0; i < 50; i++ { + suite.checkLocalDownloadDataFromFileServer(c, "fileF", int64(i*20000), 20000) + } +} diff --git a/dfdaemon/seed/utils.go b/dfdaemon/seed/utils.go new file mode 100644 index 000000000..febd40adc --- /dev/null +++ b/dfdaemon/seed/utils.go @@ -0,0 +1,48 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seed + +import ( + "bytes" + "io" +) + +// CopyBufferToWriterAt copy data from rd and write to io.WriterAt. +func CopyBufferToWriterAt(off int64, writerAt io.WriterAt, rd io.Reader) (n int64, err error) { + buffer := bytes.NewBuffer(nil) + bufSize := int64(256 * 1024) + + for { + _, err := io.CopyN(buffer, rd, bufSize) + if err != nil && err != io.EOF { + return 0, err + } + + wcount, werr := writerAt.WriteAt(buffer.Bytes(), off) + n += int64(wcount) + if werr != nil { + return n, err + } + + if err == io.EOF { + return n, io.EOF + } + + buffer.Reset() + off += int64(wcount) + } +} diff --git a/dfdaemon/seed/utils_test.go b/dfdaemon/seed/utils_test.go new file mode 100644 index 000000000..225fc6566 --- /dev/null +++ b/dfdaemon/seed/utils_test.go @@ -0,0 +1,204 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seed + +import ( + "context" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/dragonflyoss/Dragonfly/dfget/core/helper" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" + + "github.com/go-check/check" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type SeedTestSuite struct { + port int + host string + server *helper.MockFileServer + tmpDir string + cacheDir string +} + +func init() { + rand.Seed(time.Now().Unix()) + check.Suite(&SeedTestSuite{}) +} + +func (suite *SeedTestSuite) SetUpSuite(c *check.C) { + suite.tmpDir = "./testdata" + err := os.MkdirAll(suite.tmpDir, 0774) + c.Assert(err, check.IsNil) + + suite.cacheDir = "./testcache" + err = os.MkdirAll(suite.cacheDir, 0774) + c.Assert(err, check.IsNil) + + suite.port = rand.Intn(1000) + 63000 + suite.host = fmt.Sprintf("127.0.0.1:%d", suite.port) + + suite.server = helper.NewMockFileServer() + err = suite.server.StartServer(context.Background(), suite.port) + c.Assert(err, check.IsNil) + + // 500KB + err = suite.server.RegisterFile("fileA", 500*1024, "abcde0123456789") + c.Assert(err, check.IsNil) + // 1MB + err = suite.server.RegisterFile("fileB", 1024*1024, "abcdefg") + c.Assert(err, check.IsNil) + // 1.5 MB + err = suite.server.RegisterFile("fileC", 1500*1024, "abcdefg") + c.Assert(err, check.IsNil) + // 2 MB + err = suite.server.RegisterFile("fileD", 2048*1024, "abcdefg") + c.Assert(err, check.IsNil) + // 9.5 MB + err = suite.server.RegisterFile("fileE", 9500*1024, "abcdefg") + c.Assert(err, check.IsNil) + // 10 MB + err = suite.server.RegisterFile("fileF", 10*1024*1024, "abcdefg") + c.Assert(err, check.IsNil) + // 1 G + err = suite.server.RegisterFile("fileG", 1024*1024*1024, "1abcdefg") + c.Assert(err, check.IsNil) + + // 100 M + err = suite.server.RegisterFile("fileH", 100*1024*1024, "1abcdefg") + c.Assert(err, check.IsNil) +} + +func (suite *SeedTestSuite) TearDownSuite(c *check.C) { + if suite.tmpDir != "" { + os.RemoveAll(suite.tmpDir) + } + if suite.cacheDir != "" { + os.RemoveAll(suite.cacheDir) + } +} + +func (suite *SeedTestSuite) readFromFileServer(path string, off int64, size int64) ([]byte, error) { + url := fmt.Sprintf("http://%s/%s", suite.host, path) + header := map[string]string{} + + if size > 0 { + header["Range"] = fmt.Sprintf("bytes=%d-%d", off, off+size-1) + } + + code, data, err := httputils.GetWithHeaders(url, header, 5*time.Second) + if err != nil { + return nil, err + } + + if code >= 400 { + return nil, fmt.Errorf("resp code %d", code) + } + + return data, nil +} + +func (suite *SeedTestSuite) checkDataWithFileServer(c *check.C, path string, off int64, size int64, obtained []byte) { + expected, err := suite.readFromFileServer(path, off, size) + c.Assert(err, check.IsNil) + if string(obtained) != string(expected) { + c.Errorf("path %s, range [%d-%d]: get %s, expect %s", path, off, off+size-1, + string(obtained), string(expected)) + } + + c.Assert(string(obtained), check.Equals, string(expected)) +} + +func (suite *SeedTestSuite) checkFileWithSeed(c *check.C, path string, fileLength int64, sd Seed) { + // download all + rc, err := sd.Download(0, -1) + c.Assert(err, check.IsNil) + obtainedData, err := ioutil.ReadAll(rc) + rc.Close() + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, path, 0, -1, obtainedData) + + // download {fileLength-100KB}- {fileLength}-1 + rc, err = sd.Download(fileLength-100*1024, 100*1024) + c.Assert(err, check.IsNil) + obtainedData, err = ioutil.ReadAll(rc) + rc.Close() + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, path, fileLength-100*1024, 100*1024, obtainedData) + + // download 0-{100KB-1} + rc, err = sd.Download(0, 100*1024) + c.Assert(err, check.IsNil) + obtainedData, err = ioutil.ReadAll(rc) + rc.Close() + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, path, 0, 100*1024, obtainedData) + + start := int64(0) + end := int64(0) + rangeSize := int64(100 * 1024) + + for { + end = start + rangeSize - 1 + if end >= fileLength { + end = fileLength - 1 + } + + if start > end { + break + } + + rc, err = sd.Download(start, end-start+1) + c.Assert(err, check.IsNil) + obtainedData, err = ioutil.ReadAll(rc) + rc.Close() + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, path, start, end-start+1, obtainedData) + start = end + 1 + } + + start = 0 + end = 0 + rangeSize = 99 * 1023 + + for { + end = start + rangeSize - 1 + if end >= fileLength { + end = fileLength - 1 + } + + if start > end { + break + } + + rc, err = sd.Download(start, end-start+1) + c.Assert(err, check.IsNil) + obtainedData, err = ioutil.ReadAll(rc) + rc.Close() + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, path, start, end-start+1, obtainedData) + start = end + 1 + } +} diff --git a/pkg/errortypes/errortypes.go b/pkg/errortypes/errortypes.go index 4669ea081..e20ff9325 100644 --- a/pkg/errortypes/errortypes.go +++ b/pkg/errortypes/errortypes.go @@ -132,3 +132,26 @@ func checkError(err error, code int) bool { e, ok := errors.Cause(err).(DfError) return ok && e.Code == code } + +type HTTPError struct { + Code int + Msg string +} + +// NewHTTPError function creates a HTTPError. +func NewHTTPError(code int, msg string) *HTTPError { + return &HTTPError{ + Code: code, + Msg: msg, + } +} + +// Error function implements the interface of error.Error(). +func (s HTTPError) Error() string { + return fmt.Sprintf("{\"Code\":%d,\"Msg\":\"%s\"}", s.Code, s.Msg) +} + +// HTTPCode return the http code. +func (s HTTPError) HTTPCode() int { + return s.Code +}