Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes dmsg showing 00000 after successful reconnection #980

Merged
merged 3 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions pkg/visor/dmsgtracker/dmsg_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type DmsgTracker struct {
ctrl *dmsgctrl.Control // dmsg ctrl
}

// NewDmsgTracker creates a new DmsgTracker.
func NewDmsgTracker(ctx context.Context, dmsgC *dmsg.Client, pk cipher.PubKey) (dt *DmsgTracker, err error) {
// newDmsgTracker creates a new DmsgTracker.
func newDmsgTracker(ctx context.Context, dmsgC *dmsg.Client, pk cipher.PubKey) (dt *DmsgTracker, err error) {
conn, err := dmsgC.DialStream(ctx, dmsg.Addr{PK: pk, Port: skyenv.DmsgCtrlPort})
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,9 +179,6 @@ func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey
// MustGet obtains a DmsgClientSummary of the client of given pk.
// If one is not found internally, a new tracker stream is to be established, returning error on failure.
func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout))
defer cancel()

dtm.mx.Lock()
defer dtm.mx.Unlock()

Expand All @@ -193,7 +190,7 @@ func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSu
return e.sum, nil
}

dt, err := NewDmsgTracker(ctx, dtm.dc, pk)
dt, err := dtm.mustEstablishTracker(pk)
if err != nil {
return DmsgClientSummary{}, err
}
Expand All @@ -214,17 +211,29 @@ func (dtm *Manager) Get(pk cipher.PubKey) (DmsgClientSummary, bool) {
return dtm.get(pk)
}

// mustEstablishTracker creates / re-creates tracker when dmsgTrackerMap entry got deleted, and reconnected.
func (dtm *Manager) mustEstablishTracker(pk cipher.PubKey) (*DmsgTracker, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(dtm.updateTimeout))
defer cancel()
return newDmsgTracker(ctx, dtm.dc, pk)
}

// GetBulk obtains bulk dmsg client summaries.
func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary {
dtm.mx.Lock()
defer dtm.mx.Unlock()

var err error
out := make([]DmsgClientSummary, 0, len(pks))

for _, pk := range pks {
dt, ok := dtm.dm[pk]
if !ok {
continue
dt, err = dtm.mustEstablishTracker(pk)
if err != nil {
dtm.log.WithError(err).Infoln("failed to re-create dmsgtracker client")
continue
}
}
out = append(out, dt.sum)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/visor/dmsgtracker/dmsg_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDmsgTracker_Update(t *testing.T) {
// arrange: tracking client
cT, err := env.NewClient(&conf)
require.NoError(t, err)
dt, err := NewDmsgTracker(context.TODO(), cT, cL.LocalPK())
dt, err := newDmsgTracker(context.TODO(), cT, cL.LocalPK())
require.NoError(t, err)

// act: attempt update
Expand Down