From f3f67e885f01df64b76aaed18bda33f84b0b0100 Mon Sep 17 00:00:00 2001 From: Alexander Adhyatma Date: Sat, 20 Nov 2021 11:25:11 +0700 Subject: [PATCH 1/5] fetch address-resolver for transport cache, save it in visor --- pkg/transport/network/addrresolver/client.go | 56 ++++++++++++++++++++ pkg/visor/init.go | 11 +++- pkg/visor/visor.go | 2 + 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pkg/transport/network/addrresolver/client.go b/pkg/transport/network/addrresolver/client.go index 1f11c64f9..f0d841e25 100644 --- a/pkg/transport/network/addrresolver/client.go +++ b/pkg/transport/network/addrresolver/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/skycoin/skywire/pkg/transport/network" "io" "io/ioutil" "net" @@ -43,6 +44,8 @@ var ( ErrNoEntry = errors.New("no entry for this PK") // ErrNotReady is returned when address resolver is not ready. ErrNotReady = errors.New("address resolver is not ready") + // ErrNoTransportsFound returned when no transports are found. + ErrNoTransportsFound = errors.New("failed to get response data from AR transports endpoint") ) // Error is the object returned to the client when there's an error. @@ -57,6 +60,7 @@ type APIClient interface { BindSTCPR(ctx context.Context, port string) error BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error) Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error) + Transports(ctx context.Context) (map[cipher.PubKey][]network.Type, error) Close() error } @@ -377,6 +381,58 @@ func (c *httpClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey return resolveResp, nil } +func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]network.Type, error) { + if !c.isReady() { + return nil, ErrNotReady + } + resp, err := c.Get(ctx, "/transports") + if err != nil { + return nil, err + } + defer func() { + if err = resp.Body.Close(); err != nil { + c.log.WithError(err).Warn("Failed to close response body") + } + }() + + if resp.StatusCode != http.StatusOK { + c.log.Warn(ErrNoTransportsFound.Error()) + return nil, ErrNoTransportsFound + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + transportsMap := map[string]string{} + if err = json.Unmarshal(body, &transportsMap); err != nil { + return nil, err + } + + var results map[cipher.PubKey][]network.Type + + for k, v := range transportsMap { + rPK, err := cipher.NewPubKey([]byte(v)) + if err != nil { + continue + } + + // Two kinds of network, SUDPH and STCPR + if _, ok := results[rPK]; ok { + kType := network.Type(k) + if len(results[rPK]) == 1 && kType != results[rPK][0] { + results[rPK] = append(results[rPK], network.Type(k)) + } + } else { + nTypeSlice := make([]network.Type, 0, 2) + nTypeSlice = append(nTypeSlice, network.Type(k)) + results[rPK] = nTypeSlice + } + } + return results, nil +} + func (c *httpClient) isReady() bool { select { case <-c.ready: diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 1a9ab622a..3b315c043 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -172,8 +172,17 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err err := fmt.Errorf("failed to create address resolver client: %w", err) return err } + + // initialize cache for available transports + m, err := v.arClient.Transports(ctx) + if err != nil { + log.Warn("failed to fetch transports from AR") + return err + } + v.initLock.Lock() v.arClient = arClient + v.transportsCache = m v.initLock.Unlock() return nil } @@ -655,7 +664,7 @@ func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error { logger.Warn("Failed to get STCPR port") return nil } - visorUpdater := v.serviceDisc.VisorUpdater(uint16(port)) + visorUpdater := v.serviceDisc.VisorUpdater(port) visorUpdater.Start() v.log.Infof("Sent request to register visor as public") diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 1bb85e621..ba30eabd6 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/skycoin/dmsg/cipher" "sync" "time" @@ -77,6 +78,7 @@ type Visor struct { runtimeErrors chan error isServicesHealthy *internalHealthInfo + transportsCache map[cipher.PubKey][]network.Type } // todo: consider moving module closing to the module system From aa1df65ed03e3d4ffedc28097ad2b695fe0a805b Mon Sep 17 00:00:00 2001 From: Alexander Adhyatma Date: Mon, 22 Nov 2021 00:43:25 +0700 Subject: [PATCH 2/5] advanced autoconnection initial commit --- pkg/router/router.go | 89 ++++++++++++++------ pkg/servicedisc/autoconnect.go | 23 ++++- pkg/transport/network/addrresolver/client.go | 16 ++-- pkg/visor/init.go | 68 ++++++++++++++- pkg/visor/visor.go | 6 +- 5 files changed, 159 insertions(+), 43 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 08fad3857..274d8d2e2 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -52,6 +52,13 @@ var ( ErrNoTransportFound = errors.New("no transport found") ) +// RouteSetupHook is an alias for a function that takes remote public key +// and a reference to transport manager in order to setup i.e: +// 1. If the remote is either available stcpr or sudph, establish the transport to the remote and then continue with the route creation process. +// 2. If neither of these direct transports is available, check if automatic transports are currently active. If they are continue with route creation. +// 3. If none of the first two checks was successful, establish a dmsg transport and then continue with route creation. +type RouteSetupHook func(cipher.PubKey, *transport.Manager) error + // Config configures Router. type Config struct { Logger *logging.Logger @@ -143,24 +150,26 @@ type Router interface { // communicating with setup nodes, forward packets according to local // rules and manages route groups for apps. type router struct { - mx sync.Mutex - conf *Config - logger *logging.Logger - sl *dmsg.Listener - dmsgC *dmsg.Client - trustedVisors map[cipher.PubKey]struct{} - tm *transport.Manager - rt routing.Table - rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports. - rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here - rpcSrv *rpc.Server - accept chan routing.EdgeRules - done chan struct{} - once sync.Once + mx sync.Mutex + conf *Config + logger *logging.Logger + sl *dmsg.Listener + dmsgC *dmsg.Client + trustedVisors map[cipher.PubKey]struct{} + tm *transport.Manager + rt routing.Table + rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports. + rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here + rpcSrv *rpc.Server + accept chan routing.EdgeRules + done chan struct{} + once sync.Once + routeSetupHookMu sync.Mutex + routeSetupHooks []RouteSetupHook // see RouteSetupHook description } // New constructs a new Router. -func New(dmsgC *dmsg.Client, config *Config) (Router, error) { +func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) (Router, error) { config.SetDefaults() sl, err := dmsgC.Listen(skyenv.DmsgAwaitSetupPort) @@ -173,19 +182,24 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) { trustedVisors[node] = struct{}{} } + if routeSetupHooks == nil { + routeSetupHooks = []RouteSetupHook{} + } + r := &router{ - conf: config, - logger: config.Logger, - tm: config.TransportManager, - rt: routing.NewTable(), - sl: sl, - dmsgC: dmsgC, - rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup), - rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup), - rpcSrv: rpc.NewServer(), - accept: make(chan routing.EdgeRules, acceptSize), - done: make(chan struct{}), - trustedVisors: trustedVisors, + conf: config, + logger: config.Logger, + tm: config.TransportManager, + rt: routing.NewTable(), + sl: sl, + dmsgC: dmsgC, + rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup), + rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup), + rpcSrv: rpc.NewServer(), + accept: make(chan routing.EdgeRules, acceptSize), + done: make(chan struct{}), + trustedVisors: trustedVisors, + routeSetupHooks: routeSetupHooks, } go r.rulesGCLoop() @@ -197,6 +211,14 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) { return r, nil } +// RegisterSetupHooks takes variadic RouteSetupHook to add to router's setup functions +// currently not in use +func (r *router) RegisterSetupHooks(rshooks ...RouteSetupHook) { + r.routeSetupHookMu.Lock() + r.routeSetupHooks = append(r.routeSetupHooks, rshooks...) + r.routeSetupHookMu.Unlock() +} + // DialRoutes dials to a given visor of 'rPK'. // 'lPort'/'rPort' specifies the local/remote ports respectively. // A nil 'opts' input results in a value of '1' for all DialOptions fields. @@ -221,6 +243,17 @@ func (r *router) DialRoutes( lPK := r.conf.PubKey forwardDesc := routing.NewRouteDescriptor(lPK, rPK, lPort, rPort) + r.routeSetupHookMu.Lock() + if len(r.routeSetupHooks) != 0 { + for _, rsf := range r.routeSetupHooks { + if err := rsf(rPK, r.tm); err != nil { + return nil, err + } + } + } + r.routeSetupHookMu.Unlock() + + // check if transports are available ok := r.checkIfTransportAvalailable() if !ok { return nil, ErrNoTransportFound @@ -1042,7 +1075,7 @@ func (r *router) removeRouteGroupOfRule(rule routing.Rule) { func (r *router) checkIfTransportAvalailable() (ok bool) { r.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { ok = true - return true + return ok }) return ok } diff --git a/pkg/servicedisc/autoconnect.go b/pkg/servicedisc/autoconnect.go index ce1e0a760..2622598f0 100644 --- a/pkg/servicedisc/autoconnect.go +++ b/pkg/servicedisc/autoconnect.go @@ -2,8 +2,10 @@ package servicedisc import ( "context" + "fmt" "time" + "github.com/sirupsen/logrus" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" @@ -77,17 +79,34 @@ func (a *autoconnector) Run(ctx context.Context) (err error) { if !ok || val < maxFailedAddressRetryAttempt { a.log.WithField("pk", pk).WithField("attempt", val).Debugln("Trying to add transport to public visor") logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR)) - if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil { + if err = a.tryEstablishTransport(ctx, pk, logger); err != nil { logger.WithError(err).Warnln("Failed to add transport to public visor") failedAddresses[pk]++ continue } - logger.Infoln("Added transport to public visor") } } } } +// tryEstablish transport will try to establish transport to the remote pk via STCPR or SUDPH, if both failed, return error. +func (a *autoconnector) tryEstablishTransport(ctx context.Context, pk cipher.PubKey, logger *logrus.Entry) error { + errSlice := make([]error, 0, 2) + if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil { + errSlice = append(errSlice, err) + } + if _, err := a.tm.SaveTransport(ctx, pk, network.SUDPH, transport.LabelAutomatic); err != nil { + errSlice = append(errSlice, err) + } + + if len(errSlice) > 1 { + return fmt.Errorf("unable to establish both SUDPH: %v and STCPR: %v to %s", errSlice[0], errSlice[1], pk) + } + + logger.Debugln("Added transport to public visor") + return nil +} + func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) { retrier := netutil.NewRetrier(fetchServicesDelay, 5, 3, a.log) var services []Service diff --git a/pkg/transport/network/addrresolver/client.go b/pkg/transport/network/addrresolver/client.go index f0d841e25..9434891d9 100644 --- a/pkg/transport/network/addrresolver/client.go +++ b/pkg/transport/network/addrresolver/client.go @@ -7,7 +7,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/skycoin/skywire/pkg/transport/network" "io" "io/ioutil" "net" @@ -60,7 +59,7 @@ type APIClient interface { BindSTCPR(ctx context.Context, port string) error BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error) Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error) - Transports(ctx context.Context) (map[cipher.PubKey][]network.Type, error) + Transports(ctx context.Context) (map[cipher.PubKey][]string, error) Close() error } @@ -381,7 +380,7 @@ func (c *httpClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey return resolveResp, nil } -func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]network.Type, error) { +func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]string, error) { if !c.isReady() { return nil, ErrNotReady } @@ -410,7 +409,7 @@ func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]networ return nil, err } - var results map[cipher.PubKey][]network.Type + results := map[cipher.PubKey][]string{} for k, v := range transportsMap { rPK, err := cipher.NewPubKey([]byte(v)) @@ -420,13 +419,12 @@ func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]networ // Two kinds of network, SUDPH and STCPR if _, ok := results[rPK]; ok { - kType := network.Type(k) - if len(results[rPK]) == 1 && kType != results[rPK][0] { - results[rPK] = append(results[rPK], network.Type(k)) + if len(results[rPK]) == 1 && k != results[rPK][0] { + results[rPK] = append(results[rPK], k) } } else { - nTypeSlice := make([]network.Type, 0, 2) - nTypeSlice = append(nTypeSlice, network.Type(k)) + nTypeSlice := make([]string, 0, 2) + nTypeSlice = append(nTypeSlice, k) results[rPK] = nTypeSlice } } diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 3b315c043..b31d72a99 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -174,7 +174,7 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err } // initialize cache for available transports - m, err := v.arClient.Transports(ctx) + m, err := arClient.Transports(ctx) if err != nil { log.Warn("failed to fetch transports from AR") return err @@ -184,9 +184,37 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err v.arClient = arClient v.transportsCache = m v.initLock.Unlock() + + doneCh := make(chan struct{}, 1) + t := time.NewTicker(1 * time.Hour) + go fetchARTransports(ctx, v, log, doneCh, t) + v.pushCloseStack("address_resolver", func() error { + doneCh <- struct{}{} + return nil + }) + return nil } +func fetchARTransports(ctx context.Context, v *Visor, log *logging.Logger, doneCh <-chan struct{}, tick *time.Ticker) { + for { + select { + case <-tick.C: + log.Debug("Fetching PKs from AR") + m, err := v.arClient.Transports(ctx) + if err != nil { + log.WithError(err).Warn("failed to fetch AR transport") + } + v.transportCacheMu.Lock() + v.transportsCache = m + v.transportCacheMu.Unlock() + case <-doneCh: + tick.Stop() + return + } + } +} + func initDiscovery(ctx context.Context, v *Visor, log *logging.Logger) error { // Prepare app discovery factory. factory := appdisc.Factory{ @@ -388,6 +416,40 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro return nil } +func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook { + return []router.RouteSetupHook{ + func(rPK cipher.PubKey, tm *transport.Manager) error { + dmsgFallback := func() error { + _, err := tm.SaveTransport(ctx, rPK, network.DMSG, transport.LabelAutomatic) + return err + } + // check visor's AR transport cache + if v.transportsCache == nil { + // skips if there's no AR transports + log.Warn("empty AR transports cache") + return dmsgFallback() + } + transports, ok := v.transportsCache[rPK] + if !ok { + log.WithField("pk", rPK.String()).Warn("pk not found in the transports cache") + // check if automatic transport is available, if it does, + // continue with route creation + if v.conf.Transport.PublicAutoconnect { + return nil + } + } + // try to establish direct connection to rPK (single hop) + errCh := make(chan error, 1) + for _, t := range transports { + if _, err := tm.SaveTransport(ctx, rPK, network.Type(t), transport.LabelAutomatic); err != nil { + errCh <- err + } + } + return <-errCh + }, + } +} + func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error { conf := v.conf.Routing rfClient := rfclient.NewHTTP(conf.RouteFinder, time.Duration(conf.RouteFinderTimeout)) @@ -404,7 +466,9 @@ func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error { MinHops: v.conf.Routing.MinHops, } - r, err := router.New(v.dmsgC, &rConf) + routeSetupHooks := getRouteSetupHooks(ctx, v, log) + + r, err := router.New(v.dmsgC, &rConf, routeSetupHooks) if err != nil { err := fmt.Errorf("failed to create router: %w", err) return err diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index ba30eabd6..ace98c654 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -5,11 +5,11 @@ import ( "context" "errors" "fmt" - "github.com/skycoin/dmsg/cipher" "sync" "time" "github.com/skycoin/dmsg" + "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/internal/utclient" @@ -78,7 +78,8 @@ type Visor struct { runtimeErrors chan error isServicesHealthy *internalHealthInfo - transportsCache map[cipher.PubKey][]network.Type + transportCacheMu *sync.Mutex + transportsCache map[cipher.PubKey][]string } // todo: consider moving module closing to the module system @@ -110,6 +111,7 @@ func NewVisor(conf *visorconfig.V1, restartCtx *restart.Context) (*Visor, bool) initLock: new(sync.Mutex), isServicesHealthy: newInternalHealthInfo(), wgTrackers: new(sync.WaitGroup), + transportCacheMu: new(sync.Mutex), } v.wgTrackers.Add(1) defer v.wgTrackers.Done() From ceea95db8577df627db0306f75452e7ed3d3d52a Mon Sep 17 00:00:00 2001 From: Alexander Adhyatma Date: Mon, 22 Nov 2021 10:15:12 +0700 Subject: [PATCH 3/5] fixes json unmarshal format, return dmsg fallback if transports not found --- pkg/router/router.go | 2 -- pkg/transport/network/addrresolver/client.go | 35 ++++++++++---------- pkg/visor/init.go | 19 +++++++---- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 274d8d2e2..42c8ae08f 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -243,7 +243,6 @@ func (r *router) DialRoutes( lPK := r.conf.PubKey forwardDesc := routing.NewRouteDescriptor(lPK, rPK, lPort, rPort) - r.routeSetupHookMu.Lock() if len(r.routeSetupHooks) != 0 { for _, rsf := range r.routeSetupHooks { if err := rsf(rPK, r.tm); err != nil { @@ -251,7 +250,6 @@ func (r *router) DialRoutes( } } } - r.routeSetupHookMu.Unlock() // check if transports are available ok := r.checkIfTransportAvalailable() diff --git a/pkg/transport/network/addrresolver/client.go b/pkg/transport/network/addrresolver/client.go index 9434891d9..e914167be 100644 --- a/pkg/transport/network/addrresolver/client.go +++ b/pkg/transport/network/addrresolver/client.go @@ -380,10 +380,8 @@ func (c *httpClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey return resolveResp, nil } +// Transports query available transports. func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]string, error) { - if !c.isReady() { - return nil, ErrNotReady - } resp, err := c.Get(ctx, "/transports") if err != nil { return nil, err @@ -404,28 +402,31 @@ func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]string return nil, err } - transportsMap := map[string]string{} + transportsMap := map[string][]string{} if err = json.Unmarshal(body, &transportsMap); err != nil { return nil, err } results := map[cipher.PubKey][]string{} - for k, v := range transportsMap { - rPK, err := cipher.NewPubKey([]byte(v)) - if err != nil { - continue - } + for k, pks := range transportsMap { + for _, pk := range pks { + rPK := cipher.PubKey{} + if err := rPK.Set(pk); err != nil { + c.log.WithError(err).Warn("unable to transform PK") + continue + } - // Two kinds of network, SUDPH and STCPR - if _, ok := results[rPK]; ok { - if len(results[rPK]) == 1 && k != results[rPK][0] { - results[rPK] = append(results[rPK], k) + // Two kinds of network, SUDPH and STCPR + if _, ok := results[rPK]; ok { + if len(results[rPK]) == 1 && k != results[rPK][0] { + results[rPK] = append(results[rPK], k) + } + } else { + nTypeSlice := make([]string, 0, 2) + nTypeSlice = append(nTypeSlice, k) + results[rPK] = nTypeSlice } - } else { - nTypeSlice := make([]string, 0, 2) - nTypeSlice = append(nTypeSlice, k) - results[rPK] = nTypeSlice } } return results, nil diff --git a/pkg/visor/init.go b/pkg/visor/init.go index b31d72a99..cf4099ec4 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "strings" "sync" "time" @@ -169,7 +170,7 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err arClient, err := addrresolver.NewHTTP(conf.AddressResolver, v.conf.PK, v.conf.SK, log) if err != nil { - err := fmt.Errorf("failed to create address resolver client: %w", err) + err = fmt.Errorf("failed to create address resolver client: %w", err) return err } @@ -435,17 +436,23 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro // check if automatic transport is available, if it does, // continue with route creation if v.conf.Transport.PublicAutoconnect { + // we return nil here, if there's no transport available it wlll be checked + // by the router itself next. return nil } + return dmsgFallback() } // try to establish direct connection to rPK (single hop) - errCh := make(chan error, 1) - for _, t := range transports { - if _, err := tm.SaveTransport(ctx, rPK, network.Type(t), transport.LabelAutomatic); err != nil { - errCh <- err + errSlice := make([]error, 0, 2) + for _, trans := range transports { + if _, err := tm.SaveTransport(ctx, rPK, network.Type(strings.ToUpper(trans)), transport.LabelAutomatic); err != nil { + errSlice = append(errSlice, err) } } - return <-errCh + if len(errSlice) != 2 { + return nil + } + return errors.New(errSlice[0].Error()) }, } } From 75562da3bcd010d55e86d6ea8a447feb41fc35c4 Mon Sep 17 00:00:00 2001 From: Alexander Adhyatma Date: Mon, 22 Nov 2021 18:22:10 +0700 Subject: [PATCH 4/5] sudph: don't connect when under symmetric NAT, fixes network type typo --- pkg/router/router.go | 2 +- pkg/servicedisc/autoconnect.go | 11 +---------- pkg/visor/init.go | 8 ++++++-- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 42c8ae08f..b57f8b11f 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -299,7 +299,7 @@ func (r *router) DialRoutes( return nrg, nil } -// AcceptsRoutes should block until we receive an AddRules packet from SetupNode +// AcceptRoutes should block until we receive an AddRules packet from SetupNode // that contains ConsumeRule(s) or ForwardRule(s). // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. diff --git a/pkg/servicedisc/autoconnect.go b/pkg/servicedisc/autoconnect.go index 2622598f0..28c2657df 100644 --- a/pkg/servicedisc/autoconnect.go +++ b/pkg/servicedisc/autoconnect.go @@ -2,7 +2,6 @@ package servicedisc import ( "context" - "fmt" "time" "github.com/sirupsen/logrus" @@ -91,16 +90,8 @@ func (a *autoconnector) Run(ctx context.Context) (err error) { // tryEstablish transport will try to establish transport to the remote pk via STCPR or SUDPH, if both failed, return error. func (a *autoconnector) tryEstablishTransport(ctx context.Context, pk cipher.PubKey, logger *logrus.Entry) error { - errSlice := make([]error, 0, 2) if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil { - errSlice = append(errSlice, err) - } - if _, err := a.tm.SaveTransport(ctx, pk, network.SUDPH, transport.LabelAutomatic); err != nil { - errSlice = append(errSlice, err) - } - - if len(errSlice) > 1 { - return fmt.Errorf("unable to establish both SUDPH: %v and STCPR: %v to %s", errSlice[0], errSlice[1], pk) + return err } logger.Debugln("Added transport to public visor") diff --git a/pkg/visor/init.go b/pkg/visor/init.go index cf4099ec4..ca2f6f28b 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "net/http" - "strings" "sync" "time" @@ -445,7 +444,12 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro // try to establish direct connection to rPK (single hop) errSlice := make([]error, 0, 2) for _, trans := range transports { - if _, err := tm.SaveTransport(ctx, rPK, network.Type(strings.ToUpper(trans)), transport.LabelAutomatic); err != nil { + ntype := network.Type(trans) + // skip if SUDPH is under symmetric NAT / under UDP firewall. + if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric || v.stunClient.NATType == stun.NATSymmetricUDPFirewall) { + continue + } + if _, err := tm.SaveTransport(ctx, rPK, ntype, transport.LabelAutomatic); err != nil { errSlice = append(errSlice, err) } } From 284e09e330a29fc9f9e29c6077ab895bc4a575ef Mon Sep 17 00:00:00 2001 From: Alexander Adhyatma Date: Mon, 29 Nov 2021 10:28:27 +0700 Subject: [PATCH 5/5] added retry, runs dmsg transport creation on specific scenarios only --- pkg/router/router.go | 2 ++ pkg/visor/init.go | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index b57f8b11f..fb65b84c3 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -243,6 +243,8 @@ func (r *router) DialRoutes( lPK := r.conf.PubKey forwardDesc := routing.NewRouteDescriptor(lPK, rPK, lPort, rPort) + r.routeSetupHookMu.Lock() + defer r.routeSetupHookMu.Unlock() if len(r.routeSetupHooks) != 0 { for _, rsf := range r.routeSetupHooks { if err := rsf(rPK, r.tm); err != nil { diff --git a/pkg/visor/init.go b/pkg/visor/init.go index ca2f6f28b..197ba8c48 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -417,14 +417,17 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro } func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook { + retrier := dmsgnetutil.NewRetrier(log, time.Second, time.Second*20, 3, 1.3) return []router.RouteSetupHook{ func(rPK cipher.PubKey, tm *transport.Manager) error { dmsgFallback := func() error { - _, err := tm.SaveTransport(ctx, rPK, network.DMSG, transport.LabelAutomatic) - return err + return retrier.Do(ctx, func() error { + _, err := tm.SaveTransport(ctx, rPK, network.DMSG, transport.LabelAutomatic) + return err + }) } // check visor's AR transport cache - if v.transportsCache == nil { + if v.transportsCache == nil && !v.conf.Transport.PublicAutoconnect { // skips if there's no AR transports log.Warn("empty AR transports cache") return dmsgFallback() @@ -435,13 +438,12 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro // check if automatic transport is available, if it does, // continue with route creation if v.conf.Transport.PublicAutoconnect { - // we return nil here, if there's no transport available it wlll be checked - // by the router itself next. + // we return nil here, router will use multi-hop STCPR rather than one hop DMSG return nil } return dmsgFallback() } - // try to establish direct connection to rPK (single hop) + // try to establish direct connection to rPK (single hop) using SUDPH or STCPR errSlice := make([]error, 0, 2) for _, trans := range transports { ntype := network.Type(trans) @@ -449,7 +451,11 @@ func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []ro if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric || v.stunClient.NATType == stun.NATSymmetricUDPFirewall) { continue } - if _, err := tm.SaveTransport(ctx, rPK, ntype, transport.LabelAutomatic); err != nil { + err := retrier.Do(ctx, func() error { + _, err := tm.SaveTransport(ctx, rPK, ntype, transport.LabelAutomatic) + return err + }) + if err != nil { errSlice = append(errSlice, err) } }