Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1372 from antsystem/feat/add-more-supernode-api-f…
Browse files Browse the repository at this point in the history
…or-seed-pattern

add supernode api for seed pattern
  • Loading branch information
lowzj authored Jun 3, 2020
2 parents 5981693 + 9542232 commit ed1145f
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 6 deletions.
3 changes: 2 additions & 1 deletion dfget/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ const (
StrTotalLimit = "totalLimit"
StrCDNSource = "cdnSource"

StrBytes = "bytes"
StrBytes = "bytes"
StrPattern = "pattern"
)

/* piece meta */
Expand Down
162 changes: 162 additions & 0 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
peerClientErrorPath = "/peer/piece/error"
peerServiceDownPath = "/peer/service/down"
metricsReportPath = "/task/metrics"
fetchP2PNetworkPath = "/peer/network"
peerHeartBeatPath = "/peer/heartbeat"
)

// NewSupernodeAPI creates a new instance of SupernodeAPI with default value.
Expand All @@ -57,6 +59,11 @@ type SupernodeAPI interface {
ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
ReportMetrics(node string, req *api_types.TaskMetricsRequest) (resp *types.BaseResponse, e error)
HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.HeartBeatResponse, err error)
FetchP2PNetworkInfo(node string, start int, limit int, req *api_types.NetworkInfoFetchRequest) (resp *api_types.NetworkInfoFetchResponse, e error)
ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error)
ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error)
ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error)
}

type supernodeAPI struct {
Expand Down Expand Up @@ -191,3 +198,158 @@ func (api *supernodeAPI) get(url string, resp interface{}) error {
}
return json.Unmarshal(body, resp)
}

// report resource
func (api *supernodeAPI) ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
var (
code int
body []byte
)
url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, peerRegisterPath)
header := map[string]string{
"X-report-resource": "true",
}
if code, body, err = api.HTTPClient.PostJSONWithHeaders(url, header, req, api.Timeout); err != nil {
return nil, err
}

logrus.Infof("ReportResource, url: %s, header: %v, req: %v, "+
"code: %d, body: %s", url, header, req, code, string(body))

if !httputils.HTTPStatusOk(code) {
return nil, fmt.Errorf("%d:%s", code, body)
}
resp = new(types.RegisterResponse)
if err = json.Unmarshal(body, resp); err != nil {
return nil, err
}
return resp, err
}

func (api *supernodeAPI) ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error) {
url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s",
api.Scheme, node, peerServiceDownPath, taskID, cid)

header := map[string]string{
"X-report-resource": "true",
}

logrus.Infof("Call ReportResourceDeleted, node: %s, taskID: %s, cid: %s, "+
"url: %s, header: %v", node, taskID, cid, url, header)

resp = new(types.BaseResponse)
resp.Code = constants.Success

if err = api.get(url, resp); err != nil {
logrus.Errorf("failed to send service down,err: %v", err)
return nil, err
}
if resp.Code != constants.CodeGetPeerDown {
logrus.Errorf("failed to send service down to supernode: api response code is %d not equal to %d", resp.Code, constants.CodeGetPeerDown)
}

return
}

// apply for seed node to supernode, if selected as seed, the resp.AsSeed will set true.
func (api *supernodeAPI) ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
var (
code int
body []byte
)
url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, peerRegisterPath)
header := map[string]string{
"X-report-resource": "true",
}
if code, body, err = api.HTTPClient.PostJSONWithHeaders(url, header, req, api.Timeout); err != nil {
return nil, err
}

logrus.Infof("ReportResource, url: %s, header: %v, req: %v, "+
"code: %d, body: %s", url, header, req, code, string(body))

if !httputils.HTTPStatusOk(code) {
return nil, fmt.Errorf("%d:%s", code, body)
}
resp = new(types.RegisterResponse)
if err = json.Unmarshal(body, resp); err != nil {
return nil, err
}
return resp, err
}

