diff --git a/pkg/router/router.go b/pkg/router/router.go
index 08fad3857d..fb65b84c37 100644
--- a/pkg/router/router.go
+++ b/pkg/router/router.go
@@ -52,6 +52,13 @@ var (
 	ErrNoTransportFound = errors.New("no transport found")
 )
 
+// RouteSetupHook is an alias for a function that takes a remote public key and
+// a reference to the transport manager, and sets up a transport before route creation:
+// 1. If the remote is available over either stcpr or sudph, establish that transport to the remote and then continue with the route-creation process.
+// 2. If neither of these direct transports is available, check whether automatic transports are currently active. If they are, continue with route creation.
+// 3. If neither of the first two checks succeeds, establish a dmsg transport and then continue with route creation.
+type RouteSetupHook func(cipher.PubKey, *transport.Manager) error
+
 // Config configures Router.
 type Config struct {
 	Logger           *logging.Logger
@@ -143,24 +150,26 @@ type Router interface {
 // communicating with setup nodes, forward packets according to local
 // rules and manages route groups for apps.
 type router struct {
-	mx            sync.Mutex
-	conf          *Config
-	logger        *logging.Logger
-	sl            *dmsg.Listener
-	dmsgC         *dmsg.Client
-	trustedVisors map[cipher.PubKey]struct{}
-	tm            *transport.Manager
-	rt            routing.Table
-	rgsNs         map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports.
-	rgsRaw        map[routing.RouteDescriptor]*RouteGroup      // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here
-	rpcSrv        *rpc.Server
-	accept        chan routing.EdgeRules
-	done          chan struct{}
-	once          sync.Once
+	mx               sync.Mutex
+	conf             *Config
+	logger           *logging.Logger
+	sl               *dmsg.Listener
+	dmsgC            *dmsg.Client
+	trustedVisors    map[cipher.PubKey]struct{}
+	tm               *transport.Manager
+	rt               routing.Table
+	rgsNs            map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports.
+	rgsRaw           map[routing.RouteDescriptor]*RouteGroup      // Not-yet-noise-wrapped route groups. When one of these gets wrapped, it is removed from here.
+	rpcSrv           *rpc.Server
+	accept           chan routing.EdgeRules
+	done             chan struct{}
+	once             sync.Once
+	routeSetupHookMu sync.Mutex
+	routeSetupHooks  []RouteSetupHook // see RouteSetupHook description
 }
 
 // New constructs a new Router.
-func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
+func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) (Router, error) {
 	config.SetDefaults()
 
 	sl, err := dmsgC.Listen(skyenv.DmsgAwaitSetupPort)
@@ -173,19 +182,24 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
 		trustedVisors[node] = struct{}{}
 	}
 
+	if routeSetupHooks == nil {
+		routeSetupHooks = []RouteSetupHook{}
+	}
+
 	r := &router{
-		conf:          config,
-		logger:        config.Logger,
-		tm:            config.TransportManager,
-		rt:            routing.NewTable(),
-		sl:            sl,
-		dmsgC:         dmsgC,
-		rgsNs:         make(map[routing.RouteDescriptor]*NoiseRouteGroup),
-		rgsRaw:        make(map[routing.RouteDescriptor]*RouteGroup),
-		rpcSrv:        rpc.NewServer(),
-		accept:        make(chan routing.EdgeRules, acceptSize),
-		done:          make(chan struct{}),
-		trustedVisors: trustedVisors,
+		conf:            config,
+		logger:          config.Logger,
+		tm:              config.TransportManager,
+		rt:              routing.NewTable(),
+		sl:              sl,
+		dmsgC:           dmsgC,
+		rgsNs:           make(map[routing.RouteDescriptor]*NoiseRouteGroup),
+		rgsRaw:          make(map[routing.RouteDescriptor]*RouteGroup),
+		rpcSrv:          rpc.NewServer(),
+		accept:          make(chan routing.EdgeRules, acceptSize),
+		done:            make(chan struct{}),
+		trustedVisors:   trustedVisors,
+		routeSetupHooks: routeSetupHooks,
 	}
 
 	go r.rulesGCLoop()
@@ -197,6 +211,14 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
 	return r, nil
 }
 
+// RegisterSetupHooks appends the given RouteSetupHooks to the router's setup functions.
+// It is currently unused.
+func (r *router) RegisterSetupHooks(rshooks ...RouteSetupHook) {
+	r.routeSetupHookMu.Lock()
+	r.routeSetupHooks = append(r.routeSetupHooks, rshooks...)
+	r.routeSetupHookMu.Unlock()
+}
+
 // DialRoutes dials to a given visor of 'rPK'.
 // 'lPort'/'rPort' specifies the local/remote ports respectively.
 // A nil 'opts' input results in a value of '1' for all DialOptions fields.
@@ -221,6 +243,17 @@ func (r *router) DialRoutes(
 	lPK := r.conf.PubKey
 	forwardDesc := routing.NewRouteDescriptor(lPK, rPK, lPort, rPort)
 
+	r.routeSetupHookMu.Lock()
+	defer r.routeSetupHookMu.Unlock()
+	if len(r.routeSetupHooks) != 0 {
+		for _, rsf := range r.routeSetupHooks {
+			if err := rsf(rPK, r.tm); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	// Check that at least one transport is available.
 	ok := r.checkIfTransportAvalailable()
 	if !ok {
 		return nil, ErrNoTransportFound
@@ -268,7 +301,7 @@
 	return nrg, nil
 }
 
-// AcceptsRoutes should block until we receive an AddRules packet from SetupNode
+// AcceptRoutes should block until we receive an AddRules packet from SetupNode
 // that contains ConsumeRule(s) or ForwardRule(s).
 // Then the following should happen:
 // - Save to routing.Table and internal RouteGroup map.
@@ -1042,7 +1075,7 @@ func (r *router) removeRouteGroupOfRule(rule routing.Rule) {
 func (r *router) checkIfTransportAvalailable() (ok bool) {
 	r.tm.WalkTransports(func(tp *transport.ManagedTransport) bool {
 		ok = true
-		return true
+		return ok
 	})
 	return ok
 }
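Reviewer note: to make the hook contract above concrete, here is a minimal sketch of a custom RouteSetupHook as a caller might define one. It only uses types introduced in this diff; the hook name and its dmsg-only behavior are illustrative, not part of the change.

```go
package hooks

import (
	"context"

	"github.com/skycoin/dmsg/cipher"

	"github.com/skycoin/skywire/pkg/router"
	"github.com/skycoin/skywire/pkg/transport"
	"github.com/skycoin/skywire/pkg/transport/network"
)

// ensureDmsgTransport is a hypothetical hook implementing only step 3 of the
// RouteSetupHook comment: always establish a dmsg transport to the remote
// before route creation proceeds.
var ensureDmsgTransport router.RouteSetupHook = func(rPK cipher.PubKey, tm *transport.Manager) error {
	_, err := tm.SaveTransport(context.Background(), rPK, network.DMSG, transport.LabelAutomatic)
	return err
}
```

Note that New normalizes a nil hook slice to an empty one, so DialRoutes can always iterate over the hooks unconditionally.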
diff --git a/pkg/servicedisc/autoconnect.go b/pkg/servicedisc/autoconnect.go
index 90a501efae..1195160f5e 100644
--- a/pkg/servicedisc/autoconnect.go
+++ b/pkg/servicedisc/autoconnect.go
@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/sirupsen/logrus"
 	"github.com/skycoin/dmsg/cipher"
 	"github.com/skycoin/skycoin/src/util/logging"
 
@@ -78,17 +79,26 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {
 			if !ok || val < maxFailedAddressRetryAttempt {
 				a.log.WithField("pk", pk).WithField("attempt", val).Debugln("Trying to add transport to public visor")
 				logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR))
-				if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
+				if err = a.tryEstablishTransport(ctx, pk, logger); err != nil {
 					logger.WithError(err).Warnln("Failed to add transport to public visor")
 					failedAddresses[pk]++
 					continue
 				}
-				logger.Infoln("Added transport to public visor")
 			}
 		}
 	}
 }
 
+// tryEstablishTransport tries to establish an STCPR transport to the remote pk, returning an error on failure.
+func (a *autoconnector) tryEstablishTransport(ctx context.Context, pk cipher.PubKey, logger *logrus.Entry) error {
+	if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
+		return err
+	}
+
+	logger.Debugln("Added transport to public visor")
+	return nil
+}
+
 func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) {
 	retrier := netutil.NewRetrier(fetchServicesDelay, 5, 3, a.log)
 	var services []Service
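The doc comment that originally accompanied this helper mentioned falling back to SUDPH, but tryEstablishTransport only attempts STCPR, so the comment was corrected to match the code. If a SUDPH fallback were intended, a sketch could look like the following (hypothetical variant, assuming SaveTransport behaves the same for both network types; not part of this diff):

```go
// tryEstablishTransportWithFallback attempts STCPR first and falls back to
// SUDPH if STCPR fails. Hypothetical; the merged helper tries STCPR only.
func (a *autoconnector) tryEstablishTransportWithFallback(ctx context.Context, pk cipher.PubKey, logger *logrus.Entry) error {
	if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
		logger.WithError(err).Debugln("STCPR failed, trying SUDPH")
		if _, err = a.tm.SaveTransport(ctx, pk, network.SUDPH, transport.LabelAutomatic); err != nil {
			return err
		}
	}
	logger.Debugln("Added transport to public visor")
	return nil
}
```

If such a fallback is added later, the SUDPH NAT restrictions handled by the route setup hook in visor/init.go would likely need to be checked here as well.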
diff --git a/pkg/transport/network/addrresolver/client.go b/pkg/transport/network/addrresolver/client.go
index b59ed40e16..856b60d189 100644
--- a/pkg/transport/network/addrresolver/client.go
+++ b/pkg/transport/network/addrresolver/client.go
@@ -43,6 +43,8 @@ var (
 	ErrNoEntry = errors.New("no entry for this PK")
 	// ErrNotReady is returned when address resolver is not ready.
 	ErrNotReady = errors.New("address resolver is not ready")
+	// ErrNoTransportsFound is returned when no transports are found.
+	ErrNoTransportsFound = errors.New("failed to get response data from AR transports endpoint")
 )
 
 // Error is the object returned to the client when there's an error.
@@ -57,6 +59,7 @@ type APIClient interface {
 	BindSTCPR(ctx context.Context, port string) error
 	BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error)
 	Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error)
+	Transports(ctx context.Context) (map[cipher.PubKey][]string, error)
 	Close() error
 }
 
@@ -377,6 +380,58 @@ func (c *httpClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey
 	return resolveResp, nil
 }
 
+// Transports queries available transports.
+func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]string, error) {
+	resp, err := c.Get(ctx, "/transports")
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if err = resp.Body.Close(); err != nil {
+			c.log.WithError(err).Warn("Failed to close response body")
+		}
+	}()
+
+	if resp.StatusCode != http.StatusOK {
+		c.log.Warn(ErrNoTransportsFound.Error())
+		return nil, ErrNoTransportsFound
+	}
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	transportsMap := map[string][]string{}
+	if err = json.Unmarshal(body, &transportsMap); err != nil {
+		return nil, err
+	}
+
+	results := map[cipher.PubKey][]string{}
+
+	for k, pks := range transportsMap {
+		for _, pk := range pks {
+			rPK := cipher.PubKey{}
+			if err := rPK.Set(pk); err != nil {
+				c.log.WithError(err).Warn("Failed to parse PK")
+				continue
+			}
+
+			// There are two network types (SUDPH and STCPR), so each PK maps to at most two entries.
+			if _, ok := results[rPK]; ok {
+				if len(results[rPK]) == 1 && k != results[rPK][0] {
+					results[rPK] = append(results[rPK], k)
+				}
+			} else {
+				nTypeSlice := make([]string, 0, 2)
+				nTypeSlice = append(nTypeSlice, k)
+				results[rPK] = nTypeSlice
+			}
+		}
+	}
+	return results, nil
+}
+
 func (c *httpClient) isReady() bool {
 	select {
 	case <-c.ready:
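For context on the new Transports method: it assumes the AR /transports endpoint returns a JSON object keyed by network type and inverts that into a PK-keyed map, deduplicating network types per PK. A self-contained sketch of that inversion under the same assumption (the sample payload is illustrative, and PKs are abbreviated to plain strings instead of cipher.PubKey values):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Assumed shape of the AR /transports response: network type -> list of PKs.
	body := []byte(`{"stcpr": ["pkA", "pkB"], "sudph": ["pkA"]}`)

	transportsMap := map[string][]string{}
	if err := json.Unmarshal(body, &transportsMap); err != nil {
		panic(err)
	}

	// Invert to PK -> network types, mirroring what Transports does.
	results := map[string][]string{}
	for netType, pks := range transportsMap {
		for _, pk := range pks {
			if cur := results[pk]; len(cur) == 1 && cur[0] != netType {
				results[pk] = append(cur, netType)
			} else if len(cur) == 0 {
				results[pk] = []string{netType}
			}
		}
	}

	fmt.Println(results) // e.g. map[pkA:[stcpr sudph] pkB:[stcpr]] (map order varies)
}
```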
diff --git a/pkg/visor/init.go b/pkg/visor/init.go
index 1828db2922..34a9c71cd0 100644
--- a/pkg/visor/init.go
+++ b/pkg/visor/init.go
@@ -206,15 +206,52 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err
 	arClient, err := addrresolver.NewHTTP(conf.AddressResolver, v.conf.PK, v.conf.SK, httpC, log)
 	if err != nil {
-		err := fmt.Errorf("failed to create address resolver client: %w", err)
+		err = fmt.Errorf("failed to create address resolver client: %w", err)
 		return err
 	}
 
+	// Initialize the cache of available transports.
+	m, err := arClient.Transports(ctx)
+	if err != nil {
+		log.WithError(err).Warn("failed to fetch transports from AR")
+		return err
+	}
+
 	v.initLock.Lock()
 	v.arClient = arClient
+	v.transportsCache = m
 	v.initLock.Unlock()
+
+	doneCh := make(chan struct{}, 1)
+	t := time.NewTicker(1 * time.Hour)
+	go fetchARTransports(ctx, v, log, doneCh, t)
+	v.pushCloseStack("address_resolver", func() error {
+		doneCh <- struct{}{}
+		return nil
+	})
+
 	return nil
 }
 
+func fetchARTransports(ctx context.Context, v *Visor, log *logging.Logger, doneCh <-chan struct{}, tick *time.Ticker) {
+	for {
+		select {
+		case <-tick.C:
+			log.Debug("Fetching PKs from AR")
+			m, err := v.arClient.Transports(ctx)
+			if err != nil {
+				log.WithError(err).Warn("failed to fetch AR transports")
+				continue // keep the previous cache on a failed fetch
+			}
+			v.transportCacheMu.Lock()
+			v.transportsCache = m
+			v.transportCacheMu.Unlock()
+		case <-doneCh:
+			tick.Stop()
+			return
+		}
+	}
+}
+
 func initDiscovery(ctx context.Context, v *Visor, log *logging.Logger) error {
 	// Prepare app discovery factory.
 	factory := appdisc.Factory{
@@ -429,6 +466,61 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro
 	return nil
 }
 
+func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook {
+	retrier := dmsgnetutil.NewRetrier(log, time.Second, time.Second*20, 3, 1.3)
+	return []router.RouteSetupHook{
+		func(rPK cipher.PubKey, tm *transport.Manager) error {
+			dmsgFallback := func() error {
+				return retrier.Do(ctx, func() error {
+					_, err := tm.SaveTransport(ctx, rPK, network.DMSG, transport.LabelAutomatic)
+					return err
+				})
+			}
+			// Snapshot the visor's AR transport cache under its mutex, since
+			// fetchARTransports refreshes it concurrently.
+			v.transportCacheMu.Lock()
+			cache := v.transportsCache
+			v.transportCacheMu.Unlock()
+			if cache == nil && !v.conf.Transport.PublicAutoconnect {
+				// There are no cached AR transports to try, so fall back to dmsg.
+				log.Warn("empty AR transports cache")
+				return dmsgFallback()
+			}
+			transports, ok := cache[rPK]
+			if !ok {
+				log.WithField("pk", rPK.String()).Warn("pk not found in the transports cache")
+				// If automatic transports are active, continue with route creation;
+				// the router will use multi-hop STCPR rather than one-hop dmsg.
+				if v.conf.Transport.PublicAutoconnect {
+					return nil
+				}
+				return dmsgFallback()
+			}
+			// Try to establish a direct (single-hop) connection to rPK via SUDPH or STCPR.
+			errSlice := make([]error, 0, 2)
+			for _, trans := range transports {
+				ntype := network.Type(trans)
+				// Skip SUDPH when the visor is behind a symmetric NAT or a UDP firewall.
+				if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric || v.stunClient.NATType == stun.NATSymmetricUDPFirewall) {
+					continue
+				}
+				err := retrier.Do(ctx, func() error {
+					_, err := tm.SaveTransport(ctx, rPK, ntype, transport.LabelAutomatic)
+					return err
+				})
+				if err != nil {
+					errSlice = append(errSlice, err)
+				}
+			}
+			// If any direct transport succeeded, proceed with route creation.
+			if len(errSlice) != 2 {
+				return nil
+			}
+			return errSlice[0]
+		},
+	}
+}
+
 func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error {
 	conf := v.conf.Routing
 
@@ -451,7 +539,9 @@ func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error {
 		MinHops:          v.conf.Routing.MinHops,
 	}
 
-	r, err := router.New(v.dmsgC, &rConf)
+	routeSetupHooks := getRouteSetupHooks(ctx, v, log)
+
+	r, err := router.New(v.dmsgC, &rConf, routeSetupHooks)
 	if err != nil {
 		err := fmt.Errorf("failed to create router: %w", err)
 		return err
@@ -716,7 +806,7 @@ func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error {
 		logger.Warn("Failed to get STCPR port")
 		return nil
 	}
-	visorUpdater := v.serviceDisc.VisorUpdater(uint16(port))
+	visorUpdater := v.serviceDisc.VisorUpdater(port)
 	visorUpdater.Start()
 
 	v.log.Infof("Sent request to register visor as public")
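The fetchARTransports loop is a standard stoppable-ticker pattern. A standalone sketch (names illustrative) showing why doneCh is buffered: the close function pushed in initAddressResolver sends exactly once and must not block, even if the loop is busy inside a tick:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	doneCh := make(chan struct{}, 1) // buffered: the sender below never blocks
	tick := time.NewTicker(50 * time.Millisecond)

	go func() {
		for {
			select {
			case <-tick.C:
				fmt.Println("refresh cache") // stands in for v.arClient.Transports(ctx)
			case <-doneCh:
				tick.Stop()
				return
			}
		}
	}()

	time.Sleep(120 * time.Millisecond)
	doneCh <- struct{}{} // what the pushed close function does
	time.Sleep(10 * time.Millisecond)
}
```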
diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go
index a28d2ea84d..c18c5799b8 100644
--- a/pkg/visor/visor.go
+++ b/pkg/visor/visor.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/skycoin/dmsg"
+	"github.com/skycoin/dmsg/cipher"
 	"github.com/skycoin/dmsg/direct"
 	"github.com/skycoin/skycoin/src/util/logging"
 
@@ -81,6 +82,8 @@ type Visor struct {
 	runtimeErrors chan error
 
 	isServicesHealthy *internalHealthInfo
+	transportCacheMu  *sync.Mutex
+	transportsCache   map[cipher.PubKey][]string
 }
 
 // todo: consider moving module closing to the module system
@@ -112,6 +115,7 @@ func NewVisor(conf *visorconfig.V1, restartCtx *restart.Context) (*Visor, bool)
 		initLock:          new(sync.Mutex),
 		isServicesHealthy: newInternalHealthInfo(),
 		wgTrackers:        new(sync.WaitGroup),
+		transportCacheMu:  new(sync.Mutex),
 	}
 	v.wgTrackers.Add(1)
 	defer v.wgTrackers.Done()
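Since fetchARTransports replaces transportsCache from a background goroutine while the route setup hook reads it, every access has to go through transportCacheMu. A minimal self-contained sketch of that guarded-lookup pattern (string keys stand in for cipher.PubKey; the type and method names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// visorCache mimics the Visor fields added above: a mutex-guarded
// PK -> network-types map.
type visorCache struct {
	mu    sync.Mutex
	cache map[string][]string
}

// lookup reads the cache under the mutex, as getRouteSetupHooks does.
func (v *visorCache) lookup(pk string) ([]string, bool) {
	v.mu.Lock()
	defer v.mu.Unlock()
	ts, ok := v.cache[pk]
	return ts, ok
}

func main() {
	v := &visorCache{cache: map[string][]string{"pkA": {"stcpr"}}}
	fmt.Println(v.lookup("pkA")) // [stcpr] true
}
```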