From ef57720ffe0ed7c9566018340499004eb08ee3fe Mon Sep 17 00:00:00 2001 From: lowzj Date: Tue, 24 Mar 2020 16:46:18 +0800 Subject: [PATCH] fix: cdn-source pattern supports range task Signed-off-by: lowzj --- dfget/core/api/download_api.go | 49 ++++++++++++++++-- dfget/core/api/download_api_test.go | 51 +++++++++++++++++++ .../util => pkg/rangeutils}/range_util.go | 14 ++--- .../rangeutils}/range_util_test.go | 2 +- supernode/daemon/mgr/cdn/downloader.go | 10 ++-- supernode/daemon/mgr/cdn/manager.go | 3 +- .../daemon/mgr/pieceerror/md5_not_match.go | 8 +-- supernode/daemon/mgr/task/manager.go | 3 +- supernode/daemon/mgr/task/manager_util.go | 5 +- supernode/server/0.3_bridge.go | 16 +++--- 10 files changed, 130 insertions(+), 31 deletions(-) create mode 100644 dfget/core/api/download_api_test.go rename {supernode/util => pkg/rangeutils}/range_util.go (85%) rename {supernode/util => pkg/rangeutils}/range_util_test.go (99%) diff --git a/dfget/core/api/download_api.go b/dfget/core/api/download_api.go index 90ae86469..77b4991e6 100644 --- a/dfget/core/api/download_api.go +++ b/dfget/core/api/download_api.go @@ -25,6 +25,7 @@ import ( "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/pkg/httputils" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" "github.com/dragonflyoss/Dragonfly/version" ) @@ -56,8 +57,10 @@ func NewDownloadAPI() DownloadAPI { } func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeout time.Duration) (*http.Response, error) { + if req == nil { + return nil, fmt.Errorf("nil dwonload request") + } headers := make(map[string]string) - headers[config.StrRange] = config.StrBytes + "=" + req.PieceRange headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum) headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize) headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion @@ -67,12 +70,52 @@ func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeou } } - var url string - if strings.Contains(req.Path, "://") { + var ( + url string + rangeStr string + ) + if isFromSource(req) { + rangeStr = getRealRange(req.PieceRange, headers[config.StrRange]) url = req.Path } else { + rangeStr = req.PieceRange url = fmt.Sprintf("http://%s:%d%s", ip, port, req.Path) } + headers[config.StrRange] = httputils.ConstructRangeStr(rangeStr) return httputils.HTTPGetTimeout(url, headers, timeout) } + +func isFromSource(req *DownloadRequest) bool { + return strings.Contains(req.Path, "://") +} + +// getRealRange +// pieceRange: "start-end" +// rangeHeaderValue: "bytes=sourceStart-sourceEnd" +// return: "realStart-realEnd" +func getRealRange(pieceRange string, rangeHeaderValue string) string { + if rangeHeaderValue == "" { + return pieceRange + } + rangeEle := strings.Split(rangeHeaderValue, "=") + if len(rangeEle) != 2 { + return pieceRange + } + + lower, upper, err := rangeutils.ParsePieceIndex(rangeEle[1]) + if err != nil { + return pieceRange + } + start, end, err := rangeutils.ParsePieceIndex(pieceRange) + if err != nil { + return pieceRange + } + + realStart := start + lower + realEnd := end + lower + if realEnd > upper { + realEnd = upper + } + return fmt.Sprintf("%d-%d", realStart, realEnd) +} diff --git a/dfget/core/api/download_api_test.go b/dfget/core/api/download_api_test.go new file mode 100644 index 000000000..5201e3629 --- /dev/null +++ b/dfget/core/api/download_api_test.go @@ -0,0 +1,51 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package api + +import ( + "github.com/go-check/check" +) + +type DownloadAPITestSuite struct { +} + +func init() { + check.Suite(&DownloadAPITestSuite{}) +} + +// ---------------------------------------------------------------------------- +// unit tests for DownloadAPI + +func (s *DownloadAPITestSuite) TestGetRealRange(c *check.C) { + cases := []struct { + pieceRange string + rangeValue string + expected string + }{ + {"0-1", "", "0-1"}, + {"0-1", "1-100", "1-2"}, + {"0-100", "1-100", "1-100"}, + {"100-100", "1-100", "101-100"}, + {"100-200", "1-100", "101-100"}, + } + + for _, v := range cases { + res := getRealRange(v.pieceRange, "bytes="+v.rangeValue) + c.Assert(res, check.Equals, v.expected, + check.Commentf("%v", v)) + } +} diff --git a/supernode/util/range_util.go b/pkg/rangeutils/range_util.go similarity index 85% rename from supernode/util/range_util.go rename to pkg/rangeutils/range_util.go index bc13fe08a..429b59dab 100644 --- a/supernode/util/range_util.go +++ b/pkg/rangeutils/range_util.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package util +package rangeutils import ( "fmt" @@ -23,7 +23,8 @@ import ( ) const ( - separator = "-" + separator = "-" + invalidPieceIndex = -1 ) // CalculatePieceSize calculates the size of piece @@ -51,23 +52,24 @@ func CalculatePieceNum(rangeStr string) int { } // ParsePieceIndex parses the start and end index ​​according to range string. +// rangeStr: "start-end" func ParsePieceIndex(rangeStr string) (start, end int64, err error) { ranges := strings.Split(rangeStr, separator) if len(ranges) != 2 { - return -1, -1, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr) + return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr) } startIndex, err := strconv.ParseInt(ranges[0], 10, 64) if err != nil { - return -1, -1, fmt.Errorf("range(%s) start is not a number", rangeStr) + return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) start is not a number", rangeStr) } endIndex, err := strconv.ParseInt(ranges[1], 10, 64) if err != nil { - return -1, -1, fmt.Errorf("range(%s) end is not a number", rangeStr) + return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) end is not a number", rangeStr) } if endIndex < startIndex { - return -1, -1, fmt.Errorf("range(%s) start is larger than end", rangeStr) + return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) start is larger than end", rangeStr) } return startIndex, endIndex, nil diff --git a/supernode/util/range_util_test.go b/pkg/rangeutils/range_util_test.go similarity index 99% rename from supernode/util/range_util_test.go rename to pkg/rangeutils/range_util_test.go index 7fbcaaaef..e7c29a765 100644 --- a/supernode/util/range_util_test.go +++ b/pkg/rangeutils/range_util_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package util +package rangeutils import ( "testing" diff --git a/supernode/daemon/mgr/cdn/downloader.go b/supernode/daemon/mgr/cdn/downloader.go index d2fa15b58..cf8fde97b 100644 --- a/supernode/daemon/mgr/cdn/downloader.go +++ b/supernode/daemon/mgr/cdn/downloader.go @@ -20,12 +20,12 @@ import ( "context" "net/http" - errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - "github.com/dragonflyoss/Dragonfly/pkg/httputils" - "github.com/dragonflyoss/Dragonfly/supernode/util" - "github.com/pkg/errors" "github.com/sirupsen/logrus" + + errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" ) // download downloads the file from the original address and @@ -38,7 +38,7 @@ func (cm *Manager) download(ctx context.Context, taskID, url string, headers map checkCode := []int{http.StatusOK, http.StatusPartialContent} if startPieceNum > 0 { - breakRange, err := util.CalculateBreakRange(startPieceNum, int(pieceContSize), httpFileLength) + breakRange, err := rangeutils.CalculateBreakRange(startPieceNum, int(pieceContSize), httpFileLength) if err != nil { return nil, errors.Wrapf(errorType.ErrInvalidValue, "failed to calculate the breakRange: %v", err) } diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index 970a5c3bf..da9865794 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -26,6 +26,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/limitreader" "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/pkg/netutils" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" "github.com/dragonflyoss/Dragonfly/pkg/ratelimiter" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/supernode/config" @@ -216,7 +217,7 @@ func (cm *Manager) GetPieceMD5(ctx context.Context, taskID string, pieceNum int, if source == PieceMd5SourceFile { // get piece length - start, end, err := util.ParsePieceIndex(pieceRange) + start, end, err := rangeutils.ParsePieceIndex(pieceRange) if err != nil { return "", errors.Wrapf(err, "failed to parse piece range(%s)", pieceRange) } diff --git a/supernode/daemon/mgr/pieceerror/md5_not_match.go b/supernode/daemon/mgr/pieceerror/md5_not_match.go index b4eb52d9d..659cf069a 100644 --- a/supernode/daemon/mgr/pieceerror/md5_not_match.go +++ b/supernode/daemon/mgr/pieceerror/md5_not_match.go @@ -19,11 +19,11 @@ package pieceerror import ( "context" + "github.com/sirupsen/logrus" + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" - "github.com/dragonflyoss/Dragonfly/supernode/util" - - "github.com/sirupsen/logrus" ) var _ Handler = &FileMd5NotMatchHandler{} @@ -45,7 +45,7 @@ func NewFileMd5NotMatchHandler(gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (Hand } func (fnmh *FileMd5NotMatchHandler) Handle(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error { - pieceNum := util.CalculatePieceNum(pieceErrorRequest.Range) + pieceNum := rangeutils.CalculatePieceNum(pieceErrorRequest.Range) // get piece MD5 by reading the meta file metaPieceMD5, err := fnmh.cdnManager.GetPieceMD5(ctx, pieceErrorRequest.TaskID, pieceNum, pieceErrorRequest.Range, "meta") diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 9fe065684..81c876d36 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -23,6 +23,7 @@ import ( "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" "github.com/dragonflyoss/Dragonfly/supernode/config" @@ -272,7 +273,7 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str logrus.Debugf("get update piece status request: %+v with taskID(%s) pieceRange(%s)", pieceUpdateRequest, taskID, pieceRange) // calculate the pieceNum according to the pieceRange - pieceNum := util.CalculatePieceNum(pieceRange) + pieceNum := rangeutils.CalculatePieceNum(pieceRange) if pieceNum == -1 { return errors.Wrapf(errortypes.ErrInvalidValue, "failed to parse pieceRange: %s to pieceNum for taskID: %s, clientID: %s", diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index 81fcf69c0..a9a63a678 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -26,6 +26,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/digest" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/netutils" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" @@ -290,7 +291,7 @@ func (tm *Manager) processTaskStart(ctx context.Context, srcCID string, task *ty // req.DstPID, req.PieceRange, req.PieceResult, req.DfgetTaskStatus func (tm *Manager) processTaskRunning(ctx context.Context, srcCID, srcPID string, task *types.TaskInfo, req *types.PiecePullRequest, dfgetTask *types.DfGetTask) (bool, interface{}, error) { - pieceNum := util.CalculatePieceNum(req.PieceRange) + pieceNum := rangeutils.CalculatePieceNum(req.PieceRange) if pieceNum == -1 { return false, nil, errors.Wrapf(errortypes.ErrInvalidValue, "pieceRange: %s", req.PieceRange) } @@ -411,7 +412,7 @@ func (tm *Manager) pieceResultToPieceInfo(ctx context.Context, pr *mgr.PieceResu PeerIP: peer.IP.String(), PeerPort: peer.Port, PieceMD5: pieceMD5, - PieceRange: util.CalculatePieceRange(pr.PieceNum, pieceSize), + PieceRange: rangeutils.CalculatePieceRange(pr.PieceNum, pieceSize), PieceSize: pieceSize, }, nil } diff --git a/supernode/server/0.3_bridge.go b/supernode/server/0.3_bridge.go index 958c02d30..197511917 100644 --- a/supernode/server/0.3_bridge.go +++ b/supernode/server/0.3_bridge.go @@ -21,17 +21,17 @@ import ( "encoding/json" "net/http" + "github.com/go-openapi/strfmt" + "github.com/gorilla/schema" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/netutils" + "github.com/dragonflyoss/Dragonfly/pkg/rangeutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" - sutil "github.com/dragonflyoss/Dragonfly/supernode/util" - - "github.com/go-openapi/strfmt" - "github.com/gorilla/schema" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) // RegisterResponseData is the data when registering supernode successfully. @@ -200,7 +200,7 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req } datas = append(datas, &PullPieceTaskResponseContinueData{ Range: v.PieceRange, - PieceNum: sutil.CalculatePieceNum(v.PieceRange), + PieceNum: rangeutils.CalculatePieceNum(v.PieceRange), PieceSize: v.PieceSize, PieceMd5: v.PieceMD5, Cid: cid, @@ -229,7 +229,7 @@ func (s *Server) reportPiece(ctx context.Context, rw http.ResponseWriter, req *h // If piece is downloaded from supernode, add metrics. if s.Config.IsSuperCID(dstCID) { - m.pieceDownloadedBytes.WithLabelValues().Add(float64(sutil.CalculatePieceSize(pieceRange))) + m.pieceDownloadedBytes.WithLabelValues().Add(float64(rangeutils.CalculatePieceSize(pieceRange))) } request := &types.PieceUpdateRequest{