diff --git a/pkg/visor/dmsgtracker/dmsg_tracker.go b/pkg/visor/dmsgtracker/dmsg_tracker.go index 18fcdfdc3e..78d0049490 100644 --- a/pkg/visor/dmsgtracker/dmsg_tracker.go +++ b/pkg/visor/dmsgtracker/dmsg_tracker.go @@ -36,8 +36,8 @@ type DmsgTracker struct { ctrl *dmsgctrl.Control // dmsg ctrl } -// NewDmsgTracker creates a new DmsgTracker. -func NewDmsgTracker(ctx context.Context, dmsgC *dmsg.Client, pk cipher.PubKey) (dt *DmsgTracker, err error) { +// newDmsgTracker creates a new DmsgTracker. +func newDmsgTracker(ctx context.Context, dmsgC *dmsg.Client, pk cipher.PubKey) (dt *DmsgTracker, err error) { conn, err := dmsgC.DialStream(ctx, dmsg.Addr{PK: pk, Port: skyenv.DmsgCtrlPort}) if err != nil { return nil, err @@ -179,9 +179,6 @@ func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey // MustGet obtains a DmsgClientSummary of the client of given pk. // If one is not found internally, a new tracker stream is to be established, returning error on failure. func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) { - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout)) - defer cancel() - dtm.mx.Lock() defer dtm.mx.Unlock() @@ -193,7 +190,7 @@ func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSu return e.sum, nil } - dt, err := NewDmsgTracker(ctx, dtm.dc, pk) + dt, err := dtm.mustEstablishTracker(pk) if err != nil { return DmsgClientSummary{}, err } @@ -214,17 +211,29 @@ func (dtm *Manager) Get(pk cipher.PubKey) (DmsgClientSummary, bool) { return dtm.get(pk) } +// mustEstablishTracker creates / re-creates tracker when dmsgTrackerMap entry got deleted, and reconnected. +func (dtm *Manager) mustEstablishTracker(pk cipher.PubKey) (*DmsgTracker, error) { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(dtm.updateTimeout)) + defer cancel() + return newDmsgTracker(ctx, dtm.dc, pk) +} + // GetBulk obtains bulk dmsg client summaries. func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary { dtm.mx.Lock() defer dtm.mx.Unlock() + var err error out := make([]DmsgClientSummary, 0, len(pks)) for _, pk := range pks { dt, ok := dtm.dm[pk] if !ok { - continue + dt, err = dtm.mustEstablishTracker(pk) + if err != nil { + dtm.log.WithError(err).Infoln("failed to re-create dmsgtracker client") + continue + } } out = append(out, dt.sum) } diff --git a/pkg/visor/dmsgtracker/dmsg_tracker_test.go b/pkg/visor/dmsgtracker/dmsg_tracker_test.go index 6dd5c2361a..54f0f8fcc5 100644 --- a/pkg/visor/dmsgtracker/dmsg_tracker_test.go +++ b/pkg/visor/dmsgtracker/dmsg_tracker_test.go @@ -38,7 +38,7 @@ func TestDmsgTracker_Update(t *testing.T) { // arrange: tracking client cT, err := env.NewClient(&conf) require.NoError(t, err) - dt, err := NewDmsgTracker(context.TODO(), cT, cL.LocalPK()) + dt, err := newDmsgTracker(context.TODO(), cT, cL.LocalPK()) require.NoError(t, err) // act: attempt update