Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced autoconnection #1010

Merged
merged 6 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ var (
ErrNoTransportFound = errors.New("no transport found")
)

// RouteSetupHook is an alias for a function that takes remote public key
// and a reference to transport manager in order to setup i.e:
// 1. If the remote is either available stcpr or sudph, establish the transport to the remote and then continue with the route creation process.
// 2. If neither of these direct transports is available, check if automatic transports are currently active. If they are continue with route creation.
// 3. If none of the first two checks was successful, establish a dmsg transport and then continue with route creation.
type RouteSetupHook func(cipher.PubKey, *transport.Manager) error

// Config configures Router.
type Config struct {
Logger *logging.Logger
Expand Down Expand Up @@ -143,24 +150,26 @@ type Router interface {
// communicating with setup nodes, forward packets according to local
// rules and manages route groups for apps.
type router struct {
mx sync.Mutex
conf *Config
logger *logging.Logger
sl *dmsg.Listener
dmsgC *dmsg.Client
trustedVisors map[cipher.PubKey]struct{}
tm *transport.Manager
rt routing.Table
rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports.
rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here
rpcSrv *rpc.Server
accept chan routing.EdgeRules
done chan struct{}
once sync.Once
mx sync.Mutex
conf *Config
logger *logging.Logger
sl *dmsg.Listener
dmsgC *dmsg.Client
trustedVisors map[cipher.PubKey]struct{}
tm *transport.Manager
rt routing.Table
rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports.
rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here
rpcSrv *rpc.Server
accept chan routing.EdgeRules
done chan struct{}
once sync.Once
routeSetupHookMu sync.Mutex
routeSetupHooks []RouteSetupHook // see RouteSetupHook description
}

// New constructs a new Router.
func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) (Router, error) {
config.SetDefaults()

sl, err := dmsgC.Listen(skyenv.DmsgAwaitSetupPort)
Expand All @@ -173,19 +182,24 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
trustedVisors[node] = struct{}{}
}

if routeSetupHooks == nil {
routeSetupHooks = []RouteSetupHook{}
}

r := &router{
conf: config,
logger: config.Logger,
tm: config.TransportManager,
rt: routing.NewTable(),
sl: sl,
dmsgC: dmsgC,
rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup),
rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup),
rpcSrv: rpc.NewServer(),
accept: make(chan routing.EdgeRules, acceptSize),
done: make(chan struct{}),
trustedVisors: trustedVisors,
conf: config,
logger: config.Logger,
tm: config.TransportManager,
rt: routing.NewTable(),
sl: sl,
dmsgC: dmsgC,
rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup),
rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup),
rpcSrv: rpc.NewServer(),
accept: make(chan routing.EdgeRules, acceptSize),
done: make(chan struct{}),
trustedVisors: trustedVisors,
routeSetupHooks: routeSetupHooks,
}

go r.rulesGCLoop()
Expand All @@ -197,6 +211,14 @@ func New(dmsgC *dmsg.Client, config *Config) (Router, error) {
return r, nil
}

// RegisterSetupHooks takes variadic RouteSetupHook to add to router's setup functions
// currently not in use
func (r *router) RegisterSetupHooks(rshooks ...RouteSetupHook) {
r.routeSetupHookMu.Lock()
r.routeSetupHooks = append(r.routeSetupHooks, rshooks...)
r.routeSetupHookMu.Unlock()
}

