Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feature: parse the port of supernode from --node flag of dfget
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Jun 6, 2019
1 parent 82daa6e commit 32172d5
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 39 deletions.
19 changes: 19 additions & 0 deletions cmd/dfget/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func runDfget() error {
util.Printer.Println(err.Error())
return err
}
if err := handleNodes(); err != nil {
util.Printer.Println(err.Error())
return err
}

checkParameters()
logrus.Infof("get cmd params:%q", os.Args)
Expand Down Expand Up @@ -260,6 +264,21 @@ func transFilter(filter string) []string {
return strings.Split(filter, "&")
}

func handleNodes() error {
nodes := make([]string, 0)

for _, v := range cfg.Node {
// TODO: check the validity of v.
if strings.IndexByte(v, ':') > 0 {
nodes = append(nodes, v)
continue
}
nodes = append(nodes, fmt.Sprintf("%s:%d", v, config.DefaultSupernodePort))
}
cfg.Node = nodes
return nil
}

func resultMsg(cfg *config.Config, end time.Time, e *errors.DfError) string {
if e != nil {
return fmt.Sprintf("download FAIL(%d) cost:%.3fs length:%d reason:%d error:%v",
Expand Down
23 changes: 23 additions & 0 deletions common/util/net_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,29 @@ func ExtractHost(hostAndPort string) string {
return fields[0]
}

// GetIPAndPortFromNode return ip and port by parsing the node value.
// It will return defaultPort as the value of port
// when the node is a string without port or with an illegal port.
func GetIPAndPortFromNode(node string, defaultPort int) (string, int) {
if IsEmptyStr(node) {
return "", defaultPort
}

nodeFields := strings.Split(node, ":")
switch len(nodeFields) {
case 1:
return nodeFields[0], defaultPort
case 2:
port, err := strconv.Atoi(nodeFields[1])
if err != nil {
return nodeFields[0], defaultPort
}
return nodeFields[0], port
default:
return "", defaultPort
}
}

// FilterURLParam filters request queries in URL.
// Eg:
// If you pass parameters as follows:
Expand Down
19 changes: 19 additions & 0 deletions common/util/net_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ func (suite *UtilSuite) TestExtractHost(c *check.C) {
c.Assert(host, check.Equals, "1")
}

func (suite *UtilSuite) TestGetIPAndPortFromNode(c *check.C) {
var cases = []struct {
node string
defaultPort int
expectedIP string
expectedPort int
}{
{"127.0.0.1", 8002, "127.0.0.1", 8002},
{"127.0.0.1:8001", 8002, "127.0.0.1", 8001},
{"127.0.0.1:abcd", 8002, "127.0.0.1", 8002},
}

for _, v := range cases {
ip, port := GetIPAndPortFromNode(v.node, v.defaultPort)
c.Check(ip, check.Equals, v.expectedIP)
c.Check(port, check.Equals, v.expectedPort)
}
}

func (suite *UtilSuite) TestNetLimit(c *check.C) {
speed := NetLimit()
if runtime.NumCPU() < 24 {
Expand Down
2 changes: 2 additions & 0 deletions dfget/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ const (

DataExpireTime = 3 * time.Minute
ServerAliveTime = 5 * time.Minute

DefaultSupernodePort = 8002
)

/* errors code */
Expand Down
54 changes: 26 additions & 28 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,39 @@ const (
// NewSupernodeAPI creates a new instance of SupernodeAPI with default value.
func NewSupernodeAPI() SupernodeAPI {
return &supernodeAPI{
Scheme: "http",
ServicePort: 8002,
Timeout: 5 * time.Second,
HTTPClient: util.DefaultHTTPClient,
Scheme: "http",
Timeout: 5 * time.Second,
HTTPClient: util.DefaultHTTPClient,
}
}

// SupernodeAPI defines the communication methods between supernode and dfget.
type SupernodeAPI interface {
Register(ip string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error)
PullPieceTask(ip string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error)
ReportPiece(ip string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
ServiceDown(ip string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(ip string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
Register(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error)
PullPieceTask(node string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error)
ReportPiece(node string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
}

type supernodeAPI struct {
Scheme string
ServicePort int
Timeout time.Duration
HTTPClient util.SimpleHTTPClient
Scheme string
Timeout time.Duration
HTTPClient util.SimpleHTTPClient
}

var _ SupernodeAPI = &supernodeAPI{}

// Register sends a request to the supernode to register itself as a peer
// and create downloading task.
func (api *supernodeAPI) Register(ip string, req *types.RegisterRequest) (
func (api *supernodeAPI) Register(node string, req *types.RegisterRequest) (
resp *types.RegisterResponse, e error) {
var (
code int
body []byte
)
url := fmt.Sprintf("%s://%s:%d%s",
api.Scheme, ip, api.ServicePort, peerRegisterPath)
url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, peerRegisterPath)
if code, body, e = api.HTTPClient.PostJSON(url, req, api.Timeout); e != nil {
return nil, e
}
Expand All @@ -85,47 +83,47 @@ func (api *supernodeAPI) Register(ip string, req *types.RegisterRequest) (

// PullPieceTask pull a piece downloading task from supernode, and get a
// response that describes from which peer to download.
func (api *supernodeAPI) PullPieceTask(ip string, req *types.PullPieceTaskRequest) (
func (api *supernodeAPI) PullPieceTask(node string, req *types.PullPieceTaskRequest) (
resp *types.PullPieceTaskResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?%s",
api.Scheme, ip, api.ServicePort, peerPullPieceTaskPath, util.ParseQuery(req))
url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerPullPieceTaskPath, util.ParseQuery(req))

resp = new(types.PullPieceTaskResponse)
e = api.get(url, resp)
return
}

// ReportPiece reports the status of piece downloading task to supernode.
func (api *supernodeAPI) ReportPiece(ip string, req *types.ReportPieceRequest) (
func (api *supernodeAPI) ReportPiece(node string, req *types.ReportPieceRequest) (
resp *types.BaseResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?%s",
api.Scheme, ip, api.ServicePort, peerReportPiecePath, util.ParseQuery(req))
url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerReportPiecePath, util.ParseQuery(req))

resp = new(types.BaseResponse)
e = api.get(url, resp)
return
}

// ServiceDown reports the status of the local peer to supernode.
func (api *supernodeAPI) ServiceDown(ip string, taskID string, cid string) (
func (api *supernodeAPI) ServiceDown(node string, taskID string, cid string) (
resp *types.BaseResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?taskId=%s&cid=%s",
api.Scheme, ip, api.ServicePort, peerServiceDownPath, taskID, cid)
url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s",
api.Scheme, node, peerServiceDownPath, taskID, cid)

resp = new(types.BaseResponse)
e = api.get(url, resp)
return
}

// ReportClientError reports the client error when downloading piece to supernode.
func (api *supernodeAPI) ReportClientError(ip string, req *types.ClientErrorRequest) (
func (api *supernodeAPI) ReportClientError(node string, req *types.ClientErrorRequest) (
resp *types.BaseResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?%s",
api.Scheme, ip, api.ServicePort, peerClientErrorPath, util.ParseQuery(req))
url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerClientErrorPath, util.ParseQuery(req))

resp = new(types.BaseResponse)
e = api.get(url, resp)
Expand Down
12 changes: 3 additions & 9 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/dragonflyoss/Dragonfly/common/constants"
Expand Down Expand Up @@ -250,15 +248,11 @@ func adjustSupernodeList(nodes []string) []string {

func checkConnectSupernode(nodes []string) (localIP string) {
var (
e error
port = 8002
e error
)
for _, n := range nodes {
nodeFields := strings.Split(n, ":")
if len(nodeFields) == 2 {
port, _ = strconv.Atoi(nodeFields[1])
}
if localIP, e = cutil.CheckConnect(nodeFields[0], port, 1000); e == nil {
ip, port := cutil.GetIPAndPortFromNode(n, config.DefaultSupernodePort)
if localIP, e = cutil.CheckConnect(ip, port, 1000); e == nil {
return localIP
}
logrus.Errorf("Connect to node:%s error: %v", n, e)
Expand Down
2 changes: 1 addition & 1 deletion dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errors.DfE
nodes, nLen := s.cfg.Node, len(s.cfg.Node)
req := s.constructRegisterRequest(peerPort)
for i = 0; i < nLen; i++ {
req.SupernodeIP = nodes[i]
req.SupernodeIP = util.ExtractHost(nodes[i])
resp, e = s.api.Register(nodes[i], req)
logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e)
if e != nil {
Expand Down
2 changes: 2 additions & 0 deletions test/cli_dfget_p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-check/check"

"github.com/dragonflyoss/Dragonfly/test/command"
"github.com/dragonflyoss/Dragonfly/test/environment"
)

func init() {
Expand Down Expand Up @@ -52,6 +53,7 @@ func (s *DFGetP2PTestSuite) TestDownload(c *check.C) {
cmd, err := s.starter.DFGet(5*time.Second,
"-u", "https://lowzj.com",
"-o", Join(s.starter.Home, "a.test"),
"--node", fmt.Sprintf("127.0.0.1:%d", environment.SupernodeListenPort),
"--notbs")
cmd.Wait()

Expand Down
2 changes: 1 addition & 1 deletion test/environment/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var UseJavaVersion bool

var (
// SupernodeListenPort is the port that supernode will listen.
SupernodeListenPort = 8002
SupernodeListenPort = 8008

// DragonflySupernodeBinary is default binary
DragonflySupernodeBinary = "/usr/local/bin/supernode"
Expand Down

0 comments on commit 32172d5

Please sign in to comment.