From 36fba9a6800a69399568618ec62fecae8f609453 Mon Sep 17 00:00:00 2001 From: Starnop Date: Fri, 22 Mar 2019 16:35:18 +0800 Subject: [PATCH] feature: implement the task mgr Signed-off-by: Starnop --- apis/swagger.yml | 11 +- apis/types/task_create_request.go | 5 + apis/types/task_info.go | 6 - common/errors/errors.go | 3 + common/errors/supernode_errors.go | 24 + dfget/util/http_util.go | 223 --------- supernode/daemon/mgr/task/manager.go | 204 ++++++++ supernode/daemon/mgr/task/manager_test.go | 168 +++++++ supernode/daemon/mgr/task/manager_util.go | 473 ++++++++++++++++++ .../daemon/mgr/task/manager_util_test.go | 127 +++++ supernode/daemon/mgr/task_mgr.go | 31 +- supernode/daemon/util/store.go | 4 +- supernode/{result => server}/result_info.go | 13 + supernode/server/router.go | 3 +- supernode/server/server.go | 55 +- supernode/util/range_util.go | 2 +- supernode/util/range_util_test.go | 2 +- 17 files changed, 1092 insertions(+), 262 deletions(-) delete mode 100644 dfget/util/http_util.go create mode 100644 supernode/daemon/mgr/task/manager.go create mode 100644 supernode/daemon/mgr/task/manager_test.go create mode 100644 supernode/daemon/mgr/task/manager_util.go create mode 100644 supernode/daemon/mgr/task/manager_util_test.go rename supernode/{result => server}/result_info.go (83%) diff --git a/apis/swagger.yml b/apis/swagger.yml index 96a88478c..af35f6b3f 100644 --- a/apis/swagger.yml +++ b/apis/swagger.yml @@ -562,6 +562,11 @@ definitions: downloads, if there is already a task a.b.com/fileA. items: type: "string" + peerID: + type: "string" + description: | + PeerID is used to uniquely identifies a peer which will be used to create a dfgetTask. + The value must be the value in the response after registering a peer. TaskCreateResponse: type: "object" @@ -648,12 +653,6 @@ definitions: the same taskURL and taskID to download file, A and B will share the same peer network to distribute files. If user A additionally adds an identifier with taskURL, while user B still carries only taskURL, then A's generated taskID is different from B, and the result is that two users use different peer networks. - path: - type: "string" - description: | - path is used in one peer A for uploading functionality. When peer B hopes - to get piece C from peer A, B must provide a URL for piece C. - Then when creating a task in supernode, peer A must provide this URL in request. headers: type: "object" description: | diff --git a/apis/types/task_create_request.go b/apis/types/task_create_request.go index 500d25fef..23fef3fc7 100644 --- a/apis/types/task_create_request.go +++ b/apis/types/task_create_request.go @@ -71,6 +71,11 @@ type TaskCreateRequest struct { // Path string `json:"path,omitempty"` + // PeerID is used to uniquely identifies a peer which will be used to create a dfgetTask. + // The value must be the value in the response after registering a peer. + // + PeerID string `json:"peerID,omitempty"` + // The is the resource's URL which user uses dfget to download. The location of URL can be anywhere, LAN or WAN. // For image distribution, this is image layer's URL in image registry. // The resource url is provided by command line parameter. diff --git a/apis/types/task_info.go b/apis/types/task_info.go index 7a68ede61..7b13ea4e8 100644 --- a/apis/types/task_info.go +++ b/apis/types/task_info.go @@ -69,12 +69,6 @@ type TaskInfo struct { // Md5 string `json:"md5,omitempty"` - // path is used in one peer A for uploading functionality. When peer B hopes - // to get piece C from peer A, B must provide a URL for piece C. - // Then when creating a task in supernode, peer A must provide this URL in request. - // - Path string `json:"path,omitempty"` - // The size of pieces which is calculated as per the following strategy // 1. If file's total size is less than 200MB, then the piece size is 4MB by default. // 2. Otherwise, it equals to the smaller value between totalSize/100MB + 2 MB and 15MB. diff --git a/common/errors/errors.go b/common/errors/errors.go index 342ca0dc4..add144603 100644 --- a/common/errors/errors.go +++ b/common/errors/errors.go @@ -58,6 +58,9 @@ const ( codePeerWait codeUnknowError codePeerContinue + codeURLNotReachable + codeTaskIDDuplicate + codeAuthenticationRequired ) // DfError represents a Dragonfly error. diff --git a/common/errors/supernode_errors.go b/common/errors/supernode_errors.go index 875b3f040..c81694be9 100644 --- a/common/errors/supernode_errors.go +++ b/common/errors/supernode_errors.go @@ -36,6 +36,15 @@ var ( // PeerContinue represents the peer should wait. PeerContinue = DfError{codePeerContinue, "peer continue"} + + // ErrURLNotReachable represents the url is a not reachable. + ErrURLNotReachable = DfError{codeURLNotReachable, "url not reachable"} + + // ErrTaskIDDuplicate represents the task id is in conflict. + ErrTaskIDDuplicate = DfError{codeTaskIDDuplicate, "taskId conflict"} + + // ErrAuthenticationRequired represents the authentication is required. + ErrAuthenticationRequired = DfError{codeAuthenticationRequired, "authentication required"} ) // IsSystemError check the error is a system error or not. @@ -67,3 +76,18 @@ func IsUnknowError(err error) bool { func IsPeerContinue(err error) bool { return checkError(err, codePeerContinue) } + +// IsURLNotReachable check the error is a url not reachable or not. +func IsURLNotReachable(err error) bool { + return checkError(err, codeURLNotReachable) +} + +// IsTaskIDDuplicate check the error is a TaskIDDuplicate error or not. +func IsTaskIDDuplicate(err error) bool { + return checkError(err, codeTaskIDDuplicate) +} + +// IsAuthenticationRequired check the error is a AuthenticationRequired error or not. +func IsAuthenticationRequired(err error) bool { + return checkError(err, codeAuthenticationRequired) +} diff --git a/dfget/util/http_util.go b/dfget/util/http_util.go deleted file mode 100644 index 396ef72b3..000000000 --- a/dfget/util/http_util.go +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright The Dragonfly Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package util - -import ( - "bytes" - "encoding/json" - "fmt" - "net" - "net/http" - "reflect" - "strings" - "time" - - "github.com/dragonflyoss/Dragonfly/common/util" - - "github.com/valyala/fasthttp" -) - -/* http content types */ -const ( - ApplicationJSONUtf8Value = "application/json;charset=utf-8" -) - -const ( - // RequestTag is the tag name for parsing structure to query parameters. - // see function ParseQuery. - RequestTag = "request" - - // DefaultTimeout is the default timeout to check connect. - DefaultTimeout = 500 * time.Millisecond -) - -// DefaultHTTPClient is the default implementation of SimpleHTTPClient. -var DefaultHTTPClient SimpleHTTPClient = &defaultHTTPClient{} - -// SimpleHTTPClient defines some http functions used frequently. -type SimpleHTTPClient interface { - PostJSON(url string, body interface{}, timeout time.Duration) (code int, res []byte, e error) - Get(url string, timeout time.Duration) (code int, res []byte, e error) -} - -// ---------------------------------------------------------------------------- -// defaultHTTPClient - -type defaultHTTPClient struct { -} - -var _ SimpleHTTPClient = &defaultHTTPClient{} - -// PostJSON send a POST request whose content-type is 'application/json;charset=utf-8'. -// When timeout <= 0, it will block until receiving response from server. -func (c *defaultHTTPClient) PostJSON(url string, body interface{}, timeout time.Duration) ( - code int, resBody []byte, err error) { - - var jsonByte []byte - - if body != nil { - jsonByte, err = json.Marshal(body) - if err != nil { - return fasthttp.StatusBadRequest, nil, err - } - } - - req := fasthttp.AcquireRequest() - defer fasthttp.ReleaseRequest(req) - - req.SetRequestURI(url) - req.SetBody(jsonByte) - req.Header.SetMethod("POST") - req.Header.SetContentType(ApplicationJSONUtf8Value) - - resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(resp) - - if timeout > 0 { - err = fasthttp.DoTimeout(req, resp, timeout) - } else { - err = fasthttp.Do(req, resp) - } - return resp.StatusCode(), resp.Body(), err -} - -// Get sends a GET request to server. -// When timeout <= 0, it will block until receiving response from server. -func (c *defaultHTTPClient) Get(url string, timeout time.Duration) ( - code int, body []byte, e error) { - if timeout > 0 { - return fasthttp.GetTimeout(nil, url, timeout) - } - return fasthttp.Get(nil, url) -} - -// --------------------------------------------------------------------------- -// util functions - -// PostJSON send a POST request whose content-type is 'application/json;charset=utf-8'. -func PostJSON(url string, body interface{}, timeout time.Duration) (int, []byte, error) { - return DefaultHTTPClient.PostJSON(url, body, timeout) -} - -// Get sends a GET request to server. -// When timeout <= 0, it will block until receiving response from server. -func Get(url string, timeout time.Duration) (int, []byte, error) { - return DefaultHTTPClient.Get(url, timeout) -} - -// Do performs the given http request and fills the given http response. -// When timeout <= 0, it will block until receiving response from server. -func Do(url string, headers map[string]string, timeout time.Duration) (string, error) { - // init request and response - req := fasthttp.AcquireRequest() - defer fasthttp.ReleaseRequest(req) - - req.SetRequestURI(url) - for k, v := range headers { - req.Header.Add(k, v) - } - - resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(resp) - - // send request - var err error - if timeout > 0 { - err = fasthttp.DoTimeout(req, resp, timeout) - } else { - err = fasthttp.Do(req, resp) - } - if err != nil { - return "", err - } - - // get resp status code - statusCode := resp.StatusCode() - if statusCode != http.StatusOK { - return "", fmt.Errorf("unexpected status code: %d", statusCode) - } - - result := string(resp.Body()) - - return result, nil -} - -// HTTPGetWithHeaders send an HTTP GET request with headers. -func HTTPGetWithHeaders(url string, headers map[string]string) (*http.Response, error) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } - - for k, v := range headers { - req.Header.Add(k, v) - } - - return http.DefaultClient.Do(req) -} - -// HTTPStatusOk reports whether the http response code is 200. -func HTTPStatusOk(code int) bool { - return fasthttp.StatusOK == code -} - -// ParseQuery only parses the fields with tag 'request' of the query to parameters. -// query must be a pointer to a struct. -func ParseQuery(query interface{}) string { - if util.IsNil(query) { - return "" - } - - b := bytes.Buffer{} - wrote := false - t := reflect.TypeOf(query).Elem() - v := reflect.ValueOf(query).Elem() - for i := 0; i < t.NumField(); i++ { - tag := t.Field(i).Tag.Get(RequestTag) - if tag != "" { - if wrote { - b.WriteByte('&') - } - b.WriteString(tag) - b.WriteByte('=') - b.WriteString(fmt.Sprintf("%v", v.Field(i))) - wrote = true - } - } - return b.String() -} - -// CheckConnect checks the network connectivity between local and remote. -// param timeout: its unit is milliseconds, reset to 500 ms if <= 0 -// returns localIP -func CheckConnect(ip string, port int, timeout int) (localIP string, e error) { - t := time.Duration(timeout) * time.Millisecond - if timeout <= 0 { - t = DefaultTimeout - } - - var conn net.Conn - addr := fmt.Sprintf("%s:%d", ip, port) - if conn, e = net.DialTimeout("tcp", addr, t); e == nil { - localIP = conn.LocalAddr().String() - conn.Close() - if idx := strings.LastIndexByte(localIP, ':'); idx >= 0 { - localIP = localIP[:idx] - } - } - return -} diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go new file mode 100644 index 000000000..c84fe881d --- /dev/null +++ b/supernode/daemon/mgr/task/manager.go @@ -0,0 +1,204 @@ +package task + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" + errorType "github.com/dragonflyoss/Dragonfly/common/errors" + cutil "github.com/dragonflyoss/Dragonfly/common/util" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" + "github.com/dragonflyoss/Dragonfly/supernode/util" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + key = ">I$pg-~AS~sP'rqu_`Oh&lz#9]\"=;nE%" +) + +var _ mgr.TaskMgr = &Manager{} + +// using a variable getContentLength to reference the function util.GetContentLength, +// and it helps using stub functions in the test with gostub. +var getContentLength = cutil.GetContentLength + +// Manager is an implementation of the interface of TaskMgr. +type Manager struct { + cfg *config.Config + + taskStore *dutil.Store + taskLocker *util.LockerPool + + peerMgr mgr.PeerMgr + dfgetTaskMgr mgr.DfgetTaskMgr + progressMgr mgr.ProgressMgr + cdnMgr mgr.CDNMgr + schedulerMgr mgr.SchedulerMgr +} + +// NewManager returns a new Manager Object. +func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr, + progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr) (*Manager, error) { + return &Manager{ + cfg: cfg, + taskStore: dutil.NewStore(), + taskLocker: util.NewLockerPool(), + peerMgr: peerMgr, + dfgetTaskMgr: dfgetTaskMgr, + progressMgr: progressMgr, + cdnMgr: cdnMgr, + schedulerMgr: schedulerMgr, + }, nil +} + +// Register will not only register a task. +func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) (taskCreateResponse *types.TaskCreateResponse, err error) { + // Step1: validate params + if err := validateParams(req); err != nil { + return nil, err + } + + // Step2: add a new Task or update the exist task + task, err := tm.addOrUpdateTask(ctx, req) + if err != nil { + logrus.Infof("failed to add or update task with req %+v: %v", req, err) + return nil, err + } + logrus.Debugf("success to get task info: %+v", task) + // TODO: defer rollback the task update + + // Step3: add a new DfgetTask + dfgetTask, err := tm.addDfgetTask(ctx, req, task) + if err != nil { + logrus.Infof("failed to add dfgetTask %+v: %v", dfgetTask, err) + return nil, err + } + + logrus.Debugf("success to add dfgetTask %+v", dfgetTask) + defer func() { + if err != nil { + if err := tm.dfgetTaskMgr.Delete(ctx, req.CID, task.ID); err != nil { + logrus.Errorf("failed to delete the dfgetTask with taskID %s peerID %s: %v", task.ID, req.PeerID, err) + } + logrus.Infof("success to rollback the dfgetTask %+v", dfgetTask) + } + }() + + // Step4: init Progress + if err := tm.progressMgr.InitProgress(ctx, task.ID, req.PeerID, req.CID); err != nil { + return nil, err + } + logrus.Debugf("success to init progress for taskID: %s peerID: %s cID: %s", task.ID, req.PeerID, req.CID) + // TODO: defer rollback init Progress + + // Step5: trigger CDN + if err := tm.triggerCdnSyncAction(ctx, task); err != nil { + return nil, errors.Wrapf(errorType.ErrSystemError, "failed to trigger cdn: %v", err) + } + + return &types.TaskCreateResponse{ + ID: task.ID, + FileLength: task.FileLength, + PieceSize: task.PieceSize, + }, nil +} + +// Get a task info according to specified taskID. +func (tm *Manager) Get(ctx context.Context, taskID string) (*types.TaskInfo, error) { + return tm.getTask(taskID) +} + +// List returns a list of tasks with filter. +// TODO: implement it. +func (tm *Manager) List(ctx context.Context, filter map[string]string) ([]*types.TaskInfo, error) { + return nil, nil +} + +// CheckTaskStatus check the task status. +func (tm *Manager) CheckTaskStatus(ctx context.Context, taskID string) (bool, error) { + task, err := tm.getTask(taskID) + if err != nil { + return false, err + } + + // the expected CDNStatus is not nil + if cutil.IsEmptyStr(task.CdnStatus) { + return false, errors.Wrap(errorType.ErrSystemError, "CDNStatus of TaskInfo") + } + + return isSuccessCDN(task.CdnStatus), nil +} + +// Delete deletes a task. +func (tm *Manager) Delete(ctx context.Context, taskID string) error { + tm.taskStore.Delete(taskID) + return nil +} + +// Update the info of task. +func (tm *Manager) Update(ctx context.Context, taskID string, taskInfo *types.TaskInfo) error { + return tm.updateTask(taskID, taskInfo) +} + +// GetPieces get the pieces to be downloaded based on the scheduling result. +func (tm *Manager) GetPieces(ctx context.Context, taskID, clientID string, req *types.PiecePullRequest) (bool, interface{}, error) { + logrus.Debugf("get pieces request: %+v with taskID(%s) and clientID(%s)", req, taskID, clientID) + + // convert piece result and dfgetTask status to dfgetTask status code + dfgetTaskStatus := convertToDfgetTaskStatus(req.PieceResult, req.DfgetTaskStatus) + if cutil.IsEmptyStr(dfgetTaskStatus) { + return false, nil, errors.Wrapf(errorType.ErrInvalidValue, "failed to convert piece result (%s) dfgetTaskStatus (%s)", req.PieceResult, req.DfgetTaskStatus) + } + + dfgetTask, err := tm.dfgetTaskMgr.Get(ctx, clientID, taskID) + if err != nil { + return false, nil, errors.Wrapf(err, "failed to get dfgetTask with taskID (%s) clientID (%s)", taskID, clientID) + } + logrus.Debugf("success to get dfgetTask: %+v", dfgetTask) + + task, err := tm.getTask(taskID) + if err != nil { + return false, nil, errors.Wrapf(err, "failed to get taskID (%s)", taskID) + } + logrus.Debugf("success to get task: %+v", task) + + if dfgetTaskStatus == types.DfGetTaskStatusWAITING { + logrus.Debugf("start to process task(%s) start", taskID) + return tm.processTaskStart(ctx, clientID, task, dfgetTask) + } + if dfgetTaskStatus == types.DfGetTaskStatusRUNNING { + logrus.Debugf("start to process task(%s) running", taskID) + return tm.processTaskRunning(ctx, clientID, dfgetTask.PeerID, task, req, dfgetTask) + } + logrus.Debugf("start to process task(%s) finish", taskID) + return true, nil, tm.processTaskFinish(ctx, taskID, clientID, dfgetTaskStatus) +} + +// UpdatePieceStatus update the piece status with specified parameters. +func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange string, pieceUpdateRequest *types.PieceUpdateRequest) error { + // calculate the pieceNum according to the pieceRange + pieceNum := util.CalculatePieceNum(pieceRange) + if pieceNum == -1 { + return errors.Wrapf(errorType.ErrInvalidValue, + "failed to parse pieceRange: %s to pieceNum for taskID: %s, clientID: %s", + pieceRange, taskID, pieceUpdateRequest.ClientID) + } + + // get dfgetTask according to the CID + srcDfgetTask, err := tm.dfgetTaskMgr.Get(ctx, pieceUpdateRequest.ClientID, taskID) + if err != nil { + return err + } + + // get piece status code according to the pieceUpdateRequest.Result + pieceStatus, ok := mgr.PieceStatusMap[pieceUpdateRequest.PieceStatus] + if !ok { + return errors.Wrapf(errorType.ErrInvalidValue, "result: %s", pieceUpdateRequest.PieceStatus) + } + + return tm.progressMgr.UpdateProgress(ctx, taskID, pieceUpdateRequest.ClientID, + srcDfgetTask.PeerID, pieceUpdateRequest.DstPID, pieceNum, pieceStatus) +} diff --git a/supernode/daemon/mgr/task/manager_test.go b/supernode/daemon/mgr/task/manager_test.go new file mode 100644 index 000000000..b8534474a --- /dev/null +++ b/supernode/daemon/mgr/task/manager_test.go @@ -0,0 +1,168 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package task + +import ( + "context" + "testing" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/common/errors" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock" + dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" + + "github.com/go-check/check" + "github.com/golang/mock/gomock" + "github.com/prashantv/gostub" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +func init() { + check.Suite(&TaskMgrTestSuite{}) +} + +type TaskMgrTestSuite struct { + mockCtl *gomock.Controller + mockCDNMgr *mock.MockCDNMgr + mockDfgetTaskMgr *mock.MockDfgetTaskMgr + mockPeerMgr *mock.MockPeerMgr + mockProgressMgr *mock.MockProgressMgr + mockSchedulerMgr *mock.MockSchedulerMgr + + taskManager *Manager + contentLengthStub *gostub.Stubs +} + +func (s *TaskMgrTestSuite) SetUpSuite(c *check.C) { + s.mockCtl = gomock.NewController(c) + + s.mockPeerMgr = mock.NewMockPeerMgr(s.mockCtl) + s.mockCDNMgr = mock.NewMockCDNMgr(s.mockCtl) + s.mockDfgetTaskMgr = mock.NewMockDfgetTaskMgr(s.mockCtl) + s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl) + s.mockSchedulerMgr = mock.NewMockSchedulerMgr(s.mockCtl) + + s.mockCDNMgr.EXPECT().TriggerCDN(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + s.mockDfgetTaskMgr.EXPECT().Add(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + s.mockProgressMgr.EXPECT().InitProgress(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + cfg := config.NewConfig() + s.taskManager, _ = NewManager(cfg, s.mockPeerMgr, s.mockDfgetTaskMgr, + s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr) + + s.contentLengthStub = gostub.Stub(&getContentLength, func(url string, headers map[string]string) (int64, int, error) { + return 1000, 200, nil + }) +} + +func (s *TaskMgrTestSuite) TearDownSuite(c *check.C) { + s.contentLengthStub.Reset() + s.mockCtl.Finish() +} + +func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) { + s.taskManager.taskStore = dutil.NewStore() + req := &types.TaskCreateRequest{ + CID: "cid", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/foo", + RawURL: "http://aa.bb.com", + PeerID: "fooPeerID", + } + resp, err := s.taskManager.Register(context.Background(), req) + c.Check(err, check.IsNil) + + isSuccess, err := s.taskManager.CheckTaskStatus(context.Background(), resp.ID) + c.Check(err, check.IsNil) + c.Check(isSuccess, check.Equals, false) + + isSuccess, err = s.taskManager.CheckTaskStatus(context.Background(), "foo") + c.Check(errors.IsDataNotFound(err), check.Equals, true) + c.Check(isSuccess, check.Equals, false) + + task, err := s.taskManager.Get(context.Background(), resp.ID) + c.Check(err, check.IsNil) + task.CdnStatus = types.TaskInfoCdnStatusSUCCESS + isSuccess, err = s.taskManager.CheckTaskStatus(context.Background(), resp.ID) + c.Check(err, check.IsNil) + c.Check(isSuccess, check.Equals, true) +} + +func (s *TaskMgrTestSuite) TestUpdateTaskInfo(c *check.C) { + s.taskManager.taskStore = dutil.NewStore() + req := &types.TaskCreateRequest{ + CID: "cid", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/foo", + RawURL: "http://aa.bb.com", + PeerID: "fooPeerID", + } + resp, err := s.taskManager.Register(context.Background(), req) + c.Check(err, check.IsNil) + + // return error when taskInfo equals nil + err = s.taskManager.Update(context.Background(), resp.ID, nil) + c.Check(errors.IsEmptyValue(err), check.Equals, true) + + // return error when taskInfo.CDNStatus equals "" + err = s.taskManager.Update(context.Background(), resp.ID, &types.TaskInfo{}) + c.Check(errors.IsEmptyValue(err), check.Equals, true) + + // only update the cdnStatus when CDNStatus is not success. + err = s.taskManager.Update(context.Background(), resp.ID, &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusFAILED, + FileLength: 2000, + Md5: "fooMd5", + }) + c.Check(err, check.IsNil) + task, err := s.taskManager.Get(context.Background(), resp.ID) + c.Check(err, check.IsNil) + c.Check(task.CdnStatus, check.Equals, types.TaskInfoCdnStatusFAILED) + c.Check(task.FileLength, check.Equals, int64(0)) + c.Check(task.Md5, check.Equals, "") + + // update the taskInfo when CDNStatus is success. + err = s.taskManager.Update(context.Background(), resp.ID, &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusSUCCESS, + FileLength: 2000, + Md5: "fooMd5", + }) + c.Check(err, check.IsNil) + task, err = s.taskManager.Get(context.Background(), resp.ID) + c.Check(err, check.IsNil) + c.Check(task.CdnStatus, check.Equals, types.TaskInfoCdnStatusSUCCESS) + c.Check(task.FileLength, check.Equals, int64(2000)) + + // do not update if origin CDNStatus equals success + err = s.taskManager.Update(context.Background(), resp.ID, &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusFAILED, + FileLength: 3000, + Md5: "fooMd5", + }) + c.Check(err, check.IsNil) + task, err = s.taskManager.Get(context.Background(), resp.ID) + c.Check(err, check.IsNil) + c.Check(task.CdnStatus, check.Equals, types.TaskInfoCdnStatusSUCCESS) + c.Check(task.FileLength, check.Equals, int64(2000)) + +} diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go new file mode 100644 index 000000000..4a44132a7 --- /dev/null +++ b/supernode/daemon/mgr/task/manager_util.go @@ -0,0 +1,473 @@ +package task + +import ( + "context" + "fmt" + "net/http" + + "github.com/dragonflyoss/Dragonfly/apis/types" + errorType "github.com/dragonflyoss/Dragonfly/common/errors" + cutil "github.com/dragonflyoss/Dragonfly/common/util" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/util" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// addOrUpdateTask adds a new task or update the exist task to taskStore. +func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateRequest) (*types.TaskInfo, error) { + taskURL := cutil.FilterURLParam(req.RawURL, req.Filter) + taskID := generateTaskID(taskURL, req.Md5, req.Identifier) + + // using the existing task if it already exists corresponding to taskID + var task *types.TaskInfo + newTask := &types.TaskInfo{ + ID: taskID, + CallSystem: req.CallSystem, + Dfdaemon: req.Dfdaemon, + Headers: req.Headers, + Identifier: req.Identifier, + Md5: req.Md5, + RawURL: req.RawURL, + TaskURL: taskURL, + CdnStatus: types.TaskInfoCdnStatusWAITING, + PieceTotal: -1, + } + + if v, err := tm.taskStore.Get(taskID); err == nil { + task = v.(*types.TaskInfo) + if !equalsTask(task, newTask) { + return nil, errors.Wrapf(errorType.ErrTaskIDDuplicate, "%s", taskID) + } + } else { + task = newTask + } + + tm.taskLocker.GetLock(taskID, false) + defer tm.taskLocker.ReleaseLock(taskID, false) + + if task.FileLength != 0 { + return task, nil + } + + // get fileLength with req.Headers + fileLength, err := getHTTPFileLength(taskID, task.TaskURL, req.Headers) + if err != nil { + logrus.Errorf("failed to get file length from http client for taskID(%s): %v", taskID, err) + } + task.HTTPFileLength = fileLength + logrus.Infof("get file length %d from http client for taskID(%s)", fileLength, taskID) + + // if success to get the information successfully with the req.Headers, + // and then update the task.Headers to req.Headers. + if !cutil.IsNil(req.Headers) { + task.Headers = req.Headers + } + + // caculate piece size and update the PieceSize and PieceTotal + pieceSize := computePieceSize(fileLength) + task.PieceSize = pieceSize + task.PieceTotal = int32((fileLength + (int64(pieceSize) - 1)) / int64(pieceSize)) + + tm.taskStore.Put(taskID, task) + return task, nil +} + +// getTask returns the taskInfo according to the specified taskID. +func (tm *Manager) getTask(taskID string) (*types.TaskInfo, error) { + if cutil.IsEmptyStr(taskID) { + return nil, errors.Wrap(errorType.ErrEmptyValue, "taskID") + } + + v, err := tm.taskStore.Get(taskID) + if err != nil { + return nil, err + } + + // type assertion + if info, ok := v.(*types.TaskInfo); ok { + return info, nil + } + return nil, errors.Wrapf(errorType.ErrConvertFailed, "taskID %s: %v", taskID, v) +} + +func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) error { + if cutil.IsEmptyStr(taskID) { + return errors.Wrap(errorType.ErrEmptyValue, "taskID") + } + + if cutil.IsNil(updateTaskInfo) { + return errors.Wrap(errorType.ErrEmptyValue, "Update TaskInfo") + } + + // the expected new CDNStatus is not nil + if cutil.IsEmptyStr(updateTaskInfo.CdnStatus) { + return errors.Wrapf(errorType.ErrEmptyValue, "CDNStatus of TaskInfo: %+v", updateTaskInfo) + } + + tm.taskLocker.GetLock(taskID, false) + defer tm.taskLocker.ReleaseLock(taskID, false) + + task, err := tm.getTask(taskID) + if err != nil { + return err + } + + if !isSuccessCDN(updateTaskInfo.CdnStatus) { + // when the origin CDNStatus equals success, do not update it to unsuccessful + if isSuccessCDN(task.CdnStatus) { + return nil + } + + // only update the task CdnStatus when the new CDNStatus and + // the origin CDNStatus both not equals success + task.CdnStatus = updateTaskInfo.CdnStatus + return nil + } + + // only update the task info when the new CDNStatus equals success + // and the origin CDNStatus not equals success. + if updateTaskInfo.FileLength != 0 { + task.FileLength = updateTaskInfo.FileLength + } + + if !cutil.IsEmptyStr(updateTaskInfo.RealMd5) { + task.RealMd5 = updateTaskInfo.RealMd5 + } + + var pieceTotal int32 + if updateTaskInfo.FileLength > 0 { + pieceTotal = int32((updateTaskInfo.FileLength + int64(task.PieceSize-1)) / int64(task.PieceSize)) + } + if pieceTotal != 0 { + task.PieceTotal = pieceTotal + } + task.CdnStatus = updateTaskInfo.CdnStatus + + return nil +} + +func (tm *Manager) addDfgetTask(ctx context.Context, req *types.TaskCreateRequest, task *types.TaskInfo) (*types.DfGetTask, error) { + dfgetTask := &types.DfGetTask{ + CID: req.CID, + Path: req.Path, + PieceSize: task.PieceSize, + Status: types.DfGetTaskStatusWAITING, + TaskID: task.ID, + PeerID: req.PeerID, + } + + if err := tm.dfgetTaskMgr.Add(ctx, dfgetTask); err != nil { + return dfgetTask, err + } + + return dfgetTask, nil +} + +func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.TaskInfo) error { + if !isFrozen(task.CdnStatus) { + logrus.Infof("CDN(%s) is running or has been downloaded successfully for taskID: %s", task.CdnStatus, task.ID) + return nil + } + + if isWait(task.CdnStatus) { + if err := tm.initCdnNode(ctx, task); err != nil { + logrus.Errorf("failed to init cdn node for taskID %s: %v", task.ID, err) + return err + } + logrus.Infof("success to init cdn node or taskID %s", task.ID) + } + if err := tm.updateTask(task.ID, &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusRUNNING, + }); err != nil { + return err + } + + go func() { + updateTaskInfo, err := tm.cdnMgr.TriggerCDN(ctx, task) + if err != nil { + logrus.Errorf("trigger cdn get error: %v", err) + } + tm.updateTask(task.ID, updateTaskInfo) + logrus.Infof("success to update task cdn %+v", updateTaskInfo) + }() + logrus.Infof("success to start cdn trigger for taskID: %s", task.ID) + return nil +} + +func (tm *Manager) initCdnNode(ctx context.Context, task *types.TaskInfo) error { + var cid = tm.cfg.GetSuperCID(task.ID) + var pid = tm.cfg.GetSuperPID() + path, err := tm.cdnMgr.GetHTTPPath(ctx, task.ID) + if err != nil { + return err + } + + if err := tm.dfgetTaskMgr.Add(ctx, &types.DfGetTask{ + CID: cid, + Path: path, + PeerID: pid, + PieceSize: task.PieceSize, + Status: types.DfGetTaskStatusWAITING, + TaskID: task.ID, + }); err != nil { + return errors.Wrapf(err, "failed to add cdn dfgetTask for taskID %s", task.ID) + } + + return tm.progressMgr.InitProgress(ctx, task.ID, pid, cid) +} + +func (tm *Manager) processTaskStart(ctx context.Context, srcCID string, task *types.TaskInfo, dfgetTask *types.DfGetTask) (bool, interface{}, error) { + if err := tm.dfgetTaskMgr.UpdateStatus(ctx, srcCID, task.ID, types.DfGetTaskStatusRUNNING); err != nil { + return false, nil, err + } + logrus.Infof("success update dfgetTask status to RUNNING with taskID: %s clientID: %s", task.ID, srcCID) + + return tm.parseAvaliablePeers(ctx, srcCID, task, dfgetTask) +} + +// req.DstPID, req.PieceRange, req.PieceResult, req.DfgetTaskStatus +func (tm *Manager) processTaskRunning(ctx context.Context, srcCID, srcPID string, task *types.TaskInfo, req *types.PiecePullRequest, + dfgetTask *types.DfGetTask) (bool, interface{}, error) { + pieceNum := util.CalculatePieceNum(req.PieceRange) + if pieceNum == -1 { + return false, nil, errors.Wrapf(errorType.ErrInvalidValue, "pieceRange: %s", req.PieceRange) + } + pieceStatus := convertToPeerPieceStatus(req.PieceResult, req.DfgetTaskStatus) + if pieceStatus == -1 { + return false, nil, errors.Wrapf(errorType.ErrInvalidValue, "failed to convert result: %s and status %s to pieceStatus", req.PieceResult, req.DfgetTaskStatus) + } + + logrus.Infof("start to update progress taskID (%s) srcCID (%s) srcPID (%s) dstPID (%s) pieceNum (%d) pieceStatus (%d)", + task.ID, srcCID, srcPID, req.DstPID, pieceNum, pieceStatus) + if err := tm.progressMgr.UpdateProgress(ctx, task.ID, srcCID, srcPID, req.DstPID, pieceNum, pieceStatus); err != nil { + return false, nil, errors.Wrap(err, "failed to update progress") + } + + return tm.parseAvaliablePeers(ctx, srcCID, task, dfgetTask) +} + +func (tm *Manager) processTaskFinish(ctx context.Context, taskID, clientID, dfgetTaskStatus string) error { + if err := tm.dfgetTaskMgr.UpdateStatus(ctx, clientID, taskID, dfgetTaskStatus); err != nil { + return fmt.Errorf("failed to update dfget task status with taskID(%s) clientID(%s) status(%s): %v", taskID, clientID, dfgetTaskStatus, err) + } + + return nil +} + +func (tm *Manager) parseAvaliablePeers(ctx context.Context, clientID string, task *types.TaskInfo, dfgetTask *types.DfGetTask) (bool, interface{}, error) { + // Step1. validate + if cutil.IsEmptyStr(clientID) { + return false, nil, errors.Wrapf(errorType.ErrEmptyValue, "clientID") + } + + // Step2. validate cdn status + if task.CdnStatus == types.TaskInfoCdnStatusFAILED { + return false, nil, errors.Wrapf(errorType.ErrCDNFail, "taskID: %s", task.ID) + } + if task.CdnStatus == types.TaskInfoCdnStatusWAITING { + return false, nil, errors.Wrapf(errorType.ErrPeerWait, "taskID: %s cdn status is waiting", task.ID) + } + + // Step3. whether success + cdnSuccess := (task.CdnStatus == types.TaskInfoCdnStatusSUCCESS) + pieceSuccess, _ := tm.progressMgr.GetPieceProgressByCID(ctx, task.ID, clientID, "success") + logrus.Infof("get successful pieces: %v", pieceSuccess) + if cdnSuccess && (int32(len(pieceSuccess)) == task.PieceTotal) { + finishInfo := make(map[string]interface{}) + finishInfo["md5"] = task.Md5 + finishInfo["fileLength"] = task.FileLength + return true, finishInfo, nil + } + + // get scheduler pieceResult + logrus.Infof("start scheduler for taskID: %s clientID: %s", task.ID, clientID) + pieceResult, err := tm.schedulerMgr.Schedule(ctx, task.ID, clientID, dfgetTask.PeerID) + if err != nil { + return false, nil, err + } + logrus.Infof("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) + + var pieceInfos []*types.PieceInfo + for _, v := range pieceResult { + logrus.Debugf("get scheduler result item: %+v with taskID(%s) and clientID(%s)", v, task.ID, clientID) + pieceInfo, err := tm.pieceResultToPieceInfo(ctx, v, task.PieceSize) + if err != nil { + return false, nil, err + } + + pieceInfos = append(pieceInfos, pieceInfo) + } + + return false, pieceInfos, nil +} + +func (tm *Manager) pieceResultToPieceInfo(ctx context.Context, pr *mgr.PieceResult, pieceSize int32) (*types.PieceInfo, error) { + cid, err := tm.dfgetTaskMgr.GetCIDByPeerIDAndTaskID(ctx, pr.DstPID, pr.TaskID) + if err != nil { + return nil, err + } + dfgetTask, err := tm.dfgetTaskMgr.Get(ctx, cid, pr.TaskID) + if err != nil { + return nil, err + } + + peer, err := tm.peerMgr.Get(ctx, pr.DstPID) + if err != nil { + return nil, err + } + + return &types.PieceInfo{ + PID: pr.DstPID, + Path: dfgetTask.Path, + PeerIP: peer.IP.String(), + PeerPort: peer.Port, + PieceRange: util.CalculatePieceRange(pr.PieceNum, pieceSize), + PieceSize: pieceSize, + }, nil +} + +// convertToPeerPieceStatus convert piece result and dfgetTask status to dfgetTask status code. +// And it should return "" if failed to convert. +func convertToDfgetTaskStatus(result, status string) string { + if status == types.PiecePullRequestDfgetTaskStatusSTARTED { + return types.DfGetTaskStatusWAITING + } + + if status == types.PiecePullRequestDfgetTaskStatusRUNNING { + return types.DfGetTaskStatusRUNNING + } + + if status == types.PiecePullRequestDfgetTaskStatusFINISHED { + if result == types.PiecePullRequestPieceResultSUCCESS { + return types.DfGetTaskStatusSUCCESS + } + return types.DfGetTaskStatusFAILED + } + + return "" +} + +// convertToPeerPieceStatus convert piece result and dfgetTask status to piece status code. +// And it should return -1 if failed to convert. +func convertToPeerPieceStatus(result, status string) int { + if status == types.PiecePullRequestDfgetTaskStatusSTARTED { + return config.PieceWAITING + } + + if status == types.PiecePullRequestDfgetTaskStatusRUNNING { + if result == types.PiecePullRequestPieceResultSUCCESS { + return config.PieceSUCCESS + } + if result == types.PiecePullRequestPieceResultFAILED { + return config.PieceFAILED + } + if result == types.PiecePullRequestPieceResultSEMISUC { + return config.PieceSEMISUC + } + } + + return -1 +} + +// equalsTask determines that whether the two task objects are the same. +// +// The result is based only on whether the attributes used to generate taskID are the same +// which including taskURL, md5, identifier. +func equalsTask(existTask, newTask *types.TaskInfo) bool { + if existTask.TaskURL != newTask.TaskURL { + return false + } + + if !cutil.IsEmptyStr(existTask.Md5) { + return existTask.Md5 == newTask.Md5 + } + + return existTask.Identifier == newTask.Identifier +} + +// validateParams validates the params of TaskCreateRequest. +func validateParams(req *types.TaskCreateRequest) error { + if !cutil.IsValidURL(req.RawURL) { + return errors.Wrapf(errorType.ErrInvalidValue, "raw url: %s", req.RawURL) + } + + if cutil.IsEmptyStr(req.Path) { + return errors.Wrapf(errorType.ErrEmptyValue, "path") + } + + if cutil.IsEmptyStr(req.CID) { + return errors.Wrapf(errorType.ErrEmptyValue, "cID") + } + + if cutil.IsEmptyStr(req.PeerID) { + return errors.Wrapf(errorType.ErrEmptyValue, "peerID") + } + + return nil +} + +// generateTaskID generates taskID with taskURL,md5 and identifier +// and returns the SHA-256 checksum of the data. +func generateTaskID(taskURL, md5, identifier string) string { + sign := "" + if !cutil.IsEmptyStr(md5) { + sign = md5 + } else if !cutil.IsEmptyStr(identifier) { + sign = identifier + } + id := fmt.Sprintf("%s%s%s%s", key, taskURL, sign, key) + + return cutil.Sha256(id) +} + +// computePieceSize computes the piece size with specified fileLength. +// +// If the fileLength<=0, which means failed to get fileLength +// and then use the DefaultPieceSize. +func computePieceSize(length int64) int32 { + if length <= 0 || length <= 200*1024*1024 { + return config.DefaultPieceSize + } + + gapCount := length / int64(100*1024*1024) + tmpSize := (gapCount-2)*1024*1024 + config.DefaultPieceSize + if tmpSize > config.DefaultPieceSizeLimit { + return config.DefaultPieceSizeLimit + } + return int32(tmpSize) +} + +// isSuccessCDN determines that whether the CDNStatus is success. +func isSuccessCDN(CDNStatus string) bool { + return CDNStatus == types.TaskInfoCdnStatusSUCCESS +} + +func isFrozen(CDNStatus string) bool { + return CDNStatus == types.TaskInfoCdnStatusFAILED || + CDNStatus == types.TaskInfoCdnStatusWAITING || + CDNStatus == types.TaskInfoCdnStatusSOURCEERROR +} + +func isWait(CDNStatus string) bool { + return CDNStatus == types.TaskInfoCdnStatusWAITING +} + +func getHTTPFileLength(taskID, url string, headers map[string]string) (int64, error) { + fileLength, code, err := getContentLength(url, headers) + if err != nil { + return -1, err + } + + if code == http.StatusUnauthorized || code == http.StatusProxyAuthRequired { + return -1, errors.Wrapf(errorType.ErrAuthenticationRequired, "taskID: %s,code: %d", taskID, code) + } + if code != http.StatusOK { + logrus.Warnf("failed to get http file length with unexpected code: %d", code) + return -1, nil + } + + return fileLength, nil +} diff --git a/supernode/daemon/mgr/task/manager_util_test.go b/supernode/daemon/mgr/task/manager_util_test.go new file mode 100644 index 000000000..e6ee9c271 --- /dev/null +++ b/supernode/daemon/mgr/task/manager_util_test.go @@ -0,0 +1,127 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package task + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" + cutil "github.com/dragonflyoss/Dragonfly/common/util" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock" + + "github.com/go-check/check" + "github.com/golang/mock/gomock" + "github.com/prashantv/gostub" +) + +func init() { + check.Suite(&TaskUtilTestSuite{}) +} + +type TaskUtilTestSuite struct { + mockCtl *gomock.Controller + mockCDNMgr *mock.MockCDNMgr + mockDfgetTaskMgr *mock.MockDfgetTaskMgr + mockPeerMgr *mock.MockPeerMgr + mockProgressMgr *mock.MockProgressMgr + mockSchedulerMgr *mock.MockSchedulerMgr + + taskManager *Manager + contentLengthStub *gostub.Stubs +} + +func (s *TaskUtilTestSuite) SetUpSuite(c *check.C) { + s.mockCtl = gomock.NewController(c) + + s.mockPeerMgr = mock.NewMockPeerMgr(s.mockCtl) + s.mockCDNMgr = mock.NewMockCDNMgr(s.mockCtl) + s.mockDfgetTaskMgr = mock.NewMockDfgetTaskMgr(s.mockCtl) + s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl) + s.mockSchedulerMgr = mock.NewMockSchedulerMgr(s.mockCtl) + s.taskManager, _ = NewManager(config.NewConfig(), s.mockPeerMgr, s.mockDfgetTaskMgr, + s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr) + + s.contentLengthStub = gostub.Stub(&getContentLength, func(url string, headers map[string]string) (int64, int, error) { + return 1000, 200, nil + }) +} + +func (s *TaskUtilTestSuite) TearDownSuite(c *check.C) { + s.contentLengthStub.Reset() +} + +func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) { + var cases = []struct { + req *types.TaskCreateRequest + task *types.TaskInfo + errNil bool + }{ + { + req: &types.TaskCreateRequest{ + CID: "cid", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/foo", + RawURL: "http://aa.bb.com", + }, + task: &types.TaskInfo{ + ID: generateTaskID("http://aa.bb.com", "", ""), + CallSystem: "foo", + CdnStatus: types.TaskInfoCdnStatusWAITING, + Dfdaemon: true, + HTTPFileLength: 1000, + PieceSize: config.DefaultPieceSize, + PieceTotal: 1, + RawURL: "http://aa.bb.com", + TaskURL: "http://aa.bb.com", + }, + errNil: true, + }, + { + req: &types.TaskCreateRequest{ + CID: "cid2", + CallSystem: "foo2", + Dfdaemon: false, + Path: "/peer/file/foo2", + RawURL: "http://aa.bb.com", + Headers: map[string]string{"aaa": "bbb"}, + }, + task: &types.TaskInfo{ + ID: generateTaskID("http://aa.bb.com", "", ""), + CallSystem: "foo", + CdnStatus: types.TaskInfoCdnStatusWAITING, + Dfdaemon: true, + HTTPFileLength: 1000, + PieceSize: config.DefaultPieceSize, + PieceTotal: 1, + RawURL: "http://aa.bb.com", + TaskURL: "http://aa.bb.com", + Headers: map[string]string{"aaa": "bbb"}, + }, + errNil: true, + }, + } + + for _, v := range cases { + task, err := s.taskManager.addOrUpdateTask(context.Background(), v.req) + c.Check(cutil.IsNil(err), check.Equals, v.errNil) + taskInfo, err := s.taskManager.getTask(task.ID) + c.Check(err, check.IsNil) + c.Check(taskInfo, check.DeepEquals, v.task) + } +} diff --git a/supernode/daemon/mgr/task_mgr.go b/supernode/daemon/mgr/task_mgr.go index e8c2d3226..ff2a84f2c 100644 --- a/supernode/daemon/mgr/task_mgr.go +++ b/supernode/daemon/mgr/task_mgr.go @@ -4,8 +4,16 @@ import ( "context" "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/config" ) +// PieceStatusMap maintains the mapping relationship between PieceUpdateRequestResult and PieceStatus code. +var PieceStatusMap = map[string]int{ + types.PieceUpdateRequestPieceStatusFAILED: config.PieceFAILED, + types.PieceUpdateRequestPieceStatusSEMISUC: config.PieceSEMISUC, + types.PieceUpdateRequestPieceStatusSUCCESS: config.PieceSUCCESS, +} + // TaskMgr as an interface defines all operations against Task. // A Task will store some meta info about the taskFile, pieces and something else. // A Task has a one-to-one correspondence with a file on the disk which is identified by taskID. @@ -13,7 +21,7 @@ type TaskMgr interface { // Register a task represents that someone wants to download a file. // Supernode will get the task file meta and return taskID. // NOTE: If supernode cannot find the task file, the CDN download will be triggered. - Register(ctx context.Context, task *types.TaskInfo) (taskID string, err error) + Register(ctx context.Context, taskCreateRequest *types.TaskCreateRequest) (taskCreateResponse *types.TaskCreateResponse, err error) // Get the task Info with specified taskID. Get(ctx context.Context, taskID string) (*types.TaskInfo, error) @@ -24,33 +32,24 @@ type TaskMgr interface { // CheckTaskStatus check whether the taskID corresponding file exists. CheckTaskStatus(ctx context.Context, taskID string) (bool, error) - // DeleteTask delete a task + // Delete deletes a task // NOTE: delete the related peers and dfgetTask info is necessary. - DeleteTask(ctx context.Context, taskID string) error + Delete(ctx context.Context, taskID string) error - // update the task info with specified info. + // Update updates the task info with specified info. // In common, there are several situations that we will use this method: // 1. when finished to download, update task status. // 2. for operation usage. - UpdateTaskInfo(ctx context.Context, taskID string, taskInfo *types.TaskInfo) error + // TODO: define a struct of TaskUpdateRequest? + Update(ctx context.Context, taskID string, taskInfo *types.TaskInfo) error // GetPieces get the pieces to be downloaded based on the scheduling result, // just like this: which pieces can be downloaded from which peers. - GetPieces(ctx context.Context, taskID, clientID string, piecePullRequest *types.PiecePullRequest) ([]*types.PieceInfo, error) + GetPieces(ctx context.Context, taskID, clientID string, piecePullRequest *types.PiecePullRequest) (isFinished bool, data interface{}, err error) // UpdatePieceStatus update the piece status with specified parameters. // A task file is divided into several pieces logically. // We use a sting called pieceRange to identify a piece. // A pieceRange separated by a dash, like this: 0-45565, etc. UpdatePieceStatus(ctx context.Context, taskID, pieceRange string, pieceUpdateRequest *types.PieceUpdateRequest) error - - // GetPieceMD5 returns the md5 of pieceNum for taskID. - GetPieceMD5(ctx context.Context, taskID string, pieceNum int) (pieceMD5 string, err error) - - // SetPieceMD5 set the md5 for pieceNum of taskID. - SetPieceMD5(ctx context.Context, taskID string, pieceNum int, pieceMD5 string) (err error) - - // GetPieceMD5sByTaskID returns all pieceMD5s as a string slice. - // All pieceMD5s are returned only if the CDN status is successful. - GetPieceMD5sByTaskID(ctx context.Context, taskID string) (pieceMD5s []string, err error) } diff --git a/supernode/daemon/util/store.go b/supernode/daemon/util/store.go index d9e7cdb08..1d17cd4c7 100644 --- a/supernode/daemon/util/store.go +++ b/supernode/daemon/util/store.go @@ -28,7 +28,7 @@ func (s *Store) Put(key string, value interface{}) error { func (s *Store) Get(key string) (interface{}, error) { v, ok := s.metaMap.Load(key) if !ok { - return nil, errors.Wrapf(errorType.ErrDataNotFound, "key: %v", key) + return nil, errors.Wrapf(errorType.ErrDataNotFound, "key (%s)", key) } return v, nil @@ -38,7 +38,7 @@ func (s *Store) Get(key string) (interface{}, error) { func (s *Store) Delete(key string) error { _, ok := s.metaMap.Load(key) if !ok { - return errors.Wrapf(errorType.ErrDataNotFound, "key: %v", key) + return errors.Wrapf(errorType.ErrDataNotFound, "key (%s)", key) } s.metaMap.Delete(key) diff --git a/supernode/result/result_info.go b/supernode/server/result_info.go similarity index 83% rename from supernode/result/result_info.go rename to supernode/server/result_info.go index 4f4cb79c2..bfc73c735 100644 --- a/supernode/result/result_info.go +++ b/supernode/server/result_info.go @@ -26,6 +26,14 @@ func NewResultInfoWithError(err error) ResultInfo { return NewResultInfoWithCodeError(constants.CodeTargetNotFound, err) } + if errors.IsPeerWait(err) { + return NewResultInfoWithCodeError(constants.CodePeerWait, err) + } + + if errors.IsPeerContinue(err) { + return NewResultInfoWithCodeError(constants.CodePeerContinue, err) + } + // IsConvertFailed return NewResultInfoWithCodeError(constants.CodeSystemError, err) } @@ -49,6 +57,11 @@ func NewResultInfoWithCodeMsg(code int, msg string) ResultInfo { return NewResultInfo(code, msg, nil) } +// NewResultInfoWithCodeData returns a new ResultInfo with code and specified data. +func NewResultInfoWithCodeData(code int, data interface{}) ResultInfo { + return NewResultInfo(code, "", data) +} + // NewResultInfo returns a new ResultInfo. func NewResultInfo(code int, msg string, data interface{}) ResultInfo { return ResultInfo{ diff --git a/supernode/server/router.go b/supernode/server/router.go index c6b71a21e..fd924f3a1 100644 --- a/supernode/server/router.go +++ b/supernode/server/router.go @@ -7,7 +7,6 @@ import ( "net/http/pprof" "github.com/dragonflyoss/Dragonfly/apis/types" - result "github.com/dragonflyoss/Dragonfly/supernode/result" "github.com/gorilla/mux" ) @@ -89,7 +88,7 @@ func HandleErrorResponse(w http.ResponseWriter, err error) { // By default, daemon side returns code 500 if error happens. code = http.StatusInternalServerError - errMsg = result.NewResultInfoWithError(err).Error() + errMsg = NewResultInfoWithError(err).Error() w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) diff --git a/supernode/server/server.go b/supernode/server/server.go index 239670065..84ed6b338 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -8,26 +8,71 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" - peer "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/cdn" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/dfgettask" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/progress" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/scheduler" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/task" + "github.com/dragonflyoss/Dragonfly/supernode/store" "github.com/sirupsen/logrus" ) // Server is server instance. type Server struct { - Config *config.Config - PeerMgr mgr.PeerMgr + Config *config.Config + PeerMgr mgr.PeerMgr + TaskMgr mgr.TaskMgr + DfgetTaskMgr mgr.DfgetTaskMgr } // New creates a brand new server instance. func New(cfg *config.Config) (*Server, error) { + sm, err := store.NewManager(cfg) + if err != nil { + return nil, err + } + storeLocal, err := sm.Get(store.LocalStorageDriver) + if err != nil { + return nil, err + } + peerMgr, err := peer.NewManager() if err != nil { return nil, err } + + dfgetTaskMgr, err := dfgettask.NewManager() + if err != nil { + return nil, err + } + + progressMgr, err := progress.NewManager(cfg) + if err != nil { + return nil, err + } + + schedulerMgr, err := scheduler.NewManager(cfg, progressMgr) + if err != nil { + return nil, err + } + + cdnMgr, err := cdn.NewManager(cfg, storeLocal, progressMgr) + if err != nil { + return nil, err + } + + taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, schedulerMgr) + if err != nil { + return nil, err + } + return &Server{ - Config: cfg, - PeerMgr: peerMgr, + Config: cfg, + PeerMgr: peerMgr, + TaskMgr: taskMgr, + DfgetTaskMgr: dfgetTaskMgr, }, nil } diff --git a/supernode/util/range_util.go b/supernode/util/range_util.go index 06eee37f9..4ceb13aa7 100644 --- a/supernode/util/range_util.go +++ b/supernode/util/range_util.go @@ -81,7 +81,7 @@ func CalculateBreakRange(startPieceNum, pieceContSize int, rangeLength int64) (s // with the following formula: // start = pieceNum * pieceSize // end = start + pieceSize - 1 -func CalculatePieceRange(pieceNum, pieceSize int) string { +func CalculatePieceRange(pieceNum int, pieceSize int32) string { startIndex := int64(pieceNum) * int64(pieceSize) endIndex := startIndex + int64(pieceSize) - 1 return strconv.FormatInt(startIndex, 10) + separator + strconv.FormatInt(endIndex, 10) diff --git a/supernode/util/range_util_test.go b/supernode/util/range_util_test.go index ce9144083..14beebbec 100644 --- a/supernode/util/range_util_test.go +++ b/supernode/util/range_util_test.go @@ -120,7 +120,7 @@ func (suite *RangeUtilSuite) TestCalculateBreakRange(c *check.C) { func (suite *RangeUtilSuite) TestCalculatePieceRange(c *check.C) { var cases = []struct { startPieceNum int - pieceContSize int + pieceContSize int32 expected string }{ {