// DialRoutes dials to a given visor of 'rPK'.
// 'lPort'/'rPort' specifies the local/remote ports respectively.
// A nil 'opts' input results in a value of '1' for all DialOptions fields.
Expand All @@ -221,6 +243,15 @@ func (r *router) DialRoutes(
lPK := r.conf.PubKey
forwardDesc := routing.NewRouteDescriptor(lPK, rPK, lPort, rPort)

if len(r.routeSetupHooks) != 0 {
for _, rsf := range r.routeSetupHooks {
if err := rsf(rPK, r.tm); err != nil {
return nil, err
}
}
}

// check if transports are available
ok := r.checkIfTransportAvalailable()
if !ok {
return nil, ErrNoTransportFound
Expand Down Expand Up @@ -268,7 +299,7 @@ func (r *router) DialRoutes(
return nrg, nil
}

// AcceptsRoutes should block until we receive an AddRules packet from SetupNode
// AcceptRoutes should block until we receive an AddRules packet from SetupNode
// that contains ConsumeRule(s) or ForwardRule(s).
// Then the following should happen:
// - Save to routing.Table and internal RouteGroup map.
Expand Down Expand Up @@ -1042,7 +1073,7 @@ func (r *router) removeRouteGroupOfRule(rule routing.Rule) {
func (r *router) checkIfTransportAvalailable() (ok bool) {
r.tm.WalkTransports(func(tp *transport.ManagedTransport) bool {
ok = true
return true
return ok
jdknives marked this conversation as resolved.
Show resolved Hide resolved
})
return ok
}
14 changes: 12 additions & 2 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

Expand Down Expand Up @@ -77,17 +78,26 @@ func (a *autoconnector) Run(ctx context.Context) (err error) {
if !ok || val < maxFailedAddressRetryAttempt {
a.log.WithField("pk", pk).WithField("attempt", val).Debugln("Trying to add transport to public visor")
logger := a.log.WithField("pk", pk).WithField("type", string(network.STCPR))
if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
if err = a.tryEstablishTransport(ctx, pk, logger); err != nil {
logger.WithError(err).Warnln("Failed to add transport to public visor")
failedAddresses[pk]++
continue
}
logger.Infoln("Added transport to public visor")
}
}
}
}

// tryEstablish transport will try to establish transport to the remote pk via STCPR or SUDPH, if both failed, return error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In its currnet form it does not try to establish an sudph transport it seems.

func (a *autoconnector) tryEstablishTransport(ctx context.Context, pk cipher.PubKey, logger *logrus.Entry) error {
if _, err := a.tm.SaveTransport(ctx, pk, network.STCPR, transport.LabelAutomatic); err != nil {
return err
}

logger.Debugln("Added transport to public visor")
return nil
}

func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) {
retrier := netutil.NewRetrier(fetchServicesDelay, 5, 3, a.log)
var services []Service
Expand Down
55 changes: 55 additions & 0 deletions pkg/transport/network/addrresolver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var (
ErrNoEntry = errors.New("no entry for this PK")
// ErrNotReady is returned when address resolver is not ready.
ErrNotReady = errors.New("address resolver is not ready")
// ErrNoTransportsFound returned when no transports are found.
ErrNoTransportsFound = errors.New("failed to get response data from AR transports endpoint")
)

// Error is the object returned to the client when there's an error.
Expand All @@ -57,6 +59,7 @@ type APIClient interface {
BindSTCPR(ctx context.Context, port string) error
BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error)
Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error)
Transports(ctx context.Context) (map[cipher.PubKey][]string, error)
Close() error
}

Expand Down Expand Up @@ -377,6 +380,58 @@ func (c *httpClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey
return resolveResp, nil
}

