diff --git a/cmd/dfget/app/root.go b/cmd/dfget/app/root.go index 0144a97f7..eeae7c298 100644 --- a/cmd/dfget/app/root.go +++ b/cmd/dfget/app/root.go @@ -82,6 +82,10 @@ func runDfget() error { util.Printer.Println(err.Error()) return err } + if err := handleNodes(); err != nil { + util.Printer.Println(err.Error()) + return err + } checkParameters() logrus.Infof("get cmd params:%q", os.Args) @@ -260,6 +264,21 @@ func transFilter(filter string) []string { return strings.Split(filter, "&") } +func handleNodes() error { + nodes := make([]string, 0) + + for _, v := range cfg.Node { + // TODO: check the validity of v. + if strings.IndexByte(v, ':') > 0 { + nodes = append(nodes, v) + continue + } + nodes = append(nodes, fmt.Sprintf("%s:%d", v, config.DefaultSupernodePort)) + } + cfg.Node = nodes + return nil +} + func resultMsg(cfg *config.Config, end time.Time, e *errors.DfError) string { if e != nil { return fmt.Sprintf("download FAIL(%d) cost:%.3fs length:%d reason:%d error:%v", diff --git a/common/util/net_util.go b/common/util/net_util.go index d1c6c6706..82a650945 100644 --- a/common/util/net_util.go +++ b/common/util/net_util.go @@ -113,6 +113,29 @@ func ExtractHost(hostAndPort string) string { return fields[0] } +// GetIPAndPortFromNode return ip and port by parsing the node value. +// It will return defaultPort as the value of port +// when the node is a string without port or with an illegal port. +func GetIPAndPortFromNode(node string, defaultPort int) (string, int) { + if IsEmptyStr(node) { + return "", defaultPort + } + + nodeFields := strings.Split(node, ":") + switch len(nodeFields) { + case 1: + return nodeFields[0], defaultPort + case 2: + port, err := strconv.Atoi(nodeFields[1]) + if err != nil { + return nodeFields[0], defaultPort + } + return nodeFields[0], port + default: + return "", defaultPort + } +} + // FilterURLParam filters request queries in URL. // Eg: // If you pass parameters as follows: diff --git a/common/util/net_util_test.go b/common/util/net_util_test.go index 676f64d2b..dea5cbe86 100644 --- a/common/util/net_util_test.go +++ b/common/util/net_util_test.go @@ -34,6 +34,25 @@ func (suite *UtilSuite) TestExtractHost(c *check.C) { c.Assert(host, check.Equals, "1") } +func (suite *UtilSuite) TestGetIPAndPortFromNode(c *check.C) { + var cases = []struct { + node string + defaultPort int + expectedIP string + expectedPort int + }{ + {"127.0.0.1", 8002, "127.0.0.1", 8002}, + {"127.0.0.1:8001", 8002, "127.0.0.1", 8001}, + {"127.0.0.1:abcd", 8002, "127.0.0.1", 8002}, + } + + for _, v := range cases { + ip, port := GetIPAndPortFromNode(v.node, v.defaultPort) + c.Check(ip, check.Equals, v.expectedIP) + c.Check(port, check.Equals, v.expectedPort) + } +} + func (suite *UtilSuite) TestNetLimit(c *check.C) { speed := NetLimit() if runtime.NumCPU() < 24 { diff --git a/dfget/config/constants.go b/dfget/config/constants.go index 6724208be..d52b600ba 100644 --- a/dfget/config/constants.go +++ b/dfget/config/constants.go @@ -113,6 +113,8 @@ const ( DataExpireTime = 3 * time.Minute ServerAliveTime = 5 * time.Minute + + DefaultSupernodePort = 8002 ) /* errors code */ diff --git a/dfget/core/api/supernode_api.go b/dfget/core/api/supernode_api.go index 2d16ced76..674e1bb40 100644 --- a/dfget/core/api/supernode_api.go +++ b/dfget/core/api/supernode_api.go @@ -37,41 +37,39 @@ const ( // NewSupernodeAPI creates a new instance of SupernodeAPI with default value. func NewSupernodeAPI() SupernodeAPI { return &supernodeAPI{ - Scheme: "http", - ServicePort: 8002, - Timeout: 5 * time.Second, - HTTPClient: util.DefaultHTTPClient, + Scheme: "http", + Timeout: 5 * time.Second, + HTTPClient: util.DefaultHTTPClient, } } // SupernodeAPI defines the communication methods between supernode and dfget. type SupernodeAPI interface { - Register(ip string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error) - PullPieceTask(ip string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error) - ReportPiece(ip string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error) - ServiceDown(ip string, taskID string, cid string) (resp *types.BaseResponse, e error) - ReportClientError(ip string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error) + Register(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error) + PullPieceTask(node string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error) + ReportPiece(node string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error) + ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error) + ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error) } type supernodeAPI struct { - Scheme string - ServicePort int - Timeout time.Duration - HTTPClient util.SimpleHTTPClient + Scheme string + Timeout time.Duration + HTTPClient util.SimpleHTTPClient } var _ SupernodeAPI = &supernodeAPI{} // Register sends a request to the supernode to register itself as a peer // and create downloading task. -func (api *supernodeAPI) Register(ip string, req *types.RegisterRequest) ( +func (api *supernodeAPI) Register(node string, req *types.RegisterRequest) ( resp *types.RegisterResponse, e error) { var ( code int body []byte ) - url := fmt.Sprintf("%s://%s:%d%s", - api.Scheme, ip, api.ServicePort, peerRegisterPath) + url := fmt.Sprintf("%s://%s%s", + api.Scheme, node, peerRegisterPath) if code, body, e = api.HTTPClient.PostJSON(url, req, api.Timeout); e != nil { return nil, e } @@ -85,11 +83,11 @@ func (api *supernodeAPI) Register(ip string, req *types.RegisterRequest) ( // PullPieceTask pull a piece downloading task from supernode, and get a // response that describes from which peer to download. -func (api *supernodeAPI) PullPieceTask(ip string, req *types.PullPieceTaskRequest) ( +func (api *supernodeAPI) PullPieceTask(node string, req *types.PullPieceTaskRequest) ( resp *types.PullPieceTaskResponse, e error) { - url := fmt.Sprintf("%s://%s:%d%s?%s", - api.Scheme, ip, api.ServicePort, peerPullPieceTaskPath, util.ParseQuery(req)) + url := fmt.Sprintf("%s://%s%s?%s", + api.Scheme, node, peerPullPieceTaskPath, util.ParseQuery(req)) resp = new(types.PullPieceTaskResponse) e = api.get(url, resp) @@ -97,11 +95,11 @@ func (api *supernodeAPI) PullPieceTask(ip string, req *types.PullPieceTaskReques } // ReportPiece reports the status of piece downloading task to supernode. -func (api *supernodeAPI) ReportPiece(ip string, req *types.ReportPieceRequest) ( +func (api *supernodeAPI) ReportPiece(node string, req *types.ReportPieceRequest) ( resp *types.BaseResponse, e error) { - url := fmt.Sprintf("%s://%s:%d%s?%s", - api.Scheme, ip, api.ServicePort, peerReportPiecePath, util.ParseQuery(req)) + url := fmt.Sprintf("%s://%s%s?%s", + api.Scheme, node, peerReportPiecePath, util.ParseQuery(req)) resp = new(types.BaseResponse) e = api.get(url, resp) @@ -109,11 +107,11 @@ func (api *supernodeAPI) ReportPiece(ip string, req *types.ReportPieceRequest) ( } // ServiceDown reports the status of the local peer to supernode. -func (api *supernodeAPI) ServiceDown(ip string, taskID string, cid string) ( +func (api *supernodeAPI) ServiceDown(node string, taskID string, cid string) ( resp *types.BaseResponse, e error) { - url := fmt.Sprintf("%s://%s:%d%s?taskId=%s&cid=%s", - api.Scheme, ip, api.ServicePort, peerServiceDownPath, taskID, cid) + url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s", + api.Scheme, node, peerServiceDownPath, taskID, cid) resp = new(types.BaseResponse) e = api.get(url, resp) @@ -121,11 +119,11 @@ func (api *supernodeAPI) ServiceDown(ip string, taskID string, cid string) ( } // ReportClientError reports the client error when downloading piece to supernode. -func (api *supernodeAPI) ReportClientError(ip string, req *types.ClientErrorRequest) ( +func (api *supernodeAPI) ReportClientError(node string, req *types.ClientErrorRequest) ( resp *types.BaseResponse, e error) { - url := fmt.Sprintf("%s://%s:%d%s?%s", - api.Scheme, ip, api.ServicePort, peerClientErrorPath, util.ParseQuery(req)) + url := fmt.Sprintf("%s://%s%s?%s", + api.Scheme, node, peerClientErrorPath, util.ParseQuery(req)) resp = new(types.BaseResponse) e = api.get(url, resp) diff --git a/dfget/core/core.go b/dfget/core/core.go index 58cf47811..3fd7b05d7 100644 --- a/dfget/core/core.go +++ b/dfget/core/core.go @@ -24,8 +24,6 @@ import ( "os" "path" "path/filepath" - "strconv" - "strings" "time" "github.com/dragonflyoss/Dragonfly/common/constants" @@ -250,15 +248,11 @@ func adjustSupernodeList(nodes []string) []string { func checkConnectSupernode(nodes []string) (localIP string) { var ( - e error - port = 8002 + e error ) for _, n := range nodes { - nodeFields := strings.Split(n, ":") - if len(nodeFields) == 2 { - port, _ = strconv.Atoi(nodeFields[1]) - } - if localIP, e = cutil.CheckConnect(nodeFields[0], port, 1000); e == nil { + ip, port := cutil.GetIPAndPortFromNode(n, config.DefaultSupernodePort) + if localIP, e = cutil.CheckConnect(ip, port, 1000); e == nil { return localIP } logrus.Errorf("Connect to node:%s error: %v", n, e) diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index e31c8faeb..f39f8d668 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -64,7 +64,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errors.DfE nodes, nLen := s.cfg.Node, len(s.cfg.Node) req := s.constructRegisterRequest(peerPort) for i = 0; i < nLen; i++ { - req.SupernodeIP = nodes[i] + req.SupernodeIP = util.ExtractHost(nodes[i]) resp, e = s.api.Register(nodes[i], req) logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e) if e != nil { diff --git a/test/cli_dfget_p2p_test.go b/test/cli_dfget_p2p_test.go index cf3b10442..06f5e0766 100644 --- a/test/cli_dfget_p2p_test.go +++ b/test/cli_dfget_p2p_test.go @@ -24,6 +24,7 @@ import ( "github.com/go-check/check" "github.com/dragonflyoss/Dragonfly/test/command" + "github.com/dragonflyoss/Dragonfly/test/environment" ) func init() { @@ -52,6 +53,7 @@ func (s *DFGetP2PTestSuite) TestDownload(c *check.C) { cmd, err := s.starter.DFGet(5*time.Second, "-u", "https://lowzj.com", "-o", Join(s.starter.Home, "a.test"), + "--node", fmt.Sprintf("127.0.0.1:%d", environment.SupernodeListenPort), "--notbs") cmd.Wait() diff --git a/test/environment/env.go b/test/environment/env.go index be7d86948..33f7a3c3c 100644 --- a/test/environment/env.go +++ b/test/environment/env.go @@ -12,7 +12,7 @@ var UseJavaVersion bool var ( // SupernodeListenPort is the port that supernode will listen. - SupernodeListenPort = 8002 + SupernodeListenPort = 8008 // DragonflySupernodeBinary is default binary DragonflySupernodeBinary = "/usr/local/bin/supernode"