From d88d8bb16d5c3f9c642ecb7391ebace0227ac509 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 8 Jul 2020 19:04:24 +0300 Subject: [PATCH 1/5] Register trusted visor in service discovery --- pkg/app/appdisc/factory.go | 23 ++++++++++++++++++++++- pkg/app/appserver/proc_manager.go | 2 +- pkg/servicedisc/client.go | 10 +++++++++- pkg/servicedisc/types.go | 2 ++ pkg/visor/init.go | 15 +++++++++++++++ pkg/visor/visorconfig/v1.go | 2 ++ 6 files changed, 51 insertions(+), 3 deletions(-) diff --git a/pkg/app/appdisc/factory.go b/pkg/app/appdisc/factory.go index de236a7e72..e78db32c36 100644 --- a/pkg/app/appdisc/factory.go +++ b/pkg/app/appdisc/factory.go @@ -35,7 +35,28 @@ func (f *Factory) setDefaults() { } // Updater obtains an updater based on the app name and configuration. -func (f *Factory) Updater(conf appcommon.ProcConfig) (Updater, bool) { +func (f *Factory) VisorUpdater() Updater { + // Always return empty updater if keys are not set. + if f.setDefaults(); f.PK.Null() || f.SK.Null() { + return &emptyUpdater{} + } + + conf := servicedisc.Config{ + Type: servicedisc.ServiceTypeVisor, + PK: f.PK, + SK: f.SK, + Port: 0, + DiscAddr: f.ProxyDisc, + } + + return &serviceUpdater{ + client: servicedisc.NewClient(f.Log, conf), + interval: f.UpdateInterval, + } +} + +// AppUpdater obtains an updater based on the app name and configuration. +func (f *Factory) AppUpdater(conf appcommon.ProcConfig) (Updater, bool) { // Always return empty updater if keys are not set. if f.setDefaults(); f.PK.Null() || f.SK.Null() { return &emptyUpdater{}, false diff --git a/pkg/app/appserver/proc_manager.go b/pkg/app/appserver/proc_manager.go index c58ac6427e..be709752e7 100644 --- a/pkg/app/appserver/proc_manager.go +++ b/pkg/app/appserver/proc_manager.go @@ -189,7 +189,7 @@ func (m *procManager) Start(conf appcommon.ProcConfig) (appcommon.ProcID, error) break } - disc, ok := m.discF.Updater(conf) + disc, ok := m.discF.AppUpdater(conf) if !ok { log.WithField("appName", conf.AppName). Debug("No app discovery associated with app.") diff --git a/pkg/servicedisc/client.go b/pkg/servicedisc/client.go index 19d7efa5d8..81987e1bae 100644 --- a/pkg/servicedisc/client.go +++ b/pkg/servicedisc/client.go @@ -35,12 +35,17 @@ type HTTPClient struct { // NewClient creates a new HTTPClient. func NewClient(log logrus.FieldLogger, conf Config) *HTTPClient { + var stats *Stats + if conf.Type != ServiceTypeVisor { + stats = &Stats{ConnectedClients: 0} + } + return &HTTPClient{ log: log, conf: conf, entry: Service{ Addr: NewSWAddr(conf.PK, conf.Port), - Stats: &Stats{ConnectedClients: 0}, + Stats: stats, Type: conf.Type, }, client: http.Client{}, @@ -62,10 +67,12 @@ func (c *HTTPClient) Auth(ctx context.Context) (*httpauth.Client, error) { if c.auth != nil { return c.auth, nil } + auth, err := httpauth.NewClient(ctx, c.conf.DiscAddr, c.conf.PK, c.conf.SK) if err != nil { return nil, err } + c.auth = auth return auth, nil } @@ -81,6 +88,7 @@ func (c *HTTPClient) Services(ctx context.Context) (out []Service, err error) { if err != nil { return nil, err } + if resp != nil { defer func() { if cErr := resp.Body.Close(); cErr != nil && err == nil { diff --git a/pkg/servicedisc/types.go b/pkg/servicedisc/types.go index 6f5ff34f97..cd36bea0c5 100644 --- a/pkg/servicedisc/types.go +++ b/pkg/servicedisc/types.go @@ -15,6 +15,8 @@ const ( ServiceTypeProxy = "proxy" // ServiceTypeVPN stands for the VPN discovery. ServiceTypeVPN = "vpn" + // ServiceTypeVisor stands for visor. + ServiceTypeVisor = "visor" ) // Errors associated with service discovery types. diff --git a/pkg/visor/init.go b/pkg/visor/init.go index f91b766f6d..7362ce6569 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -259,6 +259,7 @@ func initLauncher(v *Visor) bool { factory := appdisc.Factory{ Log: v.MasterLogger().PackageLogger("app_discovery"), } + if conf.Discovery != nil { factory.PK = v.conf.PK factory.SK = v.conf.SK @@ -266,6 +267,12 @@ func initLauncher(v *Visor) bool { factory.ProxyDisc = conf.Discovery.ServiceDisc } + var disc appdisc.Updater + if v.conf.PublicTrustedVisor { + disc = factory.VisorUpdater() + disc.Start() + } + // Prepare proc manager. procM, err := appserver.NewProcManager(v.MasterLogger(), &factory, v.ebc, conf.ServerAddr) if err != nil { @@ -273,6 +280,9 @@ func initLauncher(v *Visor) bool { } v.pushCloseStack("launcher.proc_manager", func() bool { + if disc != nil { + disc.Stop() + } return report(procM.Close()) }) @@ -284,21 +294,26 @@ func initLauncher(v *Visor) bool { BinPath: conf.BinPath, LocalPath: conf.LocalPath, } + launchLog := v.MasterLogger().PackageLogger("launcher") + launch, err := launcher.NewLauncher(launchLog, launchConf, v.net.Dmsg(), v.router, procM) if err != nil { return report(fmt.Errorf("failed to start launcher: %w", err)) } + err = launch.AutoStart(map[string]func() ([]string, error){ skyenv.VPNClientName: func() ([]string, error) { return makeVPNEnvs(v.conf, v.net) }, skyenv.VPNServerName: func() ([]string, error) { return makeVPNEnvs(v.conf, v.net) }, }) + if err != nil { return report(fmt.Errorf("failed to autostart apps: %w", err)) } v.procM = procM v.appL = launch + return report(nil) } diff --git a/pkg/visor/visorconfig/v1.go b/pkg/visor/visorconfig/v1.go index fa67ac2f32..b96d828a50 100644 --- a/pkg/visor/visorconfig/v1.go +++ b/pkg/visor/visorconfig/v1.go @@ -33,6 +33,8 @@ type V1 struct { LogLevel string `json:"log_level"` ShutdownTimeout Duration `json:"shutdown_timeout,omitempty"` // time value, examples: 10s, 1m, etc RestartCheckDelay string `json:"restart_check_delay,omitempty"` + + PublicTrustedVisor bool `json:"public_trusted_visor,omitempty"` } // V1Dmsgpty configures the dmsgpty-host. From 3b2bd137255b6583c96812feadefa9c443b52d36 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 10 Jul 2020 17:08:58 +0300 Subject: [PATCH 2/5] Check if visor has a public IP to detect if it's available from the outside --- internal/netutil/netutil.go | 78 +++++++++++++++++++++++++++++++++++++ pkg/snet/arclient/client.go | 33 +++------------- pkg/visor/init.go | 20 +++++++--- 3 files changed, 98 insertions(+), 33 deletions(-) create mode 100644 internal/netutil/netutil.go diff --git a/internal/netutil/netutil.go b/internal/netutil/netutil.go new file mode 100644 index 0000000000..35728c3c5e --- /dev/null +++ b/internal/netutil/netutil.go @@ -0,0 +1,78 @@ +package netutil + +import "net" + +func LocalAddresses() ([]string, error) { + result := make([]string, 0) + + addresses, err := net.InterfaceAddrs() + if err != nil { + return nil, err + } + + for _, addr := range addresses { + switch v := addr.(type) { + case *net.IPNet: + if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() { + result = append(result, v.IP.String()) + } + case *net.IPAddr: + if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() { + result = append(result, v.IP.String()) + } + } + } + + return result, nil +} + +func HasPublicIP() (bool, error) { + ifaces, err := net.Interfaces() + if err != nil { + return false, err + } + + for _, i := range ifaces { + addrs, err := i.Addrs() + if err != nil { + return false, err + } + + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + + if isPublicIP(ip) { + return true, nil + } + } + } + + return false, nil +} + +func isPublicIP(IP net.IP) bool { + if IP.IsLoopback() || IP.IsLinkLocalMulticast() || IP.IsLinkLocalUnicast() { + return false + } + + if ip4 := IP.To4(); ip4 != nil { + switch { + case ip4[0] == 10: + return false + case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31: + return false + case ip4[0] == 192 && ip4[1] == 168: + return false + default: + return true + } + } + + return false +} diff --git a/pkg/snet/arclient/client.go b/pkg/snet/arclient/client.go index 5cfe86bb73..b4e4c0f1e3 100644 --- a/pkg/snet/arclient/client.go +++ b/pkg/snet/arclient/client.go @@ -17,11 +17,12 @@ import ( "github.com/AudriusButkevicius/pfilter" "github.com/SkycoinProject/dmsg" "github.com/SkycoinProject/dmsg/cipher" - "github.com/SkycoinProject/dmsg/netutil" + dmsgnetutil "github.com/SkycoinProject/dmsg/netutil" "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/xtaci/kcp-go" "github.com/SkycoinProject/skywire-mainnet/internal/httpauth" + "github.com/SkycoinProject/skywire-mainnet/internal/netutil" "github.com/SkycoinProject/skywire-mainnet/internal/packetfilter" "github.com/SkycoinProject/skywire-mainnet/pkg/snet/directtp/tpconn" "github.com/SkycoinProject/skywire-mainnet/pkg/snet/directtp/tphandshake" @@ -121,7 +122,7 @@ func (c *httpClient) initHTTPClient() { Warnf("Failed to connect to address resolver. STCPR/SUDPH services are temporarily unavailable. Retrying...") retryLog := logging.MustGetLogger("snet.arclient.retrier") - retry := netutil.NewRetrier(retryLog, 1*time.Second, 10*time.Second, 0, 1) + retry := dmsgnetutil.NewRetrier(retryLog, 1*time.Second, 10*time.Second, 0, 1) err := retry.Do(context.Background(), func() error { httpAuthClient, err = httpauth.NewClient(context.Background(), c.remoteHTTPAddr, c.pk, c.sk) @@ -193,7 +194,7 @@ func (c *httpClient) BindSTCPR(ctx context.Context, port string) error { c.log.Infof("BindSTCPR: Address resolver became ready, binding") } - addresses, err := localAddresses() + addresses, err := netutil.LocalAddresses() if err != nil { return err } @@ -247,7 +248,7 @@ func (c *httpClient) BindSUDPH(filter *pfilter.PacketFilter) (<-chan RemoteVisor return nil, err } - addresses, err := localAddresses() + addresses, err := netutil.LocalAddresses() if err != nil { return nil, err } @@ -469,27 +470,3 @@ func extractError(r io.Reader) error { return errors.New(apiError.Error) } - -func localAddresses() ([]string, error) { - result := make([]string, 0) - - addresses, err := net.InterfaceAddrs() - if err != nil { - return nil, err - } - - for _, addr := range addresses { - switch v := addr.(type) { - case *net.IPNet: - if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() { - result = append(result, v.IP.String()) - } - case *net.IPAddr: - if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() { - result = append(result, v.IP.String()) - } - } - } - - return result, nil -} diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 7362ce6569..f227816a05 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -11,9 +11,10 @@ import ( "github.com/SkycoinProject/dmsg" "github.com/SkycoinProject/dmsg/cipher" "github.com/SkycoinProject/dmsg/dmsgctrl" - "github.com/SkycoinProject/dmsg/netutil" + dmsgnetutil "github.com/SkycoinProject/dmsg/netutil" "github.com/sirupsen/logrus" + "github.com/SkycoinProject/skywire-mainnet/internal/netutil" "github.com/SkycoinProject/skywire-mainnet/internal/utclient" "github.com/SkycoinProject/skywire-mainnet/internal/vpn" "github.com/SkycoinProject/skywire-mainnet/pkg/app/appdisc" @@ -269,8 +270,17 @@ func initLauncher(v *Visor) bool { var disc appdisc.Updater if v.conf.PublicTrustedVisor { - disc = factory.VisorUpdater() - disc.Start() + hasPublicIP, err := netutil.HasPublicIP() + if err != nil { + v.log.WithError(err).Errorf("Failed to check if visor has public IP") + } else if !hasPublicIP { + v.log.Errorf("Visor doesn't have a public IP, it's impossible to register it as public trusted") + } else { + disc = factory.VisorUpdater() + disc.Start() + + v.log.Infof("Registered visor as public trusted") + } } // Prepare proc manager. @@ -323,7 +333,7 @@ func makeVPNEnvs(conf *visorconfig.V1, n *snet.Network) ([]string, error) { if conf.Dmsg != nil { envCfg.DmsgDiscovery = conf.Dmsg.Discovery - r := netutil.NewRetrier(logrus.New(), 1*time.Second, 10*time.Second, 0, 1) + r := dmsgnetutil.NewRetrier(logrus.New(), 1*time.Second, 10*time.Second, 0, 1) err := r.Do(context.Background(), func() error { for _, ses := range n.Dmsg().AllSessions() { envCfg.DmsgServers = append(envCfg.DmsgServers, ses.LocalTCPAddr().String()) @@ -509,7 +519,7 @@ func connectToTpDisc(v *Visor) (transport.DiscoveryClient, error) { conf := v.conf.Transport log := v.MasterLogger().PackageLogger("tp_disc_retrier") - tpdCRetrier := netutil.NewRetrier(log, + tpdCRetrier := dmsgnetutil.NewRetrier(log, initBO, maxBO, tries, factor) var tpdC transport.DiscoveryClient From bd1d8b42988540261a7c6a1c8905bfa2f51a8a61 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 10 Jul 2020 21:27:38 +0300 Subject: [PATCH 3/5] Check if visor is reachable from service discovery --- go.mod | 1 + go.sum | 3 +++ pkg/app/appdisc/factory.go | 4 ++-- pkg/servicedisc/client.go | 12 ++++++++++ pkg/snet/directtp/client.go | 28 +++++++++++++++++++++++ pkg/snet/network.go | 44 +++++++++++++++++++++++++++++++++---- pkg/visor/init.go | 38 ++++++++++++++------------------ pkg/visor/visor.go | 6 +++-- vendor/modules.txt | 1 + 9 files changed, 107 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 7fa57f9b99..543ee752c6 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/mmcloughlin/avo v0.0.0-20200523190732-4439b6b2c061 // indirect github.com/pkg/profile v1.5.0 github.com/prometheus/client_golang v1.7.1 + github.com/prometheus/common v0.10.0 // indirect github.com/rakyll/statik v0.1.7 github.com/schollz/progressbar/v2 v2.15.0 github.com/shirou/gopsutil v2.20.5+incompatible diff --git a/go.sum b/go.sum index 815257e676..3bd14d7290 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,10 @@ github.com/SkycoinProject/yamux v0.0.0-20191213015001-a36efeefbf6a/go.mod h1:IaE github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/andybalholm/brotli v0.0.0-20190621154722-5f990b63d2d6 h1:bZ28Hqta7TFAK3Q08CMvv8y3/8ATaEqv2nGoc6yff6c= github.com/andybalholm/brotli v0.0.0-20190621154722-5f990b63d2d6/go.mod h1:+lx6/Aqd1kLJ1GQfkvOnaZ1WGmLpMpbprPuIOOZX30U= @@ -362,6 +364,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/app/appdisc/factory.go b/pkg/app/appdisc/factory.go index e78db32c36..1aab6f8310 100644 --- a/pkg/app/appdisc/factory.go +++ b/pkg/app/appdisc/factory.go @@ -35,7 +35,7 @@ func (f *Factory) setDefaults() { } // Updater obtains an updater based on the app name and configuration. -func (f *Factory) VisorUpdater() Updater { +func (f *Factory) VisorUpdater(port uint16) Updater { // Always return empty updater if keys are not set. if f.setDefaults(); f.PK.Null() || f.SK.Null() { return &emptyUpdater{} @@ -45,7 +45,7 @@ func (f *Factory) VisorUpdater() Updater { Type: servicedisc.ServiceTypeVisor, PK: f.PK, SK: f.SK, - Port: 0, + Port: port, DiscAddr: f.ProxyDisc, } diff --git a/pkg/servicedisc/client.go b/pkg/servicedisc/client.go index 81987e1bae..04cd7cef9f 100644 --- a/pkg/servicedisc/client.go +++ b/pkg/servicedisc/client.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "encoding/json" + "errors" "net/http" + "strings" "sync" "time" @@ -14,6 +16,11 @@ import ( "github.com/SkycoinProject/skywire-mainnet/internal/httpauth" ) +var ( + // ErrVisorUnreachable is returned when visor is unreachable. + ErrVisorUnreachable = errors.New("visor is unreachable") +) + // Config configures the HTTPClient. type Config struct { Type string @@ -195,6 +202,11 @@ func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duratio c.entryMx.Unlock() if err != nil { + if strings.Contains(err.Error(), ErrVisorUnreachable.Error()) { + c.log.Errorf("Unable to register visor as public trusted as it's unreachable from WAN") + return + } + c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...") time.Sleep(time.Second * 10) // TODO(evanlinjin): Exponential backoff. continue diff --git a/pkg/snet/directtp/client.go b/pkg/snet/directtp/client.go index 7446e0e050..a11417ba23 100644 --- a/pkg/snet/directtp/client.go +++ b/pkg/snet/directtp/client.go @@ -44,6 +44,9 @@ var ( // ErrAlreadyListening is returned when transport is already listening. ErrAlreadyListening = errors.New("already listening") + // ErrNotListening is returned when transport is not listening. + ErrNotListening = errors.New("not listening") + // ErrPortOccupied is returned when port is occupied. ErrPortOccupied = errors.New("port is already occupied") ) @@ -52,6 +55,7 @@ var ( type Client interface { Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (*tpconn.Conn, error) Listen(lPort uint16) (*tplistener.Listener, error) + LocalAddr() (net.Addr, error) Serve() error Close() error Type() string @@ -75,6 +79,7 @@ type client struct { log *logging.Logger porter *porter.Porter listener net.Listener + listening chan struct{} listeners map[uint16]*tplistener.Listener // key: lPort sudphPacketFilter *pfilter.PacketFilter sudphListener net.PacketConn @@ -89,6 +94,7 @@ func NewClient(conf Config) Client { porter: porter.New(porter.MinEphemeral), listeners: make(map[uint16]*tplistener.Listener), done: make(chan struct{}), + listening: make(chan struct{}), } } @@ -113,6 +119,7 @@ func (c *client) Serve() error { } c.listener = l + close(c.listening) if c.conf.Type == tptypes.STCPR { localAddr := c.listener.Addr().String() @@ -145,6 +152,27 @@ func (c *client) Serve() error { return nil } +func (c *client) LocalAddr() (net.Addr, error) { + <-c.listening + + switch c.conf.Type { + case tptypes.STCP, tptypes.STCPR: + if c.listener == nil { + return nil, ErrNotListening + } + + return c.listener.Addr(), nil + case tptypes.SUDPH: + if c.sudphListener == nil { + return nil, ErrNotListening + } + + return c.listener.Addr(), nil + } + + return nil, ErrUnknownTransportType +} + func (c *client) acceptConn() error { if c.isClosed() { return io.ErrClosedPipe diff --git a/pkg/snet/network.go b/pkg/snet/network.go index 8758793103..6937277558 100644 --- a/pkg/snet/network.go +++ b/pkg/snet/network.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "strconv" "strings" "sync" "time" @@ -14,6 +15,7 @@ import ( "github.com/SkycoinProject/dmsg/disc" "github.com/SkycoinProject/skycoin/src/util/logging" + "github.com/SkycoinProject/skywire-mainnet/pkg/app/appdisc" "github.com/SkycoinProject/skywire-mainnet/pkg/app/appevent" "github.com/SkycoinProject/skywire-mainnet/pkg/snet/arclient" "github.com/SkycoinProject/skywire-mainnet/pkg/snet/directtp" @@ -81,6 +83,8 @@ type Config struct { SecKey cipher.SecKey ARClient arclient.APIClient NetworkConfigs NetworkConfigs + ServiceDisc appdisc.Factory + PublicTrusted bool } // NetworkConfigs represents all network configs. @@ -97,10 +101,11 @@ type NetworkClients struct { // Network represents a network between nodes in Skywire. type Network struct { - conf Config - netsMu sync.RWMutex - nets map[string]struct{} // networks to be used with transports - clients NetworkClients + conf Config + netsMu sync.RWMutex + nets map[string]struct{} // networks to be used with transports + clients NetworkClients + visorUpdater appdisc.Updater onNewNetworkTypeMu sync.Mutex onNewNetworkType func(netType string) @@ -212,6 +217,10 @@ func (n *Network) Init() error { if err := client.Serve(); err != nil { return fmt.Errorf("failed to initiate 'stcpr': %w", err) } + + if n.conf.PublicTrusted { + n.registerPublicTrusted(client) + } } else { log.Infof("No config found for stcpr") } @@ -228,6 +237,31 @@ func (n *Network) Init() error { return nil } +func (n *Network) registerPublicTrusted(client directtp.Client) { + la, err := client.LocalAddr() + if err != nil { + log.WithError(err).Errorf("Failed to get STCPR local addr") + return + } + + _, portStr, err := net.SplitHostPort(la.String()) + if err != nil { + log.WithError(err).Errorf("Failed to extract port from addr %v", la.String()) + return + } + + port, err := strconv.Atoi(portStr) + if err != nil { + log.WithError(err).Errorf("Failed to convert port to int") + return + } + + n.visorUpdater = n.conf.ServiceDisc.VisorUpdater(uint16(port)) + n.visorUpdater.Start() + + log.Infof("Registered visor as public trusted") +} + // OnNewNetworkType sets callback to be called when new network type is ready. func (n *Network) OnNewNetworkType(callback func(netType string)) { n.onNewNetworkTypeMu.Lock() @@ -248,6 +282,8 @@ func (n *Network) Close() error { n.netsMu.Lock() defer n.netsMu.Unlock() + n.visorUpdater.Stop() + wg := new(sync.WaitGroup) var dmsgErr error diff --git a/pkg/visor/init.go b/pkg/visor/init.go index f227816a05..ebddf9776d 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -14,7 +14,6 @@ import ( dmsgnetutil "github.com/SkycoinProject/dmsg/netutil" "github.com/sirupsen/logrus" - "github.com/SkycoinProject/skywire-mainnet/internal/netutil" "github.com/SkycoinProject/skywire-mainnet/internal/utclient" "github.com/SkycoinProject/skywire-mainnet/internal/vpn" "github.com/SkycoinProject/skywire-mainnet/pkg/app/appdisc" @@ -41,6 +40,7 @@ func initStack() []initFunc { initUpdater, initEventBroadcaster, initAddressResolver, + initDiscovery, initSNet, initDmsgpty, initTransport, @@ -95,6 +95,8 @@ func initSNet(v *Visor) bool { SecKey: v.conf.SK, ARClient: v.arClient, NetworkConfigs: nc, + ServiceDisc: v.serviceDisc, + PublicTrusted: v.conf.PublicTrustedVisor, } n, err := snet.New(conf, v.ebc) @@ -252,15 +254,16 @@ func initRouter(v *Visor) bool { return report(nil) } -func initLauncher(v *Visor) bool { - report := v.makeReporter("launcher") - conf := v.conf.Launcher +func initDiscovery(v *Visor) bool { + report := v.makeReporter("discovery") // Prepare app discovery factory. factory := appdisc.Factory{ Log: v.MasterLogger().PackageLogger("app_discovery"), } + conf := v.conf.Launcher + if conf.Discovery != nil { factory.PK = v.conf.PK factory.SK = v.conf.SK @@ -268,31 +271,22 @@ func initLauncher(v *Visor) bool { factory.ProxyDisc = conf.Discovery.ServiceDisc } - var disc appdisc.Updater - if v.conf.PublicTrustedVisor { - hasPublicIP, err := netutil.HasPublicIP() - if err != nil { - v.log.WithError(err).Errorf("Failed to check if visor has public IP") - } else if !hasPublicIP { - v.log.Errorf("Visor doesn't have a public IP, it's impossible to register it as public trusted") - } else { - disc = factory.VisorUpdater() - disc.Start() - - v.log.Infof("Registered visor as public trusted") - } - } + v.serviceDisc = factory + + return report(nil) +} + +func initLauncher(v *Visor) bool { + report := v.makeReporter("launcher") + conf := v.conf.Launcher // Prepare proc manager. - procM, err := appserver.NewProcManager(v.MasterLogger(), &factory, v.ebc, conf.ServerAddr) + procM, err := appserver.NewProcManager(v.MasterLogger(), &v.serviceDisc, v.ebc, conf.ServerAddr) if err != nil { return report(fmt.Errorf("failed to start proc_manager: %w", err)) } v.pushCloseStack("launcher.proc_manager", func() bool { - if disc != nil { - disc.Stop() - } return report(procM.Close()) }) diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index d459e7fdb1..c62d566f25 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -17,6 +17,7 @@ import ( "github.com/sirupsen/logrus" "github.com/SkycoinProject/skywire-mainnet/internal/utclient" + "github.com/SkycoinProject/skywire-mainnet/pkg/app/appdisc" "github.com/SkycoinProject/skywire-mainnet/pkg/app/appevent" "github.com/SkycoinProject/skywire-mainnet/pkg/app/appserver" "github.com/SkycoinProject/skywire-mainnet/pkg/app/launcher" @@ -67,8 +68,9 @@ type Visor struct { router router.Router rfClient rfclient.Client - procM appserver.ProcManager // proc manager - appL *launcher.Launcher // app launcher + procM appserver.ProcManager // proc manager + appL *launcher.Launcher // app launcher + serviceDisc appdisc.Factory } type vReport struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index 8c5b915984..d4e62ac795 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -151,6 +151,7 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.10.0 +## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/model From 841953da3ceec47607bd360365c7ed9358f36ef0 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 10 Jul 2020 21:41:35 +0300 Subject: [PATCH 4/5] Fix linter and tests --- internal/netutil/netutil.go | 52 +------------------------------------ pkg/app/appdisc/factory.go | 4 +-- pkg/snet/network.go | 4 ++- 3 files changed, 6 insertions(+), 54 deletions(-) diff --git a/internal/netutil/netutil.go b/internal/netutil/netutil.go index 35728c3c5e..0c2e7bf440 100644 --- a/internal/netutil/netutil.go +++ b/internal/netutil/netutil.go @@ -2,6 +2,7 @@ package netutil import "net" +// LocalAddresses returns a list of all local addresses func LocalAddresses() ([]string, error) { result := make([]string, 0) @@ -25,54 +26,3 @@ func LocalAddresses() ([]string, error) { return result, nil } - -func HasPublicIP() (bool, error) { - ifaces, err := net.Interfaces() - if err != nil { - return false, err - } - - for _, i := range ifaces { - addrs, err := i.Addrs() - if err != nil { - return false, err - } - - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - - if isPublicIP(ip) { - return true, nil - } - } - } - - return false, nil -} - -func isPublicIP(IP net.IP) bool { - if IP.IsLoopback() || IP.IsLinkLocalMulticast() || IP.IsLinkLocalUnicast() { - return false - } - - if ip4 := IP.To4(); ip4 != nil { - switch { - case ip4[0] == 10: - return false - case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31: - return false - case ip4[0] == 192 && ip4[1] == 168: - return false - default: - return true - } - } - - return false -} diff --git a/pkg/app/appdisc/factory.go b/pkg/app/appdisc/factory.go index 1aab6f8310..0a7333e41c 100644 --- a/pkg/app/appdisc/factory.go +++ b/pkg/app/appdisc/factory.go @@ -34,7 +34,7 @@ func (f *Factory) setDefaults() { } } -// Updater obtains an updater based on the app name and configuration. +// VisorUpdater obtains a visor updater. func (f *Factory) VisorUpdater(port uint16) Updater { // Always return empty updater if keys are not set. if f.setDefaults(); f.PK.Null() || f.SK.Null() { @@ -55,7 +55,7 @@ func (f *Factory) VisorUpdater(port uint16) Updater { } } -// AppUpdater obtains an updater based on the app name and configuration. +// AppUpdater obtains an app updater based on the app name and configuration. func (f *Factory) AppUpdater(conf appcommon.ProcConfig) (Updater, bool) { // Always return empty updater if keys are not set. if f.setDefaults(); f.PK.Null() || f.SK.Null() { diff --git a/pkg/snet/network.go b/pkg/snet/network.go index 6937277558..6f27848005 100644 --- a/pkg/snet/network.go +++ b/pkg/snet/network.go @@ -282,7 +282,9 @@ func (n *Network) Close() error { n.netsMu.Lock() defer n.netsMu.Unlock() - n.visorUpdater.Stop() + if n.visorUpdater != nil { + n.visorUpdater.Stop() + } wg := new(sync.WaitGroup) From bd11226880ff2c72535ee027a0acce3a41bb90fb Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 13 Jul 2020 16:31:54 +0300 Subject: [PATCH 5/5] Check for a request from service discovery to STCPR --- pkg/snet/directtp/client.go | 5 +++++ pkg/snet/network.go | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/snet/directtp/client.go b/pkg/snet/directtp/client.go index a11417ba23..0c4919a5f2 100644 --- a/pkg/snet/directtp/client.go +++ b/pkg/snet/directtp/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "strings" "sync" "time" @@ -139,6 +140,10 @@ func (c *client) Serve() error { for { if err := c.acceptConn(); err != nil { + if strings.Contains(err.Error(), io.EOF.Error()) { + continue // likely it's a dummy connection from service discovery + } + c.log.Warnf("failed to accept incoming connection: %v", err) if !tphandshake.IsHandshakeError(err) { diff --git a/pkg/snet/network.go b/pkg/snet/network.go index 6f27848005..c4073720fc 100644 --- a/pkg/snet/network.go +++ b/pkg/snet/network.go @@ -219,7 +219,7 @@ func (n *Network) Init() error { } if n.conf.PublicTrusted { - n.registerPublicTrusted(client) + go n.registerPublicTrusted(client) } } else { log.Infof("No config found for stcpr") @@ -238,6 +238,8 @@ func (n *Network) Init() error { } func (n *Network) registerPublicTrusted(client directtp.Client) { + log.Infof("Trying to register visor as public trusted") + la, err := client.LocalAddr() if err != nil { log.WithError(err).Errorf("Failed to get STCPR local addr") @@ -259,7 +261,7 @@ func (n *Network) registerPublicTrusted(client directtp.Client) { n.visorUpdater = n.conf.ServiceDisc.VisorUpdater(uint16(port)) n.visorUpdater.Start() - log.Infof("Registered visor as public trusted") + log.Infof("Sent request to register visor as public trusted") } // OnNewNetworkType sets callback to be called when new network type is ready.