Skip to content

Commit

Permalink
Merge pull request #1010 from alexadhy/advanced-autoconnection
Browse files Browse the repository at this point in the history
Advanced autoconnection
  • Loading branch information
jdknives authored Nov 30, 2021
2 parents b2dfd21 + 37a2305 commit 3b89de7
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 34 deletions.
91 changes: 62 additions & 29 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ var (
ErrNoTransportFound = errors.New("no transport found")
)

// RouteSetupHook is an alias for a function that takes remote public key
// and a reference to transport manager in order to setup i.e:
// 1. If the remote is either available stcpr or sudph, establish the transport to the remote and then continue with the route creation process.
// 2. If neither of these direct transports is available, check if automatic transports are currently active. If they are continue with route creation.
// 3. If none of the first two checks was successful, establish a dmsg transport and then continue with route creation.
type RouteSetupHook func(cipher.PubKey, *transport.Manager) error

// Config configures Router.
type Config struct {
Logger *logging.Logger
Expand Down Expand Up @@ -143,24 +150,26 @@ type Router interface {
// communicating with setup nodes, forward packets according to local
// rules and manages route groups for apps.
type router struct {
mx sync.Mutex
conf *Config
logger *logging.Logger
sl *dmsg.Listener
dmsgC *dmsg.Client
trustedVisors map[cipher.PubKey]struct{}
tm *transport.Manager
rt routing.Table
rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports.
rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here
rpcSrv *rpc.Server
accept chan routing.EdgeRules
done chan struct{}
once sync.Once
mx sync.Mutex
conf *Config
logger *logging.Logger
sl *dmsg.Listener
dmsgC *dmsg.Client
trustedVisors map[cipher.PubKey]struct{}
tm *transport.Manager
rt routing.Table
rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports.
rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here
rpcSrv *rpc.Server
accept chan routing.EdgeRules
done chan struct{}
once sync.Once
routeSetupHookMu sync.Mutex
routeSetupHooks []RouteSetupHook // see RouteSetupHook description
}

// New constructs a new Router.
func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) (Router, error) {
config.SetDefaults()

sl, err := dmsgC.Listen(skyenv.DmsgAwaitSetupPort)
Expand All @@ -173,19 +182,24 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
trustedVisors[node] = struct{}{}
}

if routeSetupHooks == nil {
routeSetupHooks = []RouteSetupHook{}
}

r := &router{
conf: config,
logger: config.Logger,
tm: config.TransportManager,
rt: routing.NewTable(),
sl: sl,
dmsgC: dmsgC,
rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup),
rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup),
rpcSrv: rpc.NewServer(),
accept: make(chan routing.EdgeRules, acceptSize),
done: make(chan struct{}),
trustedVisors: trustedVisors,
conf: config,
logger: config.Logger,
tm: config.TransportManager,
rt: routing.NewTable(),
sl: sl,
dmsgC: dmsgC,
rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup),
rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup),
rpcSrv: rpc.NewServer(),
accept: make(chan routing.EdgeRules, acceptSize),
done: make(chan struct{}),
trustedVisors: trustedVisors,
routeSetupHooks: routeSetupHooks,
}

go r.rulesGCLoop()
Expand All @@ -197,6 +211,14 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
return r, nil
}

// RegisterSetupHooks takes variadic RouteSetupHook to add to router's setup functions
// currently not in use
func (r *router) RegisterSetupHooks(rshooks ...RouteSetupHook) {
r.routeSetupHookMu.Lock()
r.routeSetupHooks = append(r.routeSetupHooks, rshooks...)
r.routeSetupHookMu.Unlock()
}

