Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automate trusted visor functionality #434

Merged
merged 5 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/mmcloughlin/avo v0.0.0-20200523190732-4439b6b2c061 // indirect
github.com/pkg/profile v1.5.0
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/common v0.10.0 // indirect
github.com/rakyll/statik v0.1.7
github.com/schollz/progressbar/v2 v2.15.0
github.com/shirou/gopsutil v2.20.5+incompatible
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ github.com/SkycoinProject/yamux v0.0.0-20191213015001-a36efeefbf6a/go.mod h1:IaE
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andybalholm/brotli v0.0.0-20190621154722-5f990b63d2d6 h1:bZ28Hqta7TFAK3Q08CMvv8y3/8ATaEqv2nGoc6yff6c=
github.com/andybalholm/brotli v0.0.0-20190621154722-5f990b63d2d6/go.mod h1:+lx6/Aqd1kLJ1GQfkvOnaZ1WGmLpMpbprPuIOOZX30U=
Expand Down Expand Up @@ -362,6 +364,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
28 changes: 28 additions & 0 deletions internal/netutil/netutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package netutil

import "net"

// LocalAddresses returns a list of all local addresses
func LocalAddresses() ([]string, error) {
result := make([]string, 0)

addresses, err := net.InterfaceAddrs()
if err != nil {
return nil, err
}

for _, addr := range addresses {
switch v := addr.(type) {
case *net.IPNet:
if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() {
result = append(result, v.IP.String())
}
case *net.IPAddr:
if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() {
result = append(result, v.IP.String())
}
}
}

return result, nil
}
25 changes: 23 additions & 2 deletions pkg/app/appdisc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,29 @@ func (f *Factory) setDefaults() {
}
}

