Skip to content

Commit

Permalink
Merge pull request #980 from alexadhy/fix/dmsgsummary-tracker
Browse files Browse the repository at this point in the history
fixes dmsg showing 00000 after successful reconnection
  • Loading branch information
jdknives authored Oct 28, 2021
2 parents 96195f0 + 2f82f2f commit c08d224
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
23 changes: 16 additions & 7 deletions pkg/visor/dmsgtracker/dmsg_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type DmsgTracker struct {
ctrl *dmsgctrl.Control // dmsg ctrl
}

// NewDmsgTracker creates a new DmsgTracker.
func NewDmsgTracker(ctx context.Context, dmsgC *dmsg.Client, pk cipher.PubKey) (dt *DmsgTracker, err error) {
// newDmsgTracker creates a new DmsgTracker.
func newDmsgTracker(ctx context.Context, dmsgC *dmsg.Client, pk cipher.PubKey) (dt *DmsgTracker, err error) {
conn, err := dmsgC.DialStream(ctx, dmsg.Addr{PK: pk, Port: skyenv.DmsgCtrlPort})
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,9 +179,6 @@ func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey
// MustGet obtains a DmsgClientSummary of the client of given pk.
// If one is not found internally, a new tracker stream is to be established, returning error on failure.
func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout))
defer cancel()

dtm.mx.Lock()
defer dtm.mx.Unlock()

Expand All @@ -193,7 +190,7 @@ func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSu
return e.sum, nil
}

dt, err := NewDmsgTracker(ctx, dtm.dc, pk)
dt, err := dtm.mustEstablishTracker(pk)
if err != nil {
return DmsgClientSummary{}, err
}
Expand All @@ -214,17 +211,29 @@ func (dtm *Manager) Get(pk cipher.PubKey) (DmsgClientSummary, bool) {
return dtm.get(pk)
}

// mustEstablishTracker creates / re-creates tracker when dmsgTrackerMap entry got deleted, and reconnected.
func (dtm *Manager) mustEstablishTracker(pk cipher.PubKey) (*DmsgTracker, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(dtm.updateTimeout))
defer cancel()
return newDmsgTracker(ctx, dtm.dc, pk)
}

// GetBulk obtains bulk dmsg client summaries.
func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary {
dtm.mx.Lock()
defer dtm.mx.Unlock()

var err error
out := make([]DmsgClientSummary, 0, len(pks))

for _, pk := range pks {
dt, ok := dtm.dm[pk]
if !ok {
continue
dt, err = dtm.mustEstablishTracker(pk)
if err != nil {
dtm.log.WithError(err).Infoln("failed to re-create dmsgtracker client")
continue
}
}
out = append(out, dt.sum)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/visor/dmsgtracker/dmsg_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDmsgTracker_Update(t *testing.T) {
// arrange: tracking client
cT, err := env.NewClient(&conf)
require.NoError(t, err)
dt, err := NewDmsgTracker(context.TODO(), cT, cL.LocalPK())
dt, err := newDmsgTracker(context.TODO(), cT, cL.LocalPK())
require.NoError(t, err)

// act: attempt update
Expand Down

0 comments on commit c08d224

Please sign in to comment.