Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
optimize: reduce the waiting time for starting dfget server
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <[email protected]>
  • Loading branch information
lowzj committed Nov 20, 2019
1 parent ac262d5 commit baa7538
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
3 changes: 2 additions & 1 deletion dfget/core/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type Downloader interface {
// the given timeout duration.
func DoDownloadTimeout(downloader Downloader, timeout time.Duration) error {
if timeout <= 0 {
logrus.Warnf("invalid download timeout(%.3fs)", timeout.Seconds())
logrus.Debugf("invalid download timeout(%.3fs), use default:(%.3fs)",
timeout.Seconds(), config.DefaultDownlodTimeout)
timeout = config.DefaultDownlodTimeout
}
ctx, cancel := context.WithCancel(context.Background())
Expand Down
4 changes: 4 additions & 0 deletions dfget/core/uploader/peer_server_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func (pe *peerServerExecutor) checkPeerServerExist(cfg *config.Config, port int)
if port <= 0 {
port = getPortFromMeta(cfg.RV.MetaPath)
}
if port <= 0 {
// port 0 is invalid
return 0
}

// check the peer server whether is available
result, err := checkServer(cfg.RV.LocalIP, port, cfg.RV.DataDir, taskFileName, int(cfg.TotalLimit))
Expand Down
51 changes: 31 additions & 20 deletions dfget/core/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,38 @@ func launch(cfg *config.Config, p2pPtr *unsafe.Pointer) error {
return fmt.Errorf("start peer server error and retried at most %d times", retryCount)
}

func waitForStartup(result chan error, p2pPtr *unsafe.Pointer) error {
select {
case err := <-result:
tmp := loadSrvPtr(p2pPtr)
if err == nil {
logrus.Infof("reuse exist server on port:%d", tmp.port)
tmp.setFinished()
}
return err
case <-time.After(100 * time.Millisecond):
// The peer server go routine will block and serve if it starts successfully.
// So we have to wait a moment and check again whether the peer server is
// started.
tmp := loadSrvPtr(p2pPtr)
if tmp == nil {
return fmt.Errorf("initialize peer server error")
}
if !uploaderAPI.PingServer(tmp.host, tmp.port) {
return fmt.Errorf("can't ping port:%d", tmp.port)
func waitForStartup(result chan error, p2pPtr *unsafe.Pointer) (err error) {
ticker := time.NewTicker(5 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(233 * time.Millisecond)

for {
select {
case <-ticker.C:
tmp := loadSrvPtr(p2pPtr)
if tmp != nil && uploaderAPI.PingServer(tmp.host, tmp.port) {
return nil
}
case err = <-result:
tmp := loadSrvPtr(p2pPtr)
if err == nil {
logrus.Infof("reuse exist server on port:%d", tmp.port)
tmp.setFinished()
}
return err
case <-timeout:
// The peer server go routine will block and serve if it starts successfully.
// So we have to wait a moment and check again whether the peer server is
// started.
tmp := loadSrvPtr(p2pPtr)
if tmp == nil {
return fmt.Errorf("initialize peer server error")
}
if !uploaderAPI.PingServer(tmp.host, tmp.port) {
return fmt.Errorf("can't ping port:%d", tmp.port)
}
return nil
}
return nil
}
}

Expand Down

0 comments on commit baa7538

Please sign in to comment.