// Transports query available transports.
func (c *httpClient) Transports(ctx context.Context) (map[cipher.PubKey][]string, error) {
resp, err := c.Get(ctx, "/transports")
if err != nil {
return nil, err
}
defer func() {
if err = resp.Body.Close(); err != nil {
c.log.WithError(err).Warn("Failed to close response body")
}
}()

if resp.StatusCode != http.StatusOK {
c.log.Warn(ErrNoTransportsFound.Error())
return nil, ErrNoTransportsFound
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

transportsMap := map[string][]string{}
if err = json.Unmarshal(body, &transportsMap); err != nil {
return nil, err
}

results := map[cipher.PubKey][]string{}

for k, pks := range transportsMap {
for _, pk := range pks {
rPK := cipher.PubKey{}
if err := rPK.Set(pk); err != nil {
c.log.WithError(err).Warn("unable to transform PK")
continue
}

// Two kinds of network, SUDPH and STCPR
if _, ok := results[rPK]; ok {
if len(results[rPK]) == 1 && k != results[rPK][0] {
results[rPK] = append(results[rPK], k)
}
} else {
nTypeSlice := make([]string, 0, 2)
nTypeSlice = append(nTypeSlice, k)
results[rPK] = nTypeSlice
}
}
}
return results, nil
}

func (c *httpClient) isReady() bool {
select {
case <-c.ready:
Expand Down
90 changes: 87 additions & 3 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,52 @@ func initAddressResolver(ctx context.Context, v *Visor, log *logging.Logger) err

arClient, err := addrresolver.NewHTTP(conf.AddressResolver, v.conf.PK, v.conf.SK, log)
if err != nil {
err := fmt.Errorf("failed to create address resolver client: %w", err)
err = fmt.Errorf("failed to create address resolver client: %w", err)
return err
}

// initialize cache for available transports
m, err := arClient.Transports(ctx)
if err != nil {
log.Warn("failed to fetch transports from AR")
return err
}

v.initLock.Lock()
v.arClient = arClient
v.transportsCache = m
v.initLock.Unlock()

doneCh := make(chan struct{}, 1)
t := time.NewTicker(1 * time.Hour)
go fetchARTransports(ctx, v, log, doneCh, t)
v.pushCloseStack("address_resolver", func() error {
doneCh <- struct{}{}
return nil
})

return nil
}

func fetchARTransports(ctx context.Context, v *Visor, log *logging.Logger, doneCh <-chan struct{}, tick *time.Ticker) {
for {
select {
case <-tick.C:
log.Debug("Fetching PKs from AR")
m, err := v.arClient.Transports(ctx)
if err != nil {
log.WithError(err).Warn("failed to fetch AR transport")
}
v.transportCacheMu.Lock()
v.transportsCache = m
v.transportCacheMu.Unlock()
case <-doneCh:
tick.Stop()
return
}
}
}

func initDiscovery(ctx context.Context, v *Visor, log *logging.Logger) error {
// Prepare app discovery factory.
factory := appdisc.Factory{
Expand Down Expand Up @@ -379,6 +416,51 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro
return nil
}

func getRouteSetupHooks(ctx context.Context, v *Visor, log *logging.Logger) []router.RouteSetupHook {
return []router.RouteSetupHook{
func(rPK cipher.PubKey, tm *transport.Manager) error {
dmsgFallback := func() error {
_, err := tm.SaveTransport(ctx, rPK, network.DMSG, transport.LabelAutomatic)
return err
}
// check visor's AR transport cache
if v.transportsCache == nil {
// skips if there's no AR transports
log.Warn("empty AR transports cache")
return dmsgFallback()
}
transports, ok := v.transportsCache[rPK]
if !ok {
log.WithField("pk", rPK.String()).Warn("pk not found in the transports cache")
// check if automatic transport is available, if it does,
// continue with route creation
if v.conf.Transport.PublicAutoconnect {
// we return nil here, if there's no transport available it wlll be checked
// by the router itself next.
return nil
}
return dmsgFallback()
}
// try to establish direct connection to rPK (single hop)
errSlice := make([]error, 0, 2)
for _, trans := range transports {
ntype := network.Type(trans)
// skip if SUDPH is under symmetric NAT / under UDP firewall.
if ntype == network.SUDPH && (v.stunClient.NATType == stun.NATSymmetric || v.stunClient.NATType == stun.NATSymmetricUDPFirewall) {
continue
}
if _, err := tm.SaveTransport(ctx, rPK, ntype, transport.LabelAutomatic); err != nil {
errSlice = append(errSlice, err)
}
}
if len(errSlice) != 2 {
return nil
}
return errors.New(errSlice[0].Error())
},
}
}

func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error {
conf := v.conf.Routing
rfClient := rfclient.NewHTTP(conf.RouteFinder, time.Duration(conf.RouteFinderTimeout))
Expand All @@ -395,7 +477,9 @@ func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error {
MinHops: v.conf.Routing.MinHops,
}

r, err := router.New(v.dmsgC, &rConf)
routeSetupHooks := getRouteSetupHooks(ctx, v, log)

r, err := router.New(v.dmsgC, &rConf, routeSetupHooks)
if err != nil {
err := fmt.Errorf("failed to create router: %w", err)
return err
Expand Down Expand Up @@ -655,7 +739,7 @@ func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error {
logger.Warn("Failed to get STCPR port")
return nil
}
visorUpdater := v.serviceDisc.VisorUpdater(uint16(port))
visorUpdater := v.serviceDisc.VisorUpdater(port)
visorUpdater.Start()

v.log.Infof("Sent request to register visor as public")
Expand Down
Loading