// DialRoutes dials to a given visor of 'rPK'.
// 'lPort'/'rPort' specifies the local/remote ports respectively.
// A nil 'opts' input results in a value of '1' for all DialOptions fields.
Expand All @@ -221,6 +243,17 @@ func (r *router) DialRoutes(
lPK := r.conf.PubKey
forwardDesc := routing.NewRouteDescriptor(lPK, rPK, lPort, rPort)

r.routeSetupHookMu.Lock()
defer r.routeSetupHookMu.Unlock()
if len(r.routeSetupHooks) != 0 {
for _, rsf := range r.routeSetupHooks {
if err := rsf(rPK, r.tm); err != nil {
return nil, err
}
}
}

// check if transports are available
ok := r.checkIfTransportAvalailable()
if !ok {
return nil, ErrNoTransportFound
Expand Down Expand Up @@ -268,7 +301,7 @@ func (r *router) DialRoutes(
return nrg, nil
}

// AcceptsRoutes should block until we receive an AddRules packet from SetupNode
// AcceptRoutes should block until we receive an AddRules packet from SetupNode
// that contains ConsumeRule(s) or ForwardRule(s).
// Then the following should happen:
// - Save to routing.Table and internal RouteGroup map.
Expand Down Expand Up @@ -1042,7 +1075,7 @@ func (r *router) removeRouteGroupOfRule(rule routing.Rule) {
func (r *router) checkIfTransportAvalailable() (ok bool) {
r.tm.WalkTransports(func(tp *transport.ManagedTransport) bool {
ok = true
return true
return ok
})
return ok
}
14 changes: 12 additions & 2 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"time"

"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

Expand Down Expand Up @@ -78,17 +79,26 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {
if !ok || val < maxFailedAddressRetryAttempt {
a.log.WithField("pk", pk).WithField("attempt", val).Debugln("Trying to add transport to public visor")
logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR))
if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
if err = a.tryEstablishTransport(ctx, pk, logger); err != nil {
logger.WithError(err).Warnln("Failed to add transport to public visor")
failedAddresses[pk]++
continue
}
logger.Infoln("Added transport to public visor")
}
}
}
}

// tryEstablish transport will try to establish transport to the remote pk via STCPR or SUDPH, if both failed, return error.
func (a *autoconnector) tryEstablishTransport(ctx context.Context, pk cipher.PubKey, logger *logrus.Entry) error {
if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
return err
}

logger.Debugln("Added transport to public visor")
return nil
}

func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) {
retrier := netutil.NewRetrier(fetchServicesDelay, 5, 3, a.log)
var services []Service
Expand Down
55 changes: 55 additions & 0 deletions pkg/transport/network/addrresolver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var (
ErrNoEntry = errors.New("no entry for this PK")
// ErrNotReady is returned when address resolver is not ready.
ErrNotReady = errors.New("address resolver is not ready")
// ErrNoTransportsFound returned when no transports are found.
ErrNoTransportsFound = errors.New("failed to get response data from AR transports endpoint")
)

// Error is the object returned to the client when there's an error.
Expand All @@ -57,6 +59,7 @@ type APIClient interface {
BindSTCPR(ctx context.Context, port string) error
BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error)
Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error)
Transports(ctx context.Context) (map[cipher.PubKey][]string, error)
Close() error
}

Expand Down Expand Up @@ -377,6 +380,58 @@ func (c *httpClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey
return resolveResp, nil
}

