Skip to content

Commit

Permalink
Merge pull request #1160 from mrpalide/feature/dmsg-trackers-own-goro…
Browse files Browse the repository at this point in the history
…utine

own goroutine for dmsg trackers
  • Loading branch information
ersonp authored Apr 22, 2022
2 parents 28ea17d + 1a5010f commit ee6f1e5
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
1 change: 0 additions & 1 deletion pkg/visor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func (v *Visor) Summary() (*Summary, error) {
}

dmsgStatValue := &dmsgtracker.DmsgClientSummary{}
v.wgTrackers.Wait()
if v.trackers != nil {
if dmsgTracker := v.trackers.GetBulk([]cipher.PubKey{v.conf.PK}); len(dmsgTracker) > 0 {
dmsgStatValue = &dmsgTracker[0]
Expand Down
1 change: 0 additions & 1 deletion pkg/visor/dmsgtracker/dmsg_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ type Manager struct {

// NewDmsgTrackerManager creates a new dmsg tracker manager.
func NewDmsgTrackerManager(mLog *logging.MasterLogger, dc *dmsg.Client, updateInterval, updateTimeout time.Duration) *Manager {

log := mLog.PackageLogger("dmsg_trackers")
if updateInterval == 0 {
updateInterval = DefaultDTMUpdateInterval
Expand Down
24 changes: 16 additions & 8 deletions pkg/visor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,17 @@ func New(config hypervisorconfig.Config, visor *Visor, dmsgC *dmsg.Client) (*Hyp
visor: visor,
dmsgC: dmsgC,
visors: make(map[cipher.PubKey]Conn),
trackers: dmsgtracker.NewDmsgTrackerManager(mLogger, dmsgC, 0, 0),
users: usermanager.NewUserManager(mLogger, singleUserDB, config.Cookies),
mu: new(sync.RWMutex),
visorChanMux: make(map[cipher.PubKey]*chanMux),
selfConn: selfConn,
logger: mLogger.PackageLogger("hypervisor"),
}

go func() {
hv.trackers = dmsgtracker.NewDmsgTrackerManager(mLogger, dmsgC, 0, 0)
}()

return hv, nil
}

Expand All @@ -115,8 +118,10 @@ func (hv *Hypervisor) ServeRPC(ctx context.Context, dmsgPort uint16) error {

if hv.visor != nil {
// Track hypervisor node.
if _, err := hv.trackers.MustGet(ctx, hv.visor.conf.PK); err != nil {
hv.logger.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.")
if hv.trackers != nil {
if _, err := hv.trackers.MustGet(ctx, hv.visor.conf.PK); err != nil {
hv.logger.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.")
}
}
}

Expand All @@ -140,9 +145,10 @@ func (hv *Hypervisor) ServeRPC(ctx context.Context, dmsgPort uint16) error {
API: NewRPCClient(log, conn, RPCPrefix, skyenv.RPCTimeout),
PtyUI: setupDmsgPtyUI(hv.dmsgC, addr.PK),
}

if _, err := hv.trackers.MustGet(ctx, addr.PK); err != nil {
log.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.")
if hv.trackers != nil {
if _, err := hv.trackers.MustGet(ctx, addr.PK); err != nil {
log.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.")
}
}

log.Info("Accepted.")
Expand Down Expand Up @@ -332,8 +338,10 @@ func (hv *Hypervisor) getDmsgSummary() []dmsgtracker.DmsgClientSummary {
for pk := range hv.visors {
pks = append(pks, pk)
}

return hv.trackers.GetBulk(pks)
if hv.trackers != nil {
return hv.trackers.GetBulk(pks)
}
return []dmsgtracker.DmsgClientSummary{}
}

// Health represents a visor's health report attached to hypervisor to visor request status
Expand Down
8 changes: 3 additions & 5 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type Visor struct {
appL *launcher.Launcher // app launcher
serviceDisc appdisc.Factory
initLock *sync.Mutex
wgTrackers *sync.WaitGroup
// when module is failed it pushes its error to this channel
// used by init and shutdown to show/check for any residual errors
// produced by concurrent parts of modules
Expand Down Expand Up @@ -114,7 +113,6 @@ func NewVisor(conf *visorconfig.V1, restartCtx *restart.Context) (*Visor, bool)
restartCtx: restartCtx,
initLock: new(sync.Mutex),
isServicesHealthy: newInternalHealthInfo(),
wgTrackers: new(sync.WaitGroup),
wgStunClient: new(sync.WaitGroup),
}
v.wgStunClient.Add(1)
Expand Down Expand Up @@ -153,9 +151,9 @@ func NewVisor(conf *visorconfig.V1, restartCtx *restart.Context) (*Visor, bool)
if !v.processRuntimeErrs() {
return nil, false
}
v.wgTrackers.Add(1)
defer v.wgTrackers.Done()
v.trackers = dmsgtracker.NewDmsgTrackerManager(v.MasterLogger(), v.dmsgC, 0, 0)
go func() {
v.trackers = dmsgtracker.NewDmsgTrackerManager(v.MasterLogger(), v.dmsgC, 0, 0)
}()
return v, true
}

Expand Down

0 comments on commit ee6f1e5

Please sign in to comment.