diff --git a/go.mod b/go.mod index 8ea310983d..4d928928cb 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/pkg/profile v1.5.0 github.com/shirou/gopsutil/v3 v3.21.4 github.com/sirupsen/logrus v1.8.1 - github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666 + github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce github.com/skycoin/skycoin v0.27.1 github.com/skycoin/yamux v0.0.0-20200803175205-571ceb89da9f github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 @@ -48,7 +48,7 @@ require ( github.com/go-chi/chi/v5 v5.0.8-0.20220103230436-7dbe9a0bd10f github.com/ivanpirog/coloredcobra v1.0.0 github.com/james-barrow/golang-ipc v0.0.0-20210227130457-95e7cc81f5e2 - github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e + github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4 github.com/skycoin/systray v1.10.1-0.20220509091536-c90eecafd3fd ) diff --git a/go.sum b/go.sum index 523e16b7a2..429ea0ec02 100644 --- a/go.sum +++ b/go.sum @@ -476,14 +476,15 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666 h1:v7xCo4I6WsaztanDG11QdtWB0jx/r7n/yO39nKNKnk4= -github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666/go.mod h1:VPXJkm5DtSpK3cmbg9M9lXZaUjPeyOpJgxURp8HIMkU= +github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce h1:a0VlsBW5Ac7D3LflMhv52TWRcC1nbkn3ZRCNEE3hqUE= +github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce/go.mod h1:naR6dKKUhc9iDxSUEEw95KO8SEQ1ppJoaHC3osQ5m9k= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o= github.com/skycoin/skycoin v0.27.1 h1:HatxsRwVSPaV4qxH6290xPBmkH/HgiuAoY2qC+e8C9I= github.com/skycoin/skycoin v0.27.1/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw= -github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e h1:2LOmMQqYEk2XX4H8A/MXQmYIIl7opIVYmY3k76D1+Oc= -github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e/go.mod h1:9fOsut+rqCBd33NPF3cuolXrw8uQESYshSvdYMaZNEo= +github.com/skycoin/skywire-utilities v0.0.0-20220412111240-e8921109e514/go.mod h1:9fOsut+rqCBd33NPF3cuolXrw8uQESYshSvdYMaZNEo= +github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4 h1:1wnQREqhD+eOek0isH3m1RTKGT+o2aDZtIoiMk4kROw= +github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4/go.mod h1:9fOsut+rqCBd33NPF3cuolXrw8uQESYshSvdYMaZNEo= github.com/skycoin/systray v1.10.1-0.20220509091536-c90eecafd3fd h1:z2aytJ9vsBpsBrC6nfHuOTS+AUoEbaKs257+AiigUjk= github.com/skycoin/systray v1.10.1-0.20220509091536-c90eecafd3fd/go.mod h1:LvRiIPZbFo8Qqp7Q9C9LXK02CqoVmc9EUGihU6kTRWc= github.com/skycoin/yamux v0.0.0-20200803175205-571ceb89da9f h1:A5dEM1OE9YhN3LciZU9qPjo7fJ46JeHNi3JCroDkK0Y= diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 33ec552f80..502d49162a 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -221,13 +221,10 @@ func (v *Visor) Summary() (*Summary, error) { } dmsgStatValue := &dmsgtracker.DmsgClientSummary{} - v.initLock.Lock() - if v.trackersReady { - ctx := context.TODO() - dmsgTracker, _ := v.trackers.MustGet(ctx, v.conf.PK) //nolint + if v.isDTMReady() { + dmsgTracker, _ := v.dtm.Get(v.conf.PK) //nolint dmsgStatValue = &dmsgTracker } - v.initLock.Unlock() summary := &Summary{ Overview: overview, diff --git a/pkg/visor/dmsgtracker/dmsg_tracker.go b/pkg/visor/dmsgtracker/dmsg_tracker.go index 8f433af71d..19f59130d7 100644 --- a/pkg/visor/dmsgtracker/dmsg_tracker.go +++ b/pkg/visor/dmsgtracker/dmsg_tracker.go @@ -3,12 +3,10 @@ package dmsgtracker import ( "context" "io" - "runtime" "sort" "sync" "time" - "github.com/sirupsen/logrus" "github.com/skycoin/dmsg/pkg/dmsg" "github.com/skycoin/dmsg/pkg/dmsgctrl" "github.com/skycoin/skycoin/src/util/logging" @@ -83,9 +81,9 @@ type Manager struct { updateInterval time.Duration updateTimeout time.Duration - log logrus.FieldLogger + log *logging.Logger dc *dmsg.Client - dm map[cipher.PubKey]*DmsgTracker + dts map[cipher.PubKey]*DmsgTracker mx sync.Mutex done chan struct{} @@ -94,7 +92,7 @@ type Manager struct { // NewDmsgTrackerManager creates a new dmsg tracker manager. func NewDmsgTrackerManager(mLog *logging.MasterLogger, dc *dmsg.Client, updateInterval, updateTimeout time.Duration) *Manager { - log := mLog.PackageLogger("dmsg_trackers") + log := mLog.PackageLogger("dmsg_tracker_manager") if updateInterval == 0 { updateInterval = DefaultDTMUpdateInterval } @@ -107,7 +105,7 @@ func NewDmsgTrackerManager(mLog *logging.MasterLogger, dc *dmsg.Client, updateIn updateTimeout: updateTimeout, log: log, dc: dc, - dm: make(map[cipher.PubKey]*DmsgTracker), + dts: make(map[cipher.PubKey]*DmsgTracker), done: make(chan struct{}), } @@ -134,33 +132,34 @@ func (dtm *Manager) serve() { case <-dtm.done: return case <-t.C: - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout)) - dtm.mx.Lock() - dtm.updateAllTrackers(ctx, dtm.dm) - dtm.mx.Unlock() - cancel() + dtm.updateAllTrackers(ctx) } } } -func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey]*DmsgTracker) { - log := dtm.log.WithField("func", funcName()) +func (dtm *Manager) updateAllTrackers(ctx context.Context) { + dtm.mx.Lock() + defer dtm.mx.Unlock() + + cancelCtx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout)) + defer cancel() + + log := dtm.log.WithField("func", "dtm.updateAllTrackers") type errReport struct { pk cipher.PubKey err error } - dtsLen := len(dts) + dtsLen := len(dtm.dts) errCh := make(chan errReport, dtsLen) defer close(errCh) - for _, te := range dts { - te := te - + for _, dt := range dtm.dts { + dt := dt go func() { - err := te.Update(ctx) - errCh <- errReport{pk: te.sum.PK, err: err} + err := dt.Update(cancelCtx) + errCh <- errReport{pk: dt.sum.PK, err: err} }() } @@ -169,14 +168,14 @@ func (dtm *Manager) updateAllTrackers(ctx context.Context, dts map[cipher.PubKey log.WithError(r.err). WithField("client_pk", r.pk). Warn("Removing dmsg client tracker.") - delete(dts, r.pk) + delete(dtm.dts, r.pk) } } } -// MustGet obtains a DmsgClientSummary of the client of given pk. -// If one is not found internally, a new tracker stream is to be established, returning error on failure. -func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) { +// ShouldGet obtains a DmsgClientSummary of the client of given pk. +// If one are not found internally, a new goroutine of tracker stream is to be established. +func (dtm *Manager) ShouldGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSummary, error) { dtm.mx.Lock() defer dtm.mx.Unlock() @@ -184,17 +183,13 @@ func (dtm *Manager) MustGet(ctx context.Context, pk cipher.PubKey) (DmsgClientSu return DmsgClientSummary{}, io.ErrClosedPipe } - if e, ok := dtm.dm[pk]; ok && !isDone(e.ctrl.Done()) { + if e, ok := dtm.dts[pk]; ok && !isDone(e.ctrl.Done()) { return e.sum, nil } - dt, err := dtm.mustEstablishTracker(pk) - if err != nil { - return DmsgClientSummary{}, err - } + go dtm.establishTracker(ctx, pk) - dtm.dm[pk] = dt - return dt.sum, nil + return DmsgClientSummary{}, nil } // Get obtains a DmsgClientSummary of the client with given public key. @@ -210,30 +205,63 @@ func (dtm *Manager) Get(pk cipher.PubKey) (DmsgClientSummary, bool) { } // mustEstablishTracker creates / re-creates tracker when dmsgTrackerMap entry got deleted, and reconnected. -func (dtm *Manager) mustEstablishTracker(pk cipher.PubKey) (*DmsgTracker, error) { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(dtm.updateTimeout)) +// It is ment to be used as a goroutine and saves the new DmsgTracker to dtm.dts. +func (dtm *Manager) establishTracker(ctx context.Context, pk cipher.PubKey) { + log := dtm.log.WithField("func", "dtm.establishTracker") + + type errReport struct { + pk cipher.PubKey + err error + } + + errCh := make(chan errReport) + defer close(errCh) + doneCh := make(chan struct{}) + + dCtx, cancel := context.WithDeadline(ctx, time.Now().Add(dtm.updateTimeout)) defer cancel() - return newDmsgTracker(ctx, dtm.dc, pk) + go func() { + dt, err := newDmsgTracker(dCtx, dtm.dc, pk) + if err != nil { + select { + case <-ctx.Done(): + return + default: + errCh <- errReport{pk: pk, err: err} + } + } + dtm.mx.Lock() + if dt != nil { + dtm.dts[pk] = dt + } + dtm.mx.Unlock() + close(doneCh) + }() + + select { + case r := <-errCh: + if r.err != nil { + log.WithError(r.err).WithField("client_pk", r.pk).Warn("Failed to re-create dmsgtracker client.") + } + case <-ctx.Done(): + log.WithError(ctx.Err()).WithField("client_pk", pk).Warn("Failed to re-create dmsgtracker client.") + case <-doneCh: + log.WithField("client_pk", pk).Debug("Dmsgtracker client Established.") + } } // GetBulk obtains bulk dmsg client summaries. -func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary { - dtm.mx.Lock() - defer dtm.mx.Unlock() - - var err error - out := make([]DmsgClientSummary, 0, len(pks)) +// If one are not found internally, a new goroutine of tracker stream is to be established. +func (dtm *Manager) GetBulk(ctx context.Context, pks []cipher.PubKey) []DmsgClientSummary { + out := make([]DmsgClientSummary, 0) for _, pk := range pks { - dt, ok := dtm.dm[pk] + ds, ok := dtm.Get(pk) if !ok { - dt, err = dtm.mustEstablishTracker(pk) - if err != nil { - dtm.log.WithError(err).Infoln("failed to re-create dmsgtracker client") - continue - } + // we establish tracker if there is none + go dtm.establishTracker(ctx, pk) } - out = append(out, dt.sum) + out = append(out, ds) } sort.Slice(out, func(i, j int) bool { @@ -246,17 +274,16 @@ func (dtm *Manager) GetBulk(pks []cipher.PubKey) []DmsgClientSummary { } func (dtm *Manager) get(pk cipher.PubKey) (DmsgClientSummary, bool) { - dt, ok := dtm.dm[pk] + dt, ok := dtm.dts[pk] if !ok { return DmsgClientSummary{}, false } - return dt.sum, true } // Close implements io.Closer func (dtm *Manager) Close() error { - log := dtm.log.WithField("func", funcName()) + log := dtm.log.WithField("func", "dtm.Close") dtm.mx.Lock() defer dtm.mx.Unlock() @@ -267,7 +294,7 @@ func (dtm *Manager) Close() error { closed = true close(dtm.done) - for pk, dt := range dtm.dm { + for pk, dt := range dtm.dts { if err := dt.ctrl.Close(); err != nil { log.WithError(err). WithField("client_pk", pk). @@ -291,8 +318,3 @@ func isDone(done <-chan struct{}) bool { return false } } - -func funcName() string { - pc, _, _, _ := runtime.Caller(1) - return runtime.FuncForPC(pc).Name() -} diff --git a/pkg/visor/dmsgtracker/dmsg_tracker_test.go b/pkg/visor/dmsgtracker/dmsg_tracker_test.go index f81418f3c7..e49de5af7c 100644 --- a/pkg/visor/dmsgtracker/dmsg_tracker_test.go +++ b/pkg/visor/dmsgtracker/dmsg_tracker_test.go @@ -2,7 +2,6 @@ package dmsgtracker import ( "context" - "fmt" "runtime" "testing" "time" @@ -10,11 +9,9 @@ import ( "github.com/skycoin/dmsg/pkg/dmsg" "github.com/skycoin/dmsg/pkg/dmsgctrl" "github.com/skycoin/dmsg/pkg/dmsgtest" - "github.com/skycoin/skycoin/src/util/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire/pkg/skyenv" ) @@ -53,87 +50,3 @@ func TestDmsgTracker_Update(t *testing.T) { assert.NotZero(t, dt.sum.RoundTrip) } } - -func TestDmsgTrackerManager_MustGet(t *testing.T) { - const nServers = 1 - conf := dmsg.Config{MinSessions: 1} - - ctx, cancel := context.WithCancel(context.TODO()) - t.Cleanup(cancel) - - env := dmsgtest.NewEnv(t, timeout) - require.NoError(t, env.Startup(0, nServers, 0, &conf)) - t.Cleanup(env.Shutdown) - mLog := logging.NewMasterLogger() - // arrange: tracker manager - tmC, err := env.NewClient(&conf) - require.NoError(t, err) - tm := NewDmsgTrackerManager(mLog, tmC, 0, 0) - t.Cleanup(func() { assert.NoError(t, tm.Close()) }) - - type testCase struct { - add bool // true:add_client false:close_client - sk cipher.SecKey // secret key of the client to add/close - } - - _, sk1 := cipher.GenerateKeyPair() - _, sk2 := cipher.GenerateKeyPair() - _, sk3 := cipher.GenerateKeyPair() - _, sk4 := cipher.GenerateKeyPair() - - testCases := []testCase{ - {add: true, sk: sk1}, - {add: true, sk: sk2}, - {add: false, sk: sk1}, - {add: true, sk: sk3}, - {add: true, sk: sk4}, - {add: false, sk: sk3}, - {add: false, sk: sk4}, - } - - for i, tc := range testCases { - i, tc := i, tc - - pk, err := tc.sk.PubKey() - require.NoError(t, err) - - if tc.add { - name := fmt.Sprintf("%d:add_%s", i, tc.sk) - t.Run(name, func(t *testing.T) { - c, err := env.NewClientWithKeys(pk, tc.sk, &conf) - require.NoError(t, err) - l, err := c.Listen(skyenv.DmsgCtrlPort) - require.NoError(t, err) - dmsgctrl.ServeListener(l, 0) - - // act - sum, err := tm.MustGet(ctx, pk) - require.NoError(t, err) - tm.updateAllTrackers(ctx, tm.dm) - - // assert - assert.Equal(t, pk, sum.PK) - - if !(runtime.GOOS == "windows") { - // TODO: fix non-deterministic windows roundtrip failure - assert.NotZero(t, tm.dm[pk].sum.RoundTrip) - } - }) - - } else { - name := fmt.Sprintf("%d:close_%s", i, tc.sk) - t.Run(name, func(t *testing.T) { - c, ok := env.ClientOfPK(pk) - require.True(t, ok) - - // act - assert.NoError(t, c.Close()) - tm.updateAllTrackers(ctx, tm.dm) - - // assert - _, ok = tm.Get(pk) - assert.False(t, ok) - }) - } - } -} diff --git a/pkg/visor/hypervisor.go b/pkg/visor/hypervisor.go index 4705e05fe2..d4de7096c0 100644 --- a/pkg/visor/hypervisor.go +++ b/pkg/visor/hypervisor.go @@ -56,18 +56,16 @@ type Conn struct { // Hypervisor manages visors. type Hypervisor struct { - c hypervisorconfig.Config - visor *Visor - dmsgC *dmsg.Client - visors map[cipher.PubKey]Conn // connected remote visors - trackers *dmsgtracker.Manager // dmsg trackers - trackersReady bool - users *usermanager.UserManager - mu *sync.RWMutex - visorMu sync.Mutex - visorChanMux map[cipher.PubKey]*chanMux - selfConn Conn - logger *logging.Logger + c hypervisorconfig.Config + visor *Visor + dmsgC *dmsg.Client + visors map[cipher.PubKey]Conn // connected remote visors + users *usermanager.UserManager + mu *sync.RWMutex + visorMu sync.Mutex + visorChanMux map[cipher.PubKey]*chanMux + selfConn Conn + logger *logging.Logger } // New creates a new Hypervisor. @@ -92,25 +90,16 @@ func New(config hypervisorconfig.Config, visor *Visor, dmsgC *dmsg.Client) (*Hyp } hv := &Hypervisor{ - c: config, - visor: visor, - dmsgC: dmsgC, - visors: make(map[cipher.PubKey]Conn), - users: usermanager.NewUserManager(mLogger, singleUserDB, config.Cookies), - mu: new(sync.RWMutex), - visorChanMux: make(map[cipher.PubKey]*chanMux), - selfConn: selfConn, - logger: mLogger.PackageLogger("hypervisor"), - trackersReady: false, + c: config, + visor: visor, + dmsgC: dmsgC, + visors: make(map[cipher.PubKey]Conn), + users: usermanager.NewUserManager(mLogger, singleUserDB, config.Cookies), + mu: new(sync.RWMutex), + visorChanMux: make(map[cipher.PubKey]*chanMux), + selfConn: selfConn, + logger: mLogger.PackageLogger("hypervisor"), } - - go func() { - hv.mu.Lock() - hv.trackers = dmsgtracker.NewDmsgTrackerManager(mLogger, dmsgC, 0, 0) - hv.trackersReady = true - hv.mu.Unlock() - }() - return hv, nil } @@ -121,15 +110,11 @@ func (hv *Hypervisor) ServeRPC(ctx context.Context, dmsgPort uint16) error { return err } - if hv.visor != nil { + if hv.visor.isDTMReady() { // Track hypervisor node. - hv.mu.Lock() - if hv.trackersReady { - if _, err := hv.trackers.MustGet(ctx, hv.visor.conf.PK); err != nil { - hv.logger.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.") - } + if _, err := hv.visor.dtm.ShouldGet(ctx, hv.visor.conf.PK); err != nil { + hv.logger.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.") } - hv.mu.Unlock() } // setup @@ -152,8 +137,8 @@ func (hv *Hypervisor) ServeRPC(ctx context.Context, dmsgPort uint16) error { API: NewRPCClient(log, conn, RPCPrefix, skyenv.RPCTimeout), PtyUI: setupDmsgPtyUI(hv.dmsgC, addr.PK), } - if hv.trackers != nil { - if _, err := hv.trackers.MustGet(ctx, addr.PK); err != nil { + if hv.visor.isDTMReady() { + if _, err := hv.visor.dtm.ShouldGet(ctx, addr.PK); err != nil { log.WithField("addr", hv.c.DmsgDiscovery).WithError(err).Warn("Failed to dial tracker stream.") } } @@ -345,8 +330,9 @@ func (hv *Hypervisor) getDmsgSummary() []dmsgtracker.DmsgClientSummary { for pk := range hv.visors { pks = append(pks, pk) } - if hv.trackers != nil { - return hv.trackers.GetBulk(pks) + if hv.visor.isDTMReady() { + ctx := context.TODO() + return hv.visor.dtm.GetBulk(ctx, pks) } return []dmsgtracker.DmsgClientSummary{} } diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 09bdcd9564..96cb506066 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -45,6 +45,7 @@ import ( "github.com/skycoin/skywire/pkg/utclient" "github.com/skycoin/skywire/pkg/util/osutil" "github.com/skycoin/skywire/pkg/util/updater" + "github.com/skycoin/skywire/pkg/visor/dmsgtracker" "github.com/skycoin/skywire/pkg/visor/visorconfig" vinit "github.com/skycoin/skywire/pkg/visor/visorinit" ) @@ -111,6 +112,8 @@ var ( dmsgCtrl vinit.Module // Dmsg http module dmsgHTTP vinit.Module + // Dmsg trackers module + dmsgTrackers vinit.Module // visor that groups all modules together vis vinit.Module ) @@ -136,6 +139,7 @@ func registerModules(logger *logging.MasterLogger) { stcpC = maker("stcp", initStcpClient, &tr) dmsgC = maker("dmsg", initDmsg, &ebc, &dmsgHTTP) dmsgCtrl = maker("dmsg_ctrl", initDmsgCtrl, &dmsgC, &tr) + dmsgTrackers = maker("dmsg_trackers", initDmsgTrackers, &dmsgC) pty = maker("dmsg_pty", initDmsgpty, &dmsgC) rt = maker("router", initRouter, &tr, &dmsgC, &dmsgHTTP) @@ -145,7 +149,7 @@ func registerModules(logger *logging.MasterLogger) { ut = maker("uptime_tracker", initUptimeTracker, &dmsgHTTP) pv = maker("public_autoconnect", initPublicAutoconnect, &tr, &disc) trs = maker("transport_setup", initTransportSetup, &dmsgC, &tr) - tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl) + tm = vinit.MakeModule("transports", vinit.DoNothing, logger, &sc, &sudphC, &dmsgCtrl, &dmsgTrackers) pvs = maker("public_visor", initPublicVisor, &tr, &ar, &disc, &stcprC) vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &up, &ebc, &ar, &disc, &pty, &tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC) @@ -353,6 +357,20 @@ func initDmsgCtrl(ctx context.Context, v *Visor, _ *logging.Logger) error { return nil } +func initDmsgTrackers(ctx context.Context, v *Visor, _ *logging.Logger) error { + dmsgC := v.dmsgC + + dtm := dmsgtracker.NewDmsgTrackerManager(v.MasterLogger(), dmsgC, 0, 0) + v.pushCloseStack("dmsg_tracker_manager", func() error { + return dtm.Close() + }) + v.initLock.Lock() + v.dtm = dtm + v.initLock.Unlock() + v.dtmReadyOnce.Do(func() { close(v.dtmReady) }) + return nil +} + func initSudphClient(ctx context.Context, v *Visor, log *logging.Logger) error { var serviceURL dmsgget.URL diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index f095ebd272..445fd7f5af 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -62,13 +62,14 @@ type Visor struct { updater *updater.Updater uptimeTracker utclient.APIClient - ebc *appevent.Broadcaster // event broadcaster - dmsgC *dmsg.Client - dmsgDC *dmsg.Client // dmsg direct client - dClient dmsgdisc.APIClient // dmsg direct api client - dmsgHTTP *http.Client // dmsghttp client - trackers *dmsgtracker.Manager - trackersReady bool + ebc *appevent.Broadcaster // event broadcaster + dmsgC *dmsg.Client + dmsgDC *dmsg.Client // dmsg direct client + dClient dmsgdisc.APIClient // dmsg direct api client + dmsgHTTP *http.Client // dmsghttp client + dtm *dmsgtracker.Manager + dtmReady chan struct{} + dtmReadyOnce sync.Once stunClient *network.StunDetails stunReady chan struct{} @@ -120,8 +121,8 @@ func NewVisor(ctx context.Context, conf *visorconfig.V1, restartCtx *restart.Con restartCtx: restartCtx, initLock: new(sync.RWMutex), isServicesHealthy: newInternalHealthInfo(), + dtmReady: make(chan struct{}), stunReady: make(chan struct{}), - trackersReady: false, } v.isServicesHealthy.init() @@ -175,12 +176,6 @@ func NewVisor(ctx context.Context, conf *visorconfig.V1, restartCtx *restart.Con if !v.processRuntimeErrs() { return nil, false } - go func() { - v.initLock.RLock() - v.trackers = dmsgtracker.NewDmsgTrackerManager(v.MasterLogger(), v.dmsgC, 0, 0) - v.trackersReady = true - v.initLock.RUnlock() - }() return v, true } @@ -255,6 +250,15 @@ func (v *Visor) Close() error { return nil } +func (v *Visor) isDTMReady() bool { + select { + case <-v.dtmReady: + return true + default: + return false + } +} + // SetLogstore sets visor runtime logstore func (v *Visor) SetLogstore(store logstore.Store) { v.logstore = store diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go index f05ba8204b..ac73793b3c 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client.go @@ -16,9 +16,6 @@ import ( "github.com/skycoin/skywire-utilities/pkg/netutil" ) -// TODO(evanlinjin): We should implement exponential backoff at some point. -const serveWait = time.Second - // SessionDialCallback is triggered BEFORE a session is dialed to. // If a non-nil error is returned, the session dial is instantly terminated. type SessionDialCallback func(network, addr string) (err error) @@ -74,6 +71,10 @@ type Client struct { conf *Config porter *netutil.Porter + bo time.Duration // initial backoff duration + maxBO time.Duration // maximum backoff duration + factor float64 // multiplier for the backoff duration that is applied on every retry + errCh chan error done chan struct{} once sync.Once @@ -82,12 +83,6 @@ type Client struct { // NewClient creates a dmsg client entity. func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Config) *Client { - c := new(Client) - c.ready = make(chan struct{}) - c.porter = netutil.NewPorter(netutil.PorterMinEphemeral) - c.errCh = make(chan error, 10) - c.done = make(chan struct{}) - log := logging.MustGetLogger("dmsg_client") // Init config. @@ -95,7 +90,17 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Conf conf = DefaultConfig() } conf.Ensure() - c.conf = conf + + c := &Client{ + ready: make(chan struct{}), + porter: netutil.NewPorter(netutil.PorterMinEphemeral), + errCh: make(chan error, 10), + done: make(chan struct{}), + conf: conf, + bo: time.Second * 5, + maxBO: time.Minute, + factor: netutil.DefaultFactor, + } // Init common fields. c.EntityCommon.init(pk, sk, dc, log, conf.UpdateInterval) @@ -156,15 +161,15 @@ func (ce *Client) Serve(ctx context.Context) { if err == context.Canceled || err == context.DeadlineExceeded { return } - time.Sleep(time.Second) // TODO(evanlinjin): Implement exponential back off. + ce.serveWait() continue } if len(entries) == 0 { - ce.log.Warnf("No entries found. Retrying after %s...", serveWait.String()) - time.Sleep(serveWait) + ce.log.Warnf("No entries found. Retrying after %s...", ce.bo.String()) + ce.serveWait() } - for _, entry := range entries { + for n, entry := range entries { if isClosed(ce.done) { return } @@ -183,11 +188,21 @@ func (ce *Client) Serve(ctx context.Context) { } if err := ce.EnsureSession(cancellabelCtx, entry); err != nil { - ce.log.WithField("remote_pk", entry.Static).WithError(err).Warn("Failed to establish session.") if err == context.Canceled || err == context.DeadlineExceeded { + ce.log.WithField("remote_pk", entry.Static).WithError(err).Warn("Failed to establish session.") return } - time.Sleep(serveWait) + // we send an error if this is the last server + if n == (len(entries) - 1) { + if !isClosed(ce.done) { + ce.sesMx.Lock() + ce.errCh <- err + ce.sesMx.Unlock() + } + } + ce.log.WithField("remote_pk", entry.Static).WithError(err).WithField("current_backoff", ce.bo.String()). + Warn("Failed to establish session.") + ce.serveWait() } } // We dial all servers and wait for error or done signal. @@ -443,6 +458,21 @@ func (ce *Client) ConnectionsSummary() ConnectionsSummary { return out } +func (ce *Client) serveWait() { + bo := ce.bo + + t := time.NewTimer(bo) + defer t.Stop() + + if newBO := time.Duration(float64(bo) * ce.factor); ce.maxBO == 0 || newBO <= ce.maxBO { + ce.bo = newBO + if newBO > ce.maxBO { + ce.bo = ce.maxBO + } + } + <-t.C +} + func hasPK(pks []cipher.PubKey, pk cipher.PubKey) bool { for _, oldPK := range pks { if oldPK == pk { diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go index 456484600b..deab5a3e98 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/client_session.go @@ -79,7 +79,7 @@ func (cs *ClientSession) serve() error { }() for { if _, err := cs.acceptStream(); err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { //nolint cs.log. WithError(err). Info("Failed to accept stream.") diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsg/session_common.go b/vendor/github.com/skycoin/dmsg/pkg/dmsg/session_common.go index 1c5a6f6d9a..d22ea1d6f9 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsg/session_common.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsg/session_common.go @@ -1,7 +1,6 @@ package dmsg import ( - "bufio" "encoding/binary" "io" "net" @@ -62,11 +61,11 @@ func (sc *SessionCommon) initClient(entity *EntityCommon, conn net.Conn, rPK cip return err } - r := bufio.NewReader(conn) - if err := noise.InitiatorHandshake(ns, r, conn); err != nil { + rw := noise.NewReadWriter(conn, ns) + if err := rw.Handshake(time.Second * 5); err != nil { return err } - if r.Buffered() > 0 { + if rw.Buffered() > 0 { return ErrSessionHandshakeExtraBytes } @@ -95,11 +94,11 @@ func (sc *SessionCommon) initServer(entity *EntityCommon, conn net.Conn) error { return err } - r := bufio.NewReader(conn) - if err := noise.ResponderHandshake(ns, r, conn); err != nil { + rw := noise.NewReadWriter(conn, ns) + if err := rw.Handshake(time.Second * 5); err != nil { return err } - if r.Buffered() > 0 { + if rw.Buffered() > 0 { return ErrSessionHandshakeExtraBytes } diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/host.go b/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/host.go index 1fcc64be61..583cbe555e 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/host.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/host.go @@ -58,7 +58,7 @@ func (h *Host) ServeCLI(ctx context.Context, lis net.Listener) error { for { conn, err := lis.Accept() if err != nil { - if err, ok := err.(net.Error); ok && err.Temporary() { + if err, ok := err.(net.Error); ok && err.Temporary() { //nolint log.Warn("Failed to accept CLI connection with temporary error, continuing...") continue } @@ -108,7 +108,7 @@ func (h *Host) ListenAndServe(ctx context.Context, port uint16) error { stream, err := lis.AcceptStream() if err != nil { log := log.WithError(err) - if err, ok := err.(net.Error); ok && err.Temporary() { + if err, ok := err.(net.Error); ok && err.Temporary() { //nolint log.Warn("Failed to accept dmsg.Stream with temporary error, continuing...") continue } diff --git a/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/ui.go b/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/ui.go index 3923e5045c..da295055b5 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/ui.go +++ b/vendor/github.com/skycoin/dmsg/pkg/dmsgpty/ui.go @@ -178,6 +178,9 @@ func (ui *UI) Handler() http.HandlerFunc { } }() + // set DMSGPTYTERM=1 env on starting dmsgpty-ui + ptyC.Write([]byte("export DMSGPTYTERM=1\n")) //nolint + // io done, once := make(chan struct{}), new(sync.Once) closeDone := func() { once.Do(func() { close(done) }) } diff --git a/vendor/github.com/skycoin/dmsg/pkg/noise/read_writer.go b/vendor/github.com/skycoin/dmsg/pkg/noise/read_writer.go index 87002ae7ec..efac2eef43 100644 --- a/vendor/github.com/skycoin/dmsg/pkg/noise/read_writer.go +++ b/vendor/github.com/skycoin/dmsg/pkg/noise/read_writer.go @@ -105,7 +105,7 @@ func (rw *ReadWriter) Read(p []byte) (int, error) { // * If error is non-temporary, save error in state so further reads will fail. func (rw *ReadWriter) processReadError(err error) error { if nErr, ok := err.(net.Error); ok { - if !nErr.Temporary() { + if !nErr.Temporary() { //nolint rw.rErr = err } return err @@ -189,6 +189,11 @@ func (rw *ReadWriter) Handshake(hsTimeout time.Duration) error { } } +// Buffered returns the number of bytes that can be read from the buffer rawInput. +func (rw *ReadWriter) Buffered() int { + return rw.rawInput.Buffered() +} + // LocalStatic returns the local static public key. func (rw *ReadWriter) LocalStatic() cipher.PubKey { return rw.ns.LocalStatic() @@ -294,7 +299,7 @@ func ReadRawFrame(r *bufio.Reader) (p []byte, err error) { } func isTemp(err error) bool { - if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { //nolint return true } return false diff --git a/vendor/github.com/skycoin/skywire-utilities/pkg/geo/geo.go b/vendor/github.com/skycoin/skywire-utilities/pkg/geo/geo.go index 51eabe54df..9c338ac317 100644 --- a/vendor/github.com/skycoin/skywire-utilities/pkg/geo/geo.go +++ b/vendor/github.com/skycoin/skywire-utilities/pkg/geo/geo.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "net" "net/http" @@ -75,8 +76,8 @@ func MakeIPDetails(log logrus.FieldLogger, apiKey string) LocationDetails { // Prepare output. out := LocationData{ - Lat: j.Lat, - Lon: j.Lon, + Lat: roundTwoDigits(j.Lat), + Lon: roundTwoDigits(j.Lon), Country: j.CountryCode, Region: j.Region, } @@ -85,3 +86,7 @@ func MakeIPDetails(log logrus.FieldLogger, apiKey string) LocationDetails { return &out, nil } } + +func roundTwoDigits(value float64) float64 { + return math.Round(value*100) / 100 +} diff --git a/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/error.go b/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/error.go index 0752c6a05e..16ac49f71d 100644 --- a/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/error.go +++ b/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/error.go @@ -13,6 +13,11 @@ type HTTPError struct { Body string } +// Error is the object returned to the client when there's an error. +type Error struct { + Error string `json:"error"` +} + // ErrorFromResp creates an HTTPError from a given server response. func ErrorFromResp(resp *http.Response) error { status := resp.StatusCode diff --git a/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/health.go b/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/health.go index 7bc8c2556d..2bb405c26f 100644 --- a/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/health.go +++ b/vendor/github.com/skycoin/skywire-utilities/pkg/httputil/health.go @@ -2,114 +2,41 @@ package httputil import ( "context" - "errors" - "fmt" - "io" - "io/ioutil" "net/http" - "path" + "time" + + "github.com/skycoin/skywire-utilities/pkg/buildinfo" ) -// HealthGrabberFunc grabs a component's health. -type HealthGrabberFunc func(ctx context.Context) (statusCode int, bodyMsg string) +var path = "/health" -// HealthGrabberEntry adds a 'Name' field to the HealthGrabber. -type HealthGrabberEntry struct { - Name string - Grab HealthGrabberFunc +// HealthCheckResponse is struct of /health endpoint +type HealthCheckResponse struct { + BuildInfo *buildinfo.Info `json:"build_info,omitempty"` + StartedAt time.Time `json:"started_at"` } -// MakeHealthHandler returns a HTTP handler that displays component status(es). -// The endpoint returns a content type of text/plain with each component's -// health on a new line. The format of a line is as follows: -// : -// One can request the endpoint to return the health of a single component only -// via the following path: -// // -func MakeHealthHandler(expectedBase string, entries []HealthGrabberEntry) http.HandlerFunc { - baseMap := make(map[string]HealthGrabberFunc, len(entries)) - for _, e := range entries { - switch e.Name { - case "", expectedBase: - panic(errors.New("entry name cannot be empty or the same as expected base")) - } - - baseMap[e.Name] = e.Grab +// GetServiceHealth gets the response from the given service url +func GetServiceHealth(ctx context.Context, url string) (health *HealthCheckResponse, err error) { + resp, err := http.Get(url + path) + if err != nil { + return nil, err } - - return func(w http.ResponseWriter, req *http.Request) { - switch base := path.Base(req.URL.EscapedPath()); base { - case "", "/", expectedBase: - msg := "" - for _, e := range entries { - code, m := e.Grab(req.Context()) - msg += formatMsg(e.Name, code, m) - - if code < 200 || code > 299 { - if err := writeHTTPText(w, code, msg); err != nil { - GetLogger(req).WithError(err).Warn("Failed to write response body.") - } - return - } + if resp != nil { + defer func() { + if cErr := resp.Body.Close(); cErr != nil && err == nil { + err = cErr } - - if err := writeHTTPText(w, http.StatusOK, msg); err != nil { - GetLogger(req).WithError(err).Warn("Failed to write response body.") - } - - default: - grab, ok := baseMap[base] - if !ok { - msg := fmt.Sprintf("unexpected path base: %s", base) - if err := writeHTTPText(w, http.StatusBadRequest, msg); err != nil { - GetLogger(req).WithError(err).Warn("Failed to write response body.") - } - return - } - - code, msg := grab(req.Context()) - if err := writeHTTPText(w, code, formatMsg(base, code, msg)); err != nil { - GetLogger(req).WithError(err).Warn("Failed to write response body.") - } - } + }() } -} - -// CheckHealth calls the health endpoint at given URL, and writes the response -// into the provided writer (if nothing went wrong). -func CheckHealth(urlStr string, w io.Writer) error { - resp, err := http.Get(urlStr) //nolint:gosec - if err != nil { - return err - } - defer func() { - if err := resp.Body.Close(); err != nil { - fmt.Println("failed to close response body:", err) + if resp.StatusCode != http.StatusOK { + var hErr HTTPError + if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil { + return nil, err } - }() - - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err + return nil, &hErr } + err = json.NewDecoder(resp.Body).Decode(&health) - if resp.StatusCode == http.StatusBadRequest { - return fmt.Errorf("bad request: %s", string(b)) - } - - _, err = w.Write(b) - return err -} - -func formatMsg(name string, code int, msg string) string { - return fmt.Sprintf("%s: %d %s\n", name, code, msg) -} - -func writeHTTPText(w http.ResponseWriter, code int, msg string) error { - w.WriteHeader(code) - w.Header().Set("Content-Type", "text/plain") - if _, err := w.Write([]byte(msg)); err != nil { - return err - } - return nil + return health, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 89e38020c5..bfb8779b53 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -147,7 +147,7 @@ github.com/shirou/gopsutil/v3/process ## explicit; go 1.13 github.com/sirupsen/logrus github.com/sirupsen/logrus/hooks/syslog -# github.com/skycoin/dmsg v0.0.0-20220401080257-ba4de44b7666 +# github.com/skycoin/dmsg v0.0.0-20220607114207-d4a85dc351ce ## explicit; go 1.16 github.com/skycoin/dmsg/internal/servermetrics github.com/skycoin/dmsg/pkg/direct @@ -171,7 +171,7 @@ github.com/skycoin/skycoin/src/cipher/ripemd160 github.com/skycoin/skycoin/src/cipher/secp256k1-go github.com/skycoin/skycoin/src/cipher/secp256k1-go/secp256k1-go2 github.com/skycoin/skycoin/src/util/logging -# github.com/skycoin/skywire-utilities v0.0.0-20220331141811-c29ff9ab891e +# github.com/skycoin/skywire-utilities v0.0.0-20220511053113-3d492e0048c4 ## explicit; go 1.17 github.com/skycoin/skywire-utilities/pkg/buildinfo github.com/skycoin/skywire-utilities/pkg/cipher