// FetchP2PNetworkInfo fetch the p2p network info from supernode.
// @parameter
// start: the start index for array of result
// limit: the limit size of array of result, if -1 means no paging
func (api *supernodeAPI) FetchP2PNetworkInfo(node string, start int, limit int, req *api_types.NetworkInfoFetchRequest) (resp *api_types.NetworkInfoFetchResponse, err error) {
var (
code int
body []byte
)

if start < 0 {
start = 0
}

if limit < 0 {
limit = -1
}

if limit == 0 {
//todo: the page default limit should be configuration item of dfdaemon
limit = 500
}

url := fmt.Sprintf("%s://%s%s?start=%d&limit=%d",
api.Scheme, node, fetchP2PNetworkPath, start, limit)
if code, body, err = api.HTTPClient.PostJSON(url, req, api.Timeout); err != nil {
return nil, err
}

logrus.Debugf("in FetchP2PNetworkInfo, req url: %s, timeout: %v, body: %v", url, api.Timeout, req)
logrus.Debugf("in FetchP2PNetworkInfo, resp code: %d, body: %s", code, string(body))

if !httputils.HTTPStatusOk(code) {
return nil, fmt.Errorf("%d:%s", code, body)
}
rr := new(types.FetchP2PNetworkInfoResponse)
if err = json.Unmarshal(body, rr); err != nil {
return nil, err
}

if rr.Code != constants.Success {
return nil, fmt.Errorf("%d:%s", code, rr.Msg)
}

return rr.Data, nil
}

func (api *supernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.HeartBeatResponse, err error) {
var (
code int
body []byte
)

url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, peerHeartBeatPath)

if code, body, err = api.HTTPClient.PostJSON(url, req, api.Timeout); err != nil {
return nil, err
}

if !httputils.HTTPStatusOk(code) {
logrus.Errorf("failed to heart beat, code %d, body: %s", code, string(body))
return nil, fmt.Errorf("%d:%s", code, string(body))
}

logrus.Debugf("heart beat resp: %s", string(body))

resp = new(types.HeartBeatResponse)
if err = json.Unmarshal(body, resp); err != nil {
return nil, err
}
return resp, err
}
16 changes: 16 additions & 0 deletions dfget/core/helper/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,22 @@ func (m *MockSupernodeAPI) ReportMetrics(ip string, req *api_types.TaskMetricsRe
return nil, nil
}

func (m *MockSupernodeAPI) HeartBeat(node string, req *api_types.HeartBeatRequest) (resp *types.HeartBeatResponse, err error) {
return nil, nil
}
func (m *MockSupernodeAPI) FetchP2PNetworkInfo(node string, start int, limit int, req *api_types.NetworkInfoFetchRequest) (resp *api_types.NetworkInfoFetchResponse, e error) {
return nil, nil
}
func (m *MockSupernodeAPI) ReportResource(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
return nil, nil
}
func (m *MockSupernodeAPI) ApplyForSeedNode(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
return nil, nil
}
func (m *MockSupernodeAPI) ReportResourceDeleted(node string, taskID string, cid string) (resp *types.BaseResponse, err error) {
return nil, nil
}

// CreateRegisterFunc creates a mock register function.
func CreateRegisterFunc() RegisterFuncType {
var newResponse = func(code int, msg string) *types.RegisterResponse {
Expand Down
6 changes: 1 addition & 5 deletions dfget/types/fetch_p2p_networkinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,5 @@ type FetchP2PNetworkInfoRequest struct {
// FetchP2PNetworkInfoResponse is send to supernode to fetch p2p network info
type FetchP2PNetworkInfoResponse struct {
*BaseResponse
Data *FetchNetworkInfoDataResponse `json:"data,omitempty"`
}

type FetchNetworkInfoDataResponse struct {
Nodes []*types.Node `json:"nodes"`
Data *types.NetworkInfoFetchResponse `json:"data,omitempty"`
}
25 changes: 25 additions & 0 deletions dfget/types/heartbeat_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

import "github.com/dragonflyoss/Dragonfly/apis/types"

// HeartBeatResponse is the response of heart beat.
type HeartBeatResponse struct {
*BaseResponse
Data *types.HeartBeatResponse `json:"data,omitempty"`
}

0 comments on commit ed1145f

Please sign in to comment.