// Transports query available transports.
func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]string, error) {
resp, err := c.Get(ctx, "/transports")
if err != nil {
return nil, err
}
defer func() {
if err = resp.Body.Close(); err != nil {
c.log.WithError(err).Warn("Failed to close response body")
}
}()

if resp.StatusCode != http.StatusOK {
c.log.Warn(ErrNoTransportsFound.Error())
return nil, ErrNoTransportsFound
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

transportsMap := map[string][]string{}
if err = json.Unmarshal(body, &transportsMap); err != nil {
return nil, err
}

results := map[cipher.PubKey][]string{}

for k, pks := range transportsMap {
for _, pk := range pks {
rPK := cipher.PubKey{}
if err := rPK.Set(pk); err != nil {
c.log.WithError(err).Warn("unable to transform PK")
continue
}

// Two kinds of network, SUDPH and STCPR
if _, ok := results[rPK]; ok {
if len(results[rPK]) == 1 && k != results[rPK][0] {
results[rPK] = append(results[rPK], k)
}
} else {
nTypeSlice := make([]string, 0, 2)
nTypeSlice = append(nTypeSlice, k)
results[rPK] = nTypeSlice
}
}
}
return results, nil
}

func (c *httpClient) isReady() bool {
select {
case <-c.ready:
Expand Down
96 changes: 93 additions & 3 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,52 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err

arClient, err := addrresolver.NewHTTP(conf.AddressResolver, v.conf.PK, v.conf.SK, httpC, log)
if err != nil {
err := fmt.Errorf("failed to create address resolver client: %w", err)
err = fmt.Errorf("failed to create address resolver client: %w", err)
return err
}

// initialize cache for available transports
m, err := arClient.Transports(ctx)
if err != nil {
log.Warn("failed to fetch transports from AR")
return err
}

v.initLock.Lock()
v.arClient = arClient
v.transportsCache = m
v.initLock.Unlock()

doneCh := make(chan struct{}, 1)
t := time.NewTicker(1 * time.Hour)
go fetchARTransports(ctx, v, log, doneCh, t)
v.pushCloseStack("address_resolver", func() error {
doneCh <- struct{}{}
return nil
})

return nil
}

func fetchARTransports(ctx context.Context, v *Visor, log *logging.Logger, doneCh <-chan struct{}, tick *time.Ticker) {
for {
select {
case <-tick.C:
log.Debug("Fetching PKs from AR")
m, err := v.arClient.Transports(ctx)
if err != nil {
log.WithError(err).Warn("failed to fetch AR transport")
}
v.transportCacheMu.Lock()
v.transportsCache = m
v.transportCacheMu.Unlock()
case <-doneCh:
tick.Stop()
return
}
}
}

func initDiscovery(ctx context.Context, v *Visor, log *logging.Logger) error {
// Prepare app discovery factory.
factory := appdisc.Factory{
Expand Down Expand Up @@ -434,6 +471,57 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro
return nil
}

func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook {
retrier := dmsgnetutil.NewRetrier(log, time.Second, time.Second*20, 3, 1.3)
return []router.RouteSetupHook{
func(rPK cipher.PubKey, tm *transport.Manager) error {
dmsgFallback := func() error {
return retrier.Do(ctx, func() error {
_, err := tm.SaveTransport(ctx, rPK, network.DMSG, transport.LabelAutomatic)
return err
})
}
// check visor's AR transport cache
if v.transportsCache == nil && !v.conf.Transport.PublicAutoconnect {
// skips if there's no AR transports
log.Warn("empty AR transports cache")
return dmsgFallback()
}
transports, ok := v.transportsCache[rPK]
if !ok {
log.WithField("pk", rPK.String()).Warn("pk not found in the transports cache")
// check if automatic transport is available, if it does,
// continue with route creation
if v.conf.Transport.PublicAutoconnect {
// we return nil here, router will use multi-hop STCPR rather than one hop DMSG
return nil
}
return dmsgFallback()
}
// try to establish direct connection to rPK (single hop) using SUDPH or STCPR
errSlice := make([]error, 0, 2)
for _, trans := range transports {
ntype := network.Type(trans)
// skip if SUDPH is under symmetric NAT / under UDP firewall.
if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric || v.stunClient.NATType == stun.NATSymmetricUDPFirewall) {
continue
}
err := retrier.Do(ctx, func() error {
_, err := tm.SaveTransport(ctx, rPK, ntype, transport.LabelAutomatic)
return err
})
if err != nil {
errSlice = append(errSlice, err)
}
}
if len(errSlice) != 2 {
return nil
}
return errors.New(errSlice[0].Error())
},
}
}

func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error {
conf := v.conf.Routing

Expand All @@ -456,7 +544,9 @@ func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error {
MinHops: v.conf.Routing.MinHops,
}

r, err := router.New(v.dmsgC, &rConf)
routeSetupHooks := getRouteSetupHooks(ctx, v, log)

r, err := router.New(v.dmsgC, &rConf, routeSetupHooks)
if err != nil {
err := fmt.Errorf("failed to create router: %w", err)
return err
Expand Down Expand Up @@ -726,7 +816,7 @@ func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error {
logger.Warn("Failed to get STCPR port")
return nil
}
visorUpdater := v.serviceDisc.VisorUpdater(uint16(port))
visorUpdater := v.serviceDisc.VisorUpdater(port)
visorUpdater.Start()

v.log.Infof("Sent request to register visor as public")
Expand Down
Loading

0 comments on commit 3b89de7

Please sign in to comment.