Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1325 from lowzj/refactor-dfget-core-with-locator
Browse files Browse the repository at this point in the history
refactor: refactor dfget/core with SupernodeLocator
  • Loading branch information
starnop authored May 13, 2020
2 parents 8a1b37a + 122ac3c commit 2443cba
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cmd/dfget/app/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type dfgetSuit struct {

func (suit *dfgetSuit) Test_initFlagsNoArguments() {
initProperties()
suit.Equal(cfg.Nodes, []string{"127.0.0.1:8002"})
suit.Equal(cfg.Nodes, []string(nil))
suit.Equal(cfg.LocalLimit, 20*rate.MB)
suit.Equal(cfg.TotalLimit, rate.Rate(0))
suit.Equal(cfg.Notbs, false)
Expand Down
3 changes: 2 additions & 1 deletion dfget/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type Properties struct {

// NewProperties creates a new properties with default values.
func NewProperties() *Properties {
// don't set Supernodes as default value, the SupernodeLocator will
// do this in a better way.
return &Properties{
Supernodes: GetDefaultSupernodesValue(),
LocalLimit: DefaultLocalLimit,
MinRate: DefaultMinRate,
ClientQueueSize: DefaultClientQueueSize,
Expand Down
60 changes: 36 additions & 24 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand All @@ -50,32 +51,33 @@ import (
// Start function creates a new task and starts it to download file.
func Start(cfg *config.Config) *errortypes.DfError {
var (
supernodeAPI = api.NewSupernodeAPI()
register = regist.NewSupernodeRegister(cfg, supernodeAPI)
err error
result *regist.RegisterResult
supernodeAPI = api.NewSupernodeAPI()
supernodeLocator = locator.CreateLocator(cfg)
register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator)
err error
result *regist.RegisterResult
)

printer.Println(fmt.Sprintf("--%s-- %s",
cfg.StartTime.Format(config.DefaultTimestampFormat), cfg.URL))

if err = prepare(cfg); err != nil {
if err = prepare(cfg, supernodeLocator); err != nil {
return errortypes.New(config.CodePrepareError, err.Error())
}

if result, err = registerToSuperNode(cfg, register); err != nil {
if result, err = registerToSuperNode(cfg, register, supernodeLocator); err != nil {
return errortypes.New(config.CodeRegisterError, err.Error())
}

if err = downloadFile(cfg, supernodeAPI, register, result); err != nil {
if err = downloadFile(cfg, supernodeAPI, supernodeLocator, register, result); err != nil {
return errortypes.New(config.CodeDownloadError, err.Error())
}

return nil
}

// prepare the RV-related information and create the corresponding files.
func prepare(cfg *config.Config) (err error) {
func prepare(cfg *config.Config, locator locator.SupernodeLocator) (err error) {
printer.Printf("dfget version:%s", version.DFGetVersion)
printer.Printf("workspace:%s", cfg.WorkHome)
printer.Printf("sign:%s", cfg.Sign)
Expand Down Expand Up @@ -104,9 +106,8 @@ func prepare(cfg *config.Config) (err error) {
}
rv.DataDir = cfg.RV.SystemDataDir

cfg.Nodes = adjustSupernodeList(cfg.Nodes)
if stringutils.IsEmptyStr(rv.LocalIP) {
rv.LocalIP = checkConnectSupernode(cfg.Nodes)
rv.LocalIP = checkConnectSupernode(locator)
}
rv.Cid = getCid(rv.LocalIP, cfg.Sign)
rv.TaskFileName = getTaskFileName(rv.RealTarget, cfg.Sign)
Expand All @@ -124,7 +125,7 @@ func launchPeerServer(cfg *config.Config) error {
return err
}

func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) (
func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister, supernodeLocator locator.SupernodeLocator) (
*regist.RegisterResult, error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -137,7 +138,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
panic("user specified")
}

if len(cfg.Nodes) == 0 {
if supernodeLocator == nil || supernodeLocator.Size() == 0 {
cfg.BackSourceReason = config.BackSourceReasonNodeEmpty
panic("supernode empty")
}
Expand All @@ -162,7 +163,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
return result, nil
}

func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator,
register regist.SupernodeRegister, result *regist.RegisterResult) error {
timeout := calculateTimeout(cfg)

Expand All @@ -180,7 +181,7 @@ func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
downloadTime := time.Since(cfg.StartTime).Seconds()
// upload metrics to supernode only if pattern is p2p or cdn and result is not nil
if cfg.Pattern != config.PatternSource && result != nil {
reportMetrics(cfg, supernodeAPI, downloadTime, result.TaskID, success)
reportMetrics(cfg, supernodeAPI, locator, downloadTime, result.TaskID, success)
}

if success {
Expand Down Expand Up @@ -285,21 +286,26 @@ func adjustSupernodeList(nodes []string) []string {
}
}

func checkConnectSupernode(nodes []string) (localIP string) {
func checkConnectSupernode(locator locator.SupernodeLocator) (localIP string) {
var (
e error
)
for _, n := range nodes {
ip, port := netutils.GetIPAndPortFromNode(n, config.DefaultSupernodePort)
if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil {
return localIP
if locator == nil {
return ""
}
for _, group := range locator.All() {
for _, n := range group.Nodes {
if localIP, e = httputils.CheckConnect(n.IP, n.Port, 1000); e == nil {
return localIP
}
logrus.Errorf("Connect to node:%s error: %v", n, e)
}
logrus.Errorf("Connect to node:%s error: %v", n, e)
}
return ""
}

func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTime float64, taskID string, success bool) {
func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator,
downloadTime float64, taskID string, success bool) {
req := &types.TaskMetricsRequest{
BacksourceReason: strconv.Itoa(cfg.BackSourceReason),
IP: cfg.RV.LocalIP,
Expand All @@ -311,10 +317,16 @@ func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTi
Success: success,
TaskID: taskID,
}
for _, node := range cfg.Nodes {
resp, err := supernodeAPI.ReportMetrics(node, req)
node := locator.Get()
if node == nil {
return
}
nodeStr := node.String()
// retry twice
for i := 0; i < 2; i++ {
resp, err := supernodeAPI.ReportMetrics(nodeStr, req)
if err != nil {
logrus.Errorf("failed to report metrics to supernode %s: %v", node, err)
logrus.Errorf("failed to report metrics to supernode %s: %v", nodeStr, err)
}
if resp != nil && resp.IsSuccess() {
return
Expand Down
22 changes: 15 additions & 7 deletions dfget/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
. "github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"

"github.com/go-check/check"
Expand Down Expand Up @@ -68,18 +69,20 @@ func (s *CoreTestSuite) TestPrepare(c *check.C) {
cfg := s.createConfig(buf)
cfg.Output = filepath.Join(s.workHome, "test.output")

err := prepare(cfg)
err := prepare(cfg, nil)
fmt.Printf("%s\nerror:%v", buf.String(), err)
}

func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
cfg := s.createConfig(&bytes.Buffer{})
m := new(MockSupernodeAPI)
m.RegisterFunc = CreateRegisterFunc()
register := regist.NewSupernodeRegister(cfg, m)
nodeStr := "127.0.0.1:8002"
snLocator, _ := locator.NewStaticLocatorFromStr("test", []string{nodeStr})
register := regist.NewSupernodeRegister(cfg, m, snLocator)

var f = func(bc int, errIsNil bool, data *regist.RegisterResult) {
res, e := registerToSuperNode(cfg, register)
res, e := registerToSuperNode(cfg, register, snLocator)
c.Assert(res == nil, check.Equals, data == nil)
c.Assert(e == nil, check.Equals, errIsNil)
c.Assert(cfg.BackSourceReason, check.Equals, bc)
Expand All @@ -88,18 +91,21 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
}
}

tmpGroup := snLocator.Group
snLocator.Group = nil
f(config.BackSourceReasonNodeEmpty, true, nil)

cfg.Pattern = config.PatternSource
f(config.BackSourceReasonUserSpecified, true, nil)

uploader.SetupPeerServerExecutor(nil)
cfg.Pattern = config.PatternP2P
cfg.Nodes = []string{"x"}
snLocator.Group = tmpGroup
snLocator.Refresh()
cfg.URL = "http://x.com"
f(config.BackSourceReasonRegisterFail, true, nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
cfg.URL = "http://taobao.com"
cfg.BackSourceReason = config.BackSourceReasonNone
}
Expand Down Expand Up @@ -131,11 +137,13 @@ func (s *CoreTestSuite) TestCheckConnectSupernode(c *check.C) {
s.createConfig(buf)

nodes := []string{host}
ip := checkConnectSupernode(nodes)
l, _ := locator.NewStaticLocatorFromStr("test", nodes)
ip := checkConnectSupernode(l)
c.Assert(ip, check.Equals, "127.0.0.1")

buf.Reset()
ip = checkConnectSupernode([]string{"127.0.0.2"})
l, _ = locator.NewStaticLocatorFromStr("test", []string{"127.0.0.2"})
ip = checkConnectSupernode(l)
c.Assert(strings.Index(buf.String(), "Connect") > 0, check.Equals, true)
c.Assert(ip, check.Equals, "")
}
Expand Down
Loading

0 comments on commit 2443cba

Please sign in to comment.