// Updater obtains an updater based on the app name and configuration.
func (f *Factory) Updater(conf appcommon.ProcConfig) (Updater, bool) {
// VisorUpdater obtains a visor updater.
func (f *Factory) VisorUpdater(port uint16) Updater {
// Always return empty updater if keys are not set.
if f.setDefaults(); f.PK.Null() || f.SK.Null() {
return &emptyUpdater{}
}

conf := servicedisc.Config{
Type: servicedisc.ServiceTypeVisor,
PK: f.PK,
SK: f.SK,
Port: port,
DiscAddr: f.ProxyDisc,
}

return &serviceUpdater{
client: servicedisc.NewClient(f.Log, conf),
interval: f.UpdateInterval,
}
}

// AppUpdater obtains an app updater based on the app name and configuration.
func (f *Factory) AppUpdater(conf appcommon.ProcConfig) (Updater, bool) {
// Always return empty updater if keys are not set.
if f.setDefaults(); f.PK.Null() || f.SK.Null() {
return &emptyUpdater{}, false
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (m *procManager) Start(conf appcommon.ProcConfig) (appcommon.ProcID, error)
break
}

disc, ok := m.discF.Updater(conf)
disc, ok := m.discF.AppUpdater(conf)
if !ok {
log.WithField("appName", conf.AppName).
Debug("No app discovery associated with app.")
Expand Down
22 changes: 21 additions & 1 deletion pkg/servicedisc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -14,6 +16,11 @@ import (
"github.com/SkycoinProject/skywire-mainnet/internal/httpauth"
)

var (
// ErrVisorUnreachable is returned when visor is unreachable.
ErrVisorUnreachable = errors.New("visor is unreachable")
)

// Config configures the HTTPClient.
type Config struct {
Type string
Expand All @@ -35,12 +42,17 @@ type HTTPClient struct {

// NewClient creates a new HTTPClient.
func NewClient(log logrus.FieldLogger, conf Config) *HTTPClient {
var stats *Stats
if conf.Type != ServiceTypeVisor {
stats = &Stats{ConnectedClients: 0}
}

return &HTTPClient{
log: log,
conf: conf,
entry: Service{
Addr: NewSWAddr(conf.PK, conf.Port),
Stats: &Stats{ConnectedClients: 0},
Stats: stats,
Type: conf.Type,
},
client: http.Client{},
Expand All @@ -62,10 +74,12 @@ func (c *HTTPClient) Auth(ctx context.Context) (*httpauth.Client, error) {
if c.auth != nil {
return c.auth, nil
}

auth, err := httpauth.NewClient(ctx, c.conf.DiscAddr, c.conf.PK, c.conf.SK)
if err != nil {
return nil, err
}

c.auth = auth
return auth, nil
}
Expand All @@ -81,6 +95,7 @@ func (c *HTTPClient) Services(ctx context.Context) (out []Service, err error) {
if err != nil {
return nil, err
}

if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
Expand Down Expand Up @@ -187,6 +202,11 @@ func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duratio
c.entryMx.Unlock()

if err != nil {
if strings.Contains(err.Error(), ErrVisorUnreachable.Error()) {
c.log.Errorf("Unable to register visor as public trusted as it's unreachable from WAN")
return
}

c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...")
time.Sleep(time.Second * 10) // TODO(evanlinjin): Exponential backoff.
continue
Expand Down
2 changes: 2 additions & 0 deletions pkg/servicedisc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
ServiceTypeProxy = "proxy"
// ServiceTypeVPN stands for the VPN discovery.
ServiceTypeVPN = "vpn"
// ServiceTypeVisor stands for visor.
ServiceTypeVisor = "visor"
)

// Errors associated with service discovery types.
Expand Down
33 changes: 5 additions & 28 deletions pkg/snet/arclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/AudriusButkevicius/pfilter"
"github.com/SkycoinProject/dmsg"
"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/dmsg/netutil"
dmsgnetutil "github.com/SkycoinProject/dmsg/netutil"
"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/xtaci/kcp-go"

"github.com/SkycoinProject/skywire-mainnet/internal/httpauth"
"github.com/SkycoinProject/skywire-mainnet/internal/netutil"
"github.com/SkycoinProject/skywire-mainnet/internal/packetfilter"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/directtp/tpconn"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/directtp/tphandshake"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (c *httpClient) initHTTPClient() {
Warnf("Failed to connect to address resolver. STCPR/SUDPH services are temporarily unavailable. Retrying...")

retryLog := logging.MustGetLogger("snet.arclient.retrier")
retry := netutil.NewRetrier(retryLog, 1*time.Second, 10*time.Second, 0, 1)
retry := dmsgnetutil.NewRetrier(retryLog, 1*time.Second, 10*time.Second, 0, 1)

err := retry.Do(context.Background(), func() error {
httpAuthClient, err = httpauth.NewClient(context.Background(), c.remoteHTTPAddr, c.pk, c.sk)
Expand Down Expand Up @@ -193,7 +194,7 @@ func (c *httpClient) BindSTCPR(ctx context.Context, port string) error {
c.log.Infof("BindSTCPR: Address resolver became ready, binding")
}

addresses, err := localAddresses()
addresses, err := netutil.LocalAddresses()
if err != nil {
return err
}
Expand Down Expand Up @@ -247,7 +248,7 @@ func (c *httpClient) BindSUDPH(filter *pfilter.PacketFilter) (<-chan RemoteVisor
return nil, err
}

addresses, err := localAddresses()
addresses, err := netutil.LocalAddresses()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -469,27 +470,3 @@ func extractError(r io.Reader) error {

return errors.New(apiError.Error)
}

func localAddresses() ([]string, error) {
result := make([]string, 0)

addresses, err := net.InterfaceAddrs()
if err != nil {
return nil, err
}

for _, addr := range addresses {
switch v := addr.(type) {
case *net.IPNet:
if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() {
result = append(result, v.IP.String())
}
case *net.IPAddr:
if v.IP.IsGlobalUnicast() || v.IP.IsLoopback() {
result = append(result, v.IP.String())
}
}
}

return result, nil
}
33 changes: 33 additions & 0 deletions pkg/snet/directtp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -44,6 +45,9 @@ var (
// ErrAlreadyListening is returned when transport is already listening.
ErrAlreadyListening = errors.New("already listening")

// ErrNotListening is returned when transport is not listening.
ErrNotListening = errors.New("not listening")

// ErrPortOccupied is returned when port is occupied.
ErrPortOccupied = errors.New("port is already occupied")
)
Expand All @@ -52,6 +56,7 @@ var (
type Client interface {
Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (*tpconn.Conn, error)
Listen(lPort uint16) (*tplistener.Listener, error)
LocalAddr() (net.Addr, error)
Serve() error
Close() error
Type() string
Expand All @@ -75,6 +80,7 @@ type client struct {
log *logging.Logger
porter *porter.Porter
listener net.Listener
listening chan struct{}
listeners map[uint16]*tplistener.Listener // key: lPort
sudphPacketFilter *pfilter.PacketFilter
sudphListener net.PacketConn
Expand All @@ -89,6 +95,7 @@ func NewClient(conf Config) Client {
porter: porter.New(porter.MinEphemeral),
listeners: make(map[uint16]*tplistener.Listener),
done: make(chan struct{}),
listening: make(chan struct{}),
}
}

Expand All @@ -113,6 +120,7 @@ func (c *client) Serve() error {
}

c.listener = l
close(c.listening)

if c.conf.Type == tptypes.STCPR {
localAddr := c.listener.Addr().String()
Expand All @@ -132,6 +140,10 @@ func (c *client) Serve() error {

for {
if err := c.acceptConn(); err != nil {
if strings.Contains(err.Error(), io.EOF.Error()) {
continue // likely it's a dummy connection from service discovery
}

c.log.Warnf("failed to accept incoming connection: %v", err)

if !tphandshake.IsHandshakeError(err) {
Expand All @@ -145,6 +157,27 @@ func (c *client) Serve() error {
return nil
}

func (c *client) LocalAddr() (net.Addr, error) {
<-c.listening

switch c.conf.Type {
case tptypes.STCP, tptypes.STCPR:
if c.listener == nil {
return nil, ErrNotListening
}

return c.listener.Addr(), nil
case tptypes.SUDPH:
if c.sudphListener == nil {
return nil, ErrNotListening
}

return c.listener.Addr(), nil
}

return nil, ErrUnknownTransportType
}

func (c *client) acceptConn() error {
if c.isClosed() {
return io.ErrClosedPipe
Expand Down
Loading