diff --git a/internal/utclient/client.go b/internal/utclient/client.go index 1cb27348f..5c6c1ef88 100644 --- a/internal/utclient/client.go +++ b/internal/utclient/client.go @@ -12,10 +12,13 @@ import ( "net/http" "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/internal/httpauth" ) +var log = logging.MustGetLogger("utclient") + // Error is the object returned to the client when there's an error. type Error struct { Error string `json:"error"` @@ -61,10 +64,16 @@ func (c *httpClient) Get(ctx context.Context, path string) (*http.Response, erro // UpdateNodeUptime updates node uptime. func (c *httpClient) UpdateNodeUptime(ctx context.Context) error { resp, err := c.Get(ctx, "/update") + if resp != nil { + defer func() { + if err := resp.Body.Close(); err != nil { + log.WithError(err).Warn("Failed to close response body") + } + }() + } if err != nil { return err } - defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("status: %d, error: %v", resp.StatusCode, extractError(resp.Body)) diff --git a/internal/utclient/client_test.go b/internal/utclient/client_test.go index f3e16e337..e72afa6b4 100644 --- a/internal/utclient/client_test.go +++ b/internal/utclient/client_test.go @@ -30,7 +30,9 @@ func TestClientAuth(t *testing.T) { headerCh <- r.Header case fmt.Sprintf("/security/nonces/%s", testPubKey): - fmt.Fprintf(w, `{"edge": "%s", "next_nonce": 1}`, testPubKey) + if _, err := fmt.Fprintf(w, `{"edge": "%s", "next_nonce": 1}`, testPubKey); err != nil { + t.Errorf("Failed to write nonce response: %s", err) + } default: t.Errorf("Don't know how to handle URL = '%s'", url) @@ -75,7 +77,9 @@ func authHandler(next http.Handler) http.Handler { m := http.NewServeMux() m.Handle("/security/nonces/", http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { - json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: testPubKey, NextNonce: 1}) // nolint: errcheck + if err := json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: testPubKey, NextNonce: 1}); err != nil { + log.WithError(err).Error("Failed to encode nonce response") + } }, )) m.Handle("/", next) diff --git a/pkg/router/route_manager.go b/pkg/router/route_manager.go index fea59389c..fb927519f 100644 --- a/pkg/router/route_manager.go +++ b/pkg/router/route_manager.go @@ -17,6 +17,7 @@ import ( "github.com/skycoin/skywire/pkg/snet" ) +// RMConfig represents route manager configuration. type RMConfig struct { SetupPKs []cipher.PubKey // Trusted setup PKs. GarbageCollectDuration time.Duration @@ -24,6 +25,7 @@ type RMConfig struct { OnLoopClosed func(loop routing.Loop) error } +// SetupIsTrusted checks if setup node is trusted. func (sc RMConfig) SetupIsTrusted(sPK cipher.PubKey) bool { for _, pk := range sc.SetupPKs { if sPK == pk { @@ -33,6 +35,7 @@ func (sc RMConfig) SetupIsTrusted(sPK cipher.PubKey) bool { return false } +// routeManager represents route manager. type routeManager struct { Logger *logging.Logger conf RMConfig @@ -42,8 +45,8 @@ type routeManager struct { done chan struct{} } -// NewRouteManager creates a new route manager. -func NewRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*routeManager, error) { +// newRouteManager creates a new route manager. +func newRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*routeManager, error) { sl, err := n.Listen(snet.DmsgType, snet.AwaitSetupPort) if err != nil { return nil, err @@ -58,11 +61,13 @@ func NewRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*route }, nil } +// Close closes route manager. func (rm *routeManager) Close() error { close(rm.done) return rm.sl.Close() } +// Serve initiates serving connections by route manager. func (rm *routeManager) Serve() { // Routing table garbage collect loop. go rm.rtGarbageCollectLoop() @@ -97,7 +102,11 @@ func (rm *routeManager) serveConn() error { } func (rm *routeManager) handleSetupConn(conn net.Conn) error { - defer func() { _ = conn.Close() }() //nolint:errcheck + defer func() { + if err := conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } + }() proto := setup.NewSetupProtocol(conn) t, body, err := proto.ReadPacket() @@ -148,7 +157,7 @@ func (rm *routeManager) rtGarbageCollectLoop() { } } -func (rm *routeManager) dialSetupConn(ctx context.Context) (*snet.Conn, error) { +func (rm *routeManager) dialSetupConn(_ context.Context) (*snet.Conn, error) { for _, sPK := range rm.conf.SetupPKs { conn, err := rm.n.Dial(snet.DmsgType, sPK, snet.SetupPort) if err != nil { @@ -160,6 +169,7 @@ func (rm *routeManager) dialSetupConn(ctx context.Context) (*snet.Conn, error) { return nil, errors.New("failed to dial to a setup node") } +// GetRule gets routing rule. func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) { rule, err := rm.rt.Rule(routeID) if err != nil { @@ -183,6 +193,7 @@ func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) { return rule, nil } +// RemoveLoopRule removes loop rule. func (rm *routeManager) RemoveLoopRule(loop routing.Loop) error { var appRouteID routing.RouteID var appRule routing.Rule diff --git a/pkg/router/route_manager_test.go b/pkg/router/route_manager_test.go index bd371ad47..f8624f3ea 100644 --- a/pkg/router/route_manager_test.go +++ b/pkg/router/route_manager_test.go @@ -25,7 +25,7 @@ func TestNewRouteManager(t *testing.T) { rt := routing.InMemoryRoutingTable() - rm, err := NewRouteManager(env.Nets[0], rt, RMConfig{}) + rm, err := newRouteManager(env.Nets[0], rt, RMConfig{}) require.NoError(t, err) defer func() { require.NoError(t, rm.Close()) }() diff --git a/pkg/router/router.go b/pkg/router/router.go index 57f96d7c5..4cd89fb43 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -89,7 +89,7 @@ func New(n *snet.Network, config *Config) (*Router, error) { } // Prepare route manager. - rm, err := NewRouteManager(n, config.RoutingTable, RMConfig{ + rm, err := newRouteManager(n, config.RoutingTable, RMConfig{ SetupPKs: config.SetupNodes, GarbageCollectDuration: config.GarbageCollectDuration, OnConfirmLoop: r.confirmLoop, @@ -428,6 +428,7 @@ fetchRoutesAgain: return fwdRoutes[0], revRoutes[0], nil } +// SetupIsTrusted checks if setup node is trusted. func (r *Router) SetupIsTrusted(sPK cipher.PubKey) bool { return r.rm.conf.SetupIsTrusted(sPK) } diff --git a/pkg/setup/node.go b/pkg/setup/node.go index e2a6c0aa6..caa566a4a 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -304,6 +304,7 @@ func (sn *Node) createRoute(ctx context.Context, expireAt time.Time, route routi rulesSetupErr = err } } + cancelOnce.Do(cancel) // close chan to avoid leaks close(rulesSetupErrs) diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index cc2cf1af8..f1ce282d0 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -105,7 +105,11 @@ func TestMain(m *testing.M) { dmsgL: listener, metrics: metrics.NewDummy(), } - go func() { _ = sn.Serve(context.TODO()) }() //nolint:errcheck + go func() { + if err := sn.Serve(context.TODO()); err != nil { + sn.Logger.WithError(err).Error("Failed to serve") + } + }() return sn, func() { require.NoError(t, sn.Close()) } @@ -208,7 +212,8 @@ func TestMain(m *testing.M) { } // TODO: This error is not checked due to a bug in dmsg. - _ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck + err = proto.WritePacket(RespSuccess, nil) + _ = err fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) @@ -242,7 +247,8 @@ func TestMain(m *testing.M) { } // TODO: This error is not checked due to a bug in dmsg. - _ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck + err = proto.WritePacket(RespSuccess, nil) + _ = err require.NoError(t, tp.Close()) } @@ -331,7 +337,8 @@ func TestMain(m *testing.M) { require.Equal(t, ld.Loop.Local, d.Loop.Remote) // TODO: This error is not checked due to a bug in dmsg. - _ = proto.WritePacket(RespSuccess, nil) //nolint:errcheck + err = proto.WritePacket(RespSuccess, nil) + _ = err }) }*/ diff --git a/pkg/snet/network.go b/pkg/snet/network.go index 7d81c1b39..342ff6bf5 100644 --- a/pkg/snet/network.go +++ b/pkg/snet/network.go @@ -29,9 +29,11 @@ const ( ) var ( + // ErrUnknownNetwork occurs on attempt to dial an unknown network type. ErrUnknownNetwork = errors.New("unknown network type") ) +// Config represents a network configuration. type Config struct { PubKey cipher.PubKey SecKey cipher.SecKey @@ -41,12 +43,13 @@ type Config struct { DmsgMinSrvs int } -// Network represents +// Network represents a network between nodes in Skywire. type Network struct { conf Config dmsgC *dmsg.Client } +// New creates a network from a config. func New(conf Config) *Network { dmsgC := dmsg.NewClient(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.DmsgDiscAddr), dmsg.SetLogger(logging.MustGetLogger("snet.dmsgC"))) return &Network{ @@ -55,6 +58,7 @@ func New(conf Config) *Network { } } +// NewRaw creates a network from a config and a dmsg client. func NewRaw(conf Config, dmsgC *dmsg.Client) *Network { return &Network{ conf: conf, @@ -62,6 +66,7 @@ func NewRaw(conf Config, dmsgC *dmsg.Client) *Network { } } +// Init initiates server connections. func (n *Network) Init(ctx context.Context) error { fmt.Println("dmsg: min_servers:", n.conf.DmsgMinSrvs) if err := n.dmsgC.InitiateServerConnections(ctx, n.conf.DmsgMinSrvs); err != nil { @@ -70,6 +75,7 @@ func (n *Network) Init(ctx context.Context) error { return nil } +// Close closes underlying connections. func (n *Network) Close() error { wg := new(sync.WaitGroup) wg.Add(1) @@ -87,15 +93,19 @@ func (n *Network) Close() error { return nil } +// LocalPK returns local public key. func (n *Network) LocalPK() cipher.PubKey { return n.conf.PubKey } +// LocalSK returns local secure key. func (n *Network) LocalSK() cipher.SecKey { return n.conf.SecKey } // TransportNetworks returns network types that are used for transports. func (n *Network) TransportNetworks() []string { return n.conf.TpNetworks } +// Dmsg returns underlying dmsg client. func (n *Network) Dmsg() *dmsg.Client { return n.dmsgC } +// Dial dials a node by its public key and returns a connection. func (n *Network) Dial(network string, pk cipher.PubKey, port uint16) (*Conn, error) { ctx := context.Background() switch network { @@ -110,6 +120,7 @@ func (n *Network) Dial(network string, pk cipher.PubKey, port uint16) (*Conn, er } } +// Listen listens on the specified port. func (n *Network) Listen(network string, port uint16) (*Listener, error) { switch network { case DmsgType: @@ -123,6 +134,7 @@ func (n *Network) Listen(network string, port uint16) (*Listener, error) { } } +// Listener represents a listener. type Listener struct { net.Listener lPK cipher.PubKey @@ -135,10 +147,16 @@ func makeListener(l net.Listener, network string) *Listener { return &Listener{Listener: l, lPK: lPK, lPort: lPort, network: network} } +// LocalPK returns a local public key of listener. func (l Listener) LocalPK() cipher.PubKey { return l.lPK } -func (l Listener) LocalPort() uint16 { return l.lPort } -func (l Listener) Network() string { return l.network } +// LocalPort returns a local port of listener. +func (l Listener) LocalPort() uint16 { return l.lPort } + +// Network returns a network of listener. +func (l Listener) Network() string { return l.network } + +// AcceptConn accepts a connection from listener. func (l Listener) AcceptConn() (*Conn, error) { conn, err := l.Listener.Accept() if err != nil { @@ -147,6 +165,7 @@ func (l Listener) AcceptConn() (*Conn, error) { return makeConn(conn, l.network), nil } +// Conn represent a connection between nodes in Skywire. type Conn struct { net.Conn lPK cipher.PubKey @@ -162,11 +181,20 @@ func makeConn(conn net.Conn, network string) *Conn { return &Conn{Conn: conn, lPK: lPK, rPK: rPK, lPort: lPort, rPort: rPort, network: network} } -func (c Conn) LocalPK() cipher.PubKey { return c.lPK } +// LocalPK returns local public key of connection. +func (c Conn) LocalPK() cipher.PubKey { return c.lPK } + +// RemotePK returns remote public key of connection. func (c Conn) RemotePK() cipher.PubKey { return c.rPK } -func (c Conn) LocalPort() uint16 { return c.lPort } -func (c Conn) RemotePort() uint16 { return c.rPort } -func (c Conn) Network() string { return c.network } + +// LocalPort returns local port of connection. +func (c Conn) LocalPort() uint16 { return c.lPort } + +// RemotePort returns remote port of connection. +func (c Conn) RemotePort() uint16 { return c.rPort } + +// Network returns network of connection. +func (c Conn) Network() string { return c.network } func disassembleAddr(addr net.Addr) (pk cipher.PubKey, port uint16) { strs := strings.Split(addr.String(), ":") diff --git a/pkg/snet/snettest/env.go b/pkg/snet/snettest/env.go index 2a62af4d0..590441415 100644 --- a/pkg/snet/snettest/env.go +++ b/pkg/snet/snettest/env.go @@ -87,7 +87,7 @@ func NewEnv(t *testing.T, keys []KeyPair) *Env { } } -// TearDown shutdowns the Env. +// Teardown shutdowns the Env. func (e *Env) Teardown() { e.teardown() } func createDmsgSrv(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { diff --git a/pkg/therealssh/session.go b/pkg/therealssh/session.go index 5c2daf1bb..9b221b3cc 100644 --- a/pkg/therealssh/session.go +++ b/pkg/therealssh/session.go @@ -119,7 +119,8 @@ func (s *Session) Run(command string) ([]byte, error) { }() // Best effort. // as stated in https://github.com/creack/pty/issues/21#issuecomment-513069505 we can ignore this error - res, _ := ioutil.ReadAll(ptmx) // nolint: err + res, err := ioutil.ReadAll(ptmx) + _ = err return res, nil } diff --git a/pkg/transport/handshake.go b/pkg/transport/handshake.go index abfba66a6..7ec41237d 100644 --- a/pkg/transport/handshake.go +++ b/pkg/transport/handshake.go @@ -81,12 +81,15 @@ func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn *snet.Co // MakeSettlementHS creates a settlement handshake. // `init` determines whether the local side is initiating or responding. func MakeSettlementHS(init bool) SettlementHS { - // initiating logic. initHS := func(ctx context.Context, dc DiscoveryClient, conn *snet.Conn, sk cipher.SecKey) (err error) { entry := makeEntryFromTpConn(conn) - defer func() { _, _ = dc.UpdateStatuses(ctx, &Status{ID: entry.ID, IsUp: err == nil}) }() //nolint:errcheck + defer func() { + if _, err := dc.UpdateStatuses(ctx, &Status{ID: entry.ID, IsUp: err == nil}); err != nil { + log.WithError(err).Error("Failed to update statuses") + } + }() // create signed entry and send it to responding visor node. se, ok := NewSignedEntry(&entry, conn.LocalPK(), sk) @@ -123,7 +126,9 @@ func MakeSettlementHS(init bool) SettlementHS { entry = *recvSE.Entry // Ensure transport is registered. - _ = dc.RegisterTransports(ctx, recvSE) //nolint:errcheck + if err := dc.RegisterTransports(ctx, recvSE); err != nil { + log.WithError(err).Error("Failed to register transports") + } // inform initiating visor node. if _, err := conn.Write([]byte{1}); err != nil { diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 8bccebdc4..a4f7470e1 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -100,7 +100,9 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru mt.connMx.Lock() close(mt.connCh) if mt.conn != nil { - _ = mt.conn.Close() //nolint:errcheck + if err := mt.conn.Close(); err != nil { + mt.log.WithError(err).Warn("Failed to close connection") + } mt.conn = nil } mt.connMx.Unlock() @@ -193,7 +195,9 @@ func (mt *ManagedTransport) Accept(ctx context.Context, conn *snet.Conn) error { } if !mt.isServing() { - _ = conn.Close() //nolint:errcheck + if err := conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } return ErrNotServing } @@ -248,7 +252,9 @@ func (mt *ManagedTransport) getConn() *snet.Conn { // TODO: Add logging here. func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn *snet.Conn) error { if mt.conn != nil { - _ = conn.Close() //nolint:errcheck + if err := conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } return ErrConnAlreadyExists } @@ -272,7 +278,9 @@ func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn *snet.Conn) e func (mt *ManagedTransport) clearConn(ctx context.Context) { if mt.conn != nil { - _ = mt.conn.Close() //nolint:errcheck + if err := mt.conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } mt.conn = nil } if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: false}); err != nil { diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index a4e390184..f44a6344f 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -81,7 +81,7 @@ func (tm *Manager) serve(ctx context.Context) { listeners = append(listeners, lis) tm.wg.Add(1) - go func(netName string) { + go func() { defer tm.wg.Done() for { select { @@ -98,7 +98,7 @@ func (tm *Manager) serve(ctx context.Context) { } } } - }(netType) + }() } tm.Logger.Info("transport manager is serving.") @@ -116,25 +116,26 @@ func (tm *Manager) serve(ctx context.Context) { } } -func (tm *Manager) initTransports(ctx context.Context) { - tm.mx.Lock() - defer tm.mx.Unlock() - - entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey) - if err != nil { - log.Warnf("No transports found for local node: %v", err) - } - for _, entry := range entries { - var ( - tpType = entry.Entry.Type - remote = entry.Entry.RemoteEdge(tm.conf.PubKey) - tpID = entry.Entry.ID - ) - if _, err := tm.saveTransport(remote, tpType); err != nil { - tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID) - } - } -} +// TODO(nkryuchkov): either use or remove if unused +// func (tm *Manager) initTransports(ctx context.Context) { +// tm.mx.Lock() +// defer tm.mx.Unlock() +// +// entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey) +// if err != nil { +// log.Warnf("No transports found for local node: %v", err) +// } +// for _, entry := range entries { +// var ( +// tpType = entry.Entry.Type +// remote = entry.Entry.RemoteEdge(tm.conf.PubKey) +// tpID = entry.Entry.ID +// ) +// if _, err := tm.saveTransport(remote, tpType); err != nil { +// tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID) +// } +// } +// } func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) error { conn, err := lis.AcceptConn() @@ -268,16 +269,16 @@ func (tm *Manager) Local() cipher.PubKey { } // Close closes opened transports and registered factories. -func (tm *Manager) Close() (err error) { +func (tm *Manager) Close() error { tm.closeOnce.Do(func() { - err = tm.close() + tm.close() }) - return err + return nil } -func (tm *Manager) close() error { +func (tm *Manager) close() { if tm == nil { - return nil + return } tm.mx.Lock() @@ -297,7 +298,6 @@ func (tm *Manager) close() error { tm.wg.Wait() close(tm.readCh) - return nil } func (tm *Manager) isClosing() bool { diff --git a/pkg/visor/config.go b/pkg/visor/config.go index fdbfef03f..37831ed53 100644 --- a/pkg/visor/config.go +++ b/pkg/visor/config.go @@ -161,12 +161,13 @@ func ensureDir(path string) (string, error) { return absPath, nil } -// HypervisorConfig represents a connection to a hypervisor. +// HypervisorConfig represents hypervisor configuration. type HypervisorConfig struct { PubKey cipher.PubKey `json:"public_key"` Addr string `json:"address"` } +// DmsgConfig represents dmsg configuration. type DmsgConfig struct { PubKey cipher.PubKey SecKey cipher.SecKey