From 95422324e80cb9ce40ded8150df28c50934ca367 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Mon, 1 Jun 2020 11:45:54 +0800 Subject: [PATCH] add supernode api for seed pattern Signed-off-by: allen.wq --- dfget/config/constants.go | 3 +- dfget/core/api/supernode_api.go | 162 +++++++++++++++++++++++++++ dfget/core/helper/test_helper.go | 16 +++ dfget/types/fetch_p2p_networkinfo.go | 6 +- dfget/types/heartbeat_response.go | 25 +++++ 5 files changed, 206 insertions(+), 6 deletions(-) create mode 100644 dfget/types/heartbeat_response.go diff --git a/dfget/config/constants.go b/dfget/config/constants.go index 9010f84a8..f0c7ff475 100644 --- a/dfget/config/constants.go +++ b/dfget/config/constants.go @@ -73,7 +73,8 @@ const ( StrTotalLimit = "totalLimit" StrCDNSource = "cdnSource" - StrBytes = "bytes" + StrBytes = "bytes" + StrPattern = "pattern" ) /* piece meta */ diff --git a/dfget/core/api/supernode_api.go b/dfget/core/api/supernode_api.go index 361a8244b..d25bd6473 100644 --- a/dfget/core/api/supernode_api.go +++ b/dfget/core/api/supernode_api.go @@ -38,6 +38,8 @@ const ( peerClientErrorPath = "/peer/piece/error" peerServiceDownPath = "/peer/service/down" metricsReportPath = "/task/metrics" + fetchP2PNetworkPath = "/peer/network" + peerHeartBeatPath = "/peer/heartbeat" ) // NewSupernodeAPI creates a new instance of SupernodeAPI with default value. @@ -57,6 +59,11 @@ type SupernodeAPI interface { ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error) ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error) ReportMetrics(node string, req *api_types.TaskMetricsRequest) (resp *types.BaseResponse, e error) + HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.HeartBeatResponse, err error) + FetchP2PNetworkInfo(node string, start int, limit int, req *api_types.NetworkInfoFetchRequest) (resp *api_types.NetworkInfoFetchResponse, e error) + ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) + ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) + ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error) } type supernodeAPI struct { @@ -191,3 +198,158 @@ func (api *supernodeAPI) get(url string, resp interface{}) error { } return json.Unmarshal(body, resp) } + +// report resource +func (api *supernodeAPI) ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) { + var ( + code int + body []byte + ) + url := fmt.Sprintf("%s://%s%s", + api.Scheme, node, peerRegisterPath) + header := map[string]string{ + "X-report-resource": "true", + } + if code, body, err = api.HTTPClient.PostJSONWithHeaders(url, header, req, api.Timeout); err != nil { + return nil, err + } + + logrus.Infof("ReportResource, url: %s, header: %v, req: %v, "+ + "code: %d, body: %s", url, header, req, code, string(body)) + + if !httputils.HTTPStatusOk(code) { + return nil, fmt.Errorf("%d:%s", code, body) + } + resp = new(types.RegisterResponse) + if err = json.Unmarshal(body, resp); err != nil { + return nil, err + } + return resp, err +} + +func (api *supernodeAPI) ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error) { + url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s", + api.Scheme, node, peerServiceDownPath, taskID, cid) + + header := map[string]string{ + "X-report-resource": "true", + } + + logrus.Infof("Call ReportResourceDeleted, node: %s, taskID: %s, cid: %s, "+ + "url: %s, header: %v", node, taskID, cid, url, header) + + resp = new(types.BaseResponse) + resp.Code = constants.Success + + if err = api.get(url, resp); err != nil { + logrus.Errorf("failed to send service down,err: %v", err) + return nil, err + } + if resp.Code != constants.CodeGetPeerDown { + logrus.Errorf("failed to send service down to supernode: api response code is %d not equal to %d", resp.Code, constants.CodeGetPeerDown) + } + + return +} + +// apply for seed node to supernode, if selected as seed, the resp.AsSeed will set true. +func (api *supernodeAPI) ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) { + var ( + code int + body []byte + ) + url := fmt.Sprintf("%s://%s%s", + api.Scheme, node, peerRegisterPath) + header := map[string]string{ + "X-report-resource": "true", + } + if code, body, err = api.HTTPClient.PostJSONWithHeaders(url, header, req, api.Timeout); err != nil { + return nil, err + } + + logrus.Infof("ReportResource, url: %s, header: %v, req: %v, "+ + "code: %d, body: %s", url, header, req, code, string(body)) + + if !httputils.HTTPStatusOk(code) { + return nil, fmt.Errorf("%d:%s", code, body) + } + resp = new(types.RegisterResponse) + if err = json.Unmarshal(body, resp); err != nil { + return nil, err + } + return resp, err +} + +// FetchP2PNetworkInfo fetch the p2p network info from supernode. +// @parameter +// start: the start index for array of result +// limit: the limit size of array of result, if -1 means no paging +func (api *supernodeAPI) FetchP2PNetworkInfo(node string, start int, limit int, req *api_types.NetworkInfoFetchRequest) (resp *api_types.NetworkInfoFetchResponse, err error) { + var ( + code int + body []byte + ) + + if start < 0 { + start = 0 + } + + if limit < 0 { + limit = -1 + } + + if limit == 0 { + //todo: the page default limit should be configuration item of dfdaemon + limit = 500 + } + + url := fmt.Sprintf("%s://%s%s?start=%d&limit=%d", + api.Scheme, node, fetchP2PNetworkPath, start, limit) + if code, body, err = api.HTTPClient.PostJSON(url, req, api.Timeout); err != nil { + return nil, err + } + + logrus.Debugf("in FetchP2PNetworkInfo, req url: %s, timeout: %v, body: %v", url, api.Timeout, req) + logrus.Debugf("in FetchP2PNetworkInfo, resp code: %d, body: %s", code, string(body)) + + if !httputils.HTTPStatusOk(code) { + return nil, fmt.Errorf("%d:%s", code, body) + } + rr := new(types.FetchP2PNetworkInfoResponse) + if err = json.Unmarshal(body, rr); err != nil { + return nil, err + } + + if rr.Code != constants.Success { + return nil, fmt.Errorf("%d:%s", code, rr.Msg) + } + + return rr.Data, nil +} + +func (api *supernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.HeartBeatResponse, err error) { + var ( + code int + body []byte + ) + + url := fmt.Sprintf("%s://%s%s", + api.Scheme, node, peerHeartBeatPath) + + if code, body, err = api.HTTPClient.PostJSON(url, req, api.Timeout); err != nil { + return nil, err + } + + if !httputils.HTTPStatusOk(code) { + logrus.Errorf("failed to heart beat, code %d, body: %s", code, string(body)) + return nil, fmt.Errorf("%d:%s", code, string(body)) + } + + logrus.Debugf("heart beat resp: %s", string(body)) + + resp = new(types.HeartBeatResponse) + if err = json.Unmarshal(body, resp); err != nil { + return nil, err + } + return resp, err +} diff --git a/dfget/core/helper/test_helper.go b/dfget/core/helper/test_helper.go index 8ccd8acb9..43eda4241 100644 --- a/dfget/core/helper/test_helper.go +++ b/dfget/core/helper/test_helper.go @@ -184,6 +184,22 @@ func (m *MockSupernodeAPI) ReportMetrics(ip string, req *api_types.TaskMetricsRe return nil, nil } +func (m *MockSupernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.HeartBeatResponse, err error) { + return nil, nil +} +func (m *MockSupernodeAPI) FetchP2PNetworkInfo(node string, start int, limit int, req *api_types.NetworkInfoFetchRequest) (resp *api_types.NetworkInfoFetchResponse, e error) { + return nil, nil +} +func (m *MockSupernodeAPI) ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) { + return nil, nil +} +func (m *MockSupernodeAPI) ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) { + return nil, nil +} +func (m *MockSupernodeAPI) ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error) { + return nil, nil +} + // CreateRegisterFunc creates a mock register function. func CreateRegisterFunc() RegisterFuncType { var newResponse = func(code int, msg string) *types.RegisterResponse { diff --git a/dfget/types/fetch_p2p_networkinfo.go b/dfget/types/fetch_p2p_networkinfo.go index 0b89d3b0b..c1598e11c 100644 --- a/dfget/types/fetch_p2p_networkinfo.go +++ b/dfget/types/fetch_p2p_networkinfo.go @@ -27,9 +27,5 @@ type FetchP2PNetworkInfoRequest struct { // FetchP2PNetworkInfoResponse is send to supernode to fetch p2p network info type FetchP2PNetworkInfoResponse struct { *BaseResponse - Data *FetchNetworkInfoDataResponse `json:"data,omitempty"` -} - -type FetchNetworkInfoDataResponse struct { - Nodes []*types.Node `json:"nodes"` + Data *types.NetworkInfoFetchResponse `json:"data,omitempty"` } diff --git a/dfget/types/heartbeat_response.go b/dfget/types/heartbeat_response.go new file mode 100644 index 000000000..3a9bf4cf9 --- /dev/null +++ b/dfget/types/heartbeat_response.go @@ -0,0 +1,25 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package types + +import "github.com/dragonflyoss/Dragonfly/apis/types" + +// HeartBeatResponse is the response of heart beat. +type HeartBeatResponse struct { + *BaseResponse + Data *types.HeartBeatResponse `json:"data,omitempty"` +}