From b803398c5a41a6d6e20eccc8785c5788fa968db9 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 29 Jun 2022 13:50:32 +0530 Subject: [PATCH 01/25] Fix spellings --- pkg/visor/init.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index bcb77aca35..4d2f98dab1 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -89,7 +89,7 @@ var ( trs vinit.Module // Routing system rt vinit.Module - // Application launcer + // Application launcher launch vinit.Module // CLI cli vinit.Module @@ -458,7 +458,7 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) error { ctx, cancel := context.WithCancel(ctx) - // To remove the block set by NewTransportListener if dmsg is not initilized + // To remove the block set by NewTransportListener if dmsg is not initialized go func() { ts, err := ts.NewTransportListener(ctx, v.conf, v.dmsgC, v.tpM, v.MasterLogger()) if err != nil { @@ -472,7 +472,7 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro } }() - // waiting for atleast one transport to initilize + // waiting for at least one transport to initialize <-v.tpM.Ready() v.pushCloseStack("transport_setup.rpc", func() error { From 891f3aa55d7c84f1a6e11d7b5ece11b6ce58b1f8 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 30 Jun 2022 13:04:54 +0530 Subject: [PATCH 02/25] Add ErrorPacket This commit adds a new packet called ErrorPacket to the router. This packet is supposed to be used to send errors in it's payload. --- pkg/router/route_group.go | 18 +++++++++-- pkg/router/router.go | 61 ++++++++++++++++++++++++++++++++++++++ pkg/routing/packet.go | 24 +++++++++++++-- pkg/routing/packet_test.go | 12 ++++++++ 4 files changed, 111 insertions(+), 4 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 74278a0d5a..d5f8e3380b 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -601,6 +601,8 @@ func (rg *RouteGroup) handlePacket(packet routing.Packet) error { close(rg.handshakeProcessed) }) + case routing.ErrorPacket: + return rg.handleErrorPacket(packet) } return nil @@ -616,8 +618,8 @@ func (rg *RouteGroup) handleNetworkProbePacket(packet routing.Packet) error { sentAt := time.Unix(int64(sentAtMs/1000), int64(ms)*int64(time.Millisecond)).UTC() latency := time.Now().UTC().Sub(sentAt).Milliseconds() - // todo (ersonp): this is a dirty fix, we nned to implement new packets Ping and Pong to calculate the RTT. - // if larency is negative we set it to be the previous one + // todo (ersonp): this is a dirty fix, we need to implement new packets Ping and Pong to calculate the RTT. + // if latency is negative we set it to be the previous one if math.Signbit(float64(latency)) { latency = int64(rg.networkStats.Latency()) } @@ -649,6 +651,18 @@ func (rg *RouteGroup) handleDataPacket(packet routing.Packet) error { return nil } +func (rg *RouteGroup) handleErrorPacket(packet routing.Packet) error { + + // in this case remote is already closed, and `readCh` is closed too, + // but some packets may still reach the rg causing panic on writing + // to `readCh`, so we simple omit such packets + if rg.isRemoteClosed() { + return nil + } + rg.logger.Error(string(packet.Payload())) + return nil +} + func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { rg.logger.Debugf("Got close packet with code %d", code) diff --git a/pkg/router/router.go b/pkg/router/router.go index a78422e2a1..6470a90f89 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -532,6 +532,8 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe return r.handleKeepAlivePacket(ctx, packet) case routing.NetworkProbePacket: return r.handleNetworkProbePacket(ctx, packet) + case routing.ErrorPacket: + return r.handleErrorPacket(ctx, packet) default: return ErrUnknownPacketType } @@ -733,6 +735,58 @@ func (r *router) handleKeepAlivePacket(ctx context.Context, packet routing.Packe return nil } +func (r *router) handleErrorPacket(ctx context.Context, packet routing.Packet) error { + rule, err := r.GetRule(packet.RouteID()) + if err != nil { + return err + } + log := r.logger.WithField("func", "router.handleErrorPacket") + if rt := rule.Type(); rt == routing.RuleForward || rt == routing.RuleIntermediary { + log.Tracef("Handling packet of type %s with route ID %d and next ID %d", packet.Type(), + packet.RouteID(), rule.NextRouteID()) + return r.forwardPacket(ctx, packet, rule) + } + + log.Tracef("Handling packet of type %s with route ID %d", packet.Type(), packet.RouteID()) + + desc := rule.RouteDescriptor() + nrg, ok := r.noiseRouteGroup(desc) + + log.Tracef("Handling packet with descriptor %s", &desc) + + if ok { + if nrg == nil { + return errors.New("noiseRouteGroup is nil") + } + + // in this case we have already initialized nrg and may use it straightforward + log.Tracef("Got new remote packet with size %d and route ID %d. Using rule: %s", + len(packet.Payload()), packet.RouteID(), rule) + + return nrg.handlePacket(packet) + } + + // we don't have nrg for this packet. it's either handshake message or + // we don't have route for this one completely + + rg, ok := r.initializingRouteGroup(desc) + if !ok { + // no route, just return error + log.Tracef("Descriptor not found for rule with type %s, descriptor: %s", rule.Type(), &desc) + return errors.New("route descriptor does not exist") + } + + if rg == nil { + return errors.New("initializing RouteGroup is nil") + } + + // handshake packet, handling with the raw rg + log.Tracef("Got new remote packet with size %d and route ID %d. Using rule: %s", + len(packet.Payload()), packet.RouteID(), rule) + + return rg.handlePacket(packet) +} + // GetRule gets routing rule. func (r *router) GetRule(routeID routing.RouteID) (routing.Rule, error) { rule, err := r.rt.Rule(routeID) @@ -815,6 +869,13 @@ func (r *router) forwardPacket(ctx context.Context, packet routing.Packet, rule p = routing.MakeKeepAlivePacket(rule.NextRouteID()) case routing.ClosePacket: p = routing.MakeClosePacket(rule.NextRouteID(), routing.CloseCode(packet.Payload()[0])) + case routing.ErrorPacket: + var err error + + p, err = routing.MakeErrorPacket(rule.NextRouteID(), packet.Payload()) + if err != nil { + return err + } default: return fmt.Errorf("packet of type %s can't be forwarded", packet.Type()) } diff --git a/pkg/routing/packet.go b/pkg/routing/packet.go index 74b5c2194e..da069756d0 100644 --- a/pkg/routing/packet.go +++ b/pkg/routing/packet.go @@ -41,10 +41,12 @@ func (t PacketType) String() string { return "ClosePacket" case KeepAlivePacket: return "KeepAlivePacket" - case NetworkProbePacket: - return "NetworkProbe" case HandshakePacket: return "Handshake" + case NetworkProbePacket: + return "NetworkProbe" + case ErrorPacket: + return "Error" default: return fmt.Sprintf("Unknown(%d)", t) } @@ -60,6 +62,7 @@ const ( KeepAlivePacket HandshakePacket NetworkProbePacket + ErrorPacket ) // CloseCode represents close code for ClosePacket. @@ -152,6 +155,23 @@ func MakeHandshakePacket(id RouteID, supportEncryption bool) Packet { return packet } +// MakeErrorPacket constructs a new ErrorPacket. +// If payload size is more than uint16, MakeErrorPacket returns an error. +func MakeErrorPacket(id RouteID, errPayload []byte) (Packet, error) { + if len(errPayload) > math.MaxUint16 { + return Packet{}, ErrPayloadTooBig + } + + packet := make([]byte, PacketHeaderSize+len(errPayload)) + + packet[PacketTypeOffset] = byte(ErrorPacket) + binary.BigEndian.PutUint32(packet[PacketRouteIDOffset:], uint32(id)) + binary.BigEndian.PutUint16(packet[PacketPayloadSizeOffset:], uint16(len(errPayload))) + copy(packet[PacketPayloadOffset:], errPayload) + + return packet, nil +} + // Type returns Packet's type. func (p Packet) Type() PacketType { return PacketType(p[PacketTypeOffset]) diff --git a/pkg/routing/packet_test.go b/pkg/routing/packet_test.go index 6fa39b5bee..c0247c072a 100644 --- a/pkg/routing/packet_test.go +++ b/pkg/routing/packet_test.go @@ -38,3 +38,15 @@ func TestMakeKeepAlivePacket(t *testing.T) { assert.Equal(t, RouteID(4), packet.RouteID()) assert.Equal(t, []byte{}, packet.Payload()) } + +func TestMakeErrorPacket(t *testing.T) { + packet, err := MakeErrorPacket(2, []byte("foo")) + require.NoError(t, err) + + expected := []byte{0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x3, 0x66, 0x6f, 0x6f} + + assert.Equal(t, expected, []byte(packet)) + assert.Equal(t, uint16(3), packet.Size()) + assert.Equal(t, RouteID(2), packet.RouteID()) + assert.Equal(t, []byte("foo"), packet.Payload()) +} From 1a044e58ca42364154fde26badbc3fcc6a252331 Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 5 Jul 2022 10:10:11 +0530 Subject: [PATCH 03/25] Fix spellings --- internal/vpn/server.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/vpn/server.go b/internal/vpn/server.go index 4d9ab7decb..5595ad872d 100644 --- a/internal/vpn/server.go +++ b/internal/vpn/server.go @@ -41,7 +41,7 @@ func NewServer(cfg ServerConfig, appCl *app.Client) (*Server, error) { if err != nil { return nil, fmt.Errorf("error getting default network interface: %w", err) } - ifcs, hasMultiple := s.hasMutipleNetworkInterfaces(defaultNetworkIfcs) + ifcs, hasMultiple := s.hasMultipleNetworkInterfaces(defaultNetworkIfcs) if hasMultiple { if cfg.NetworkInterface == "" { return nil, fmt.Errorf("multiple default network interfaces detected...set a default one for VPN server or remove one: %v", ifcs) @@ -74,18 +74,18 @@ func NewServer(cfg ServerConfig, appCl *app.Client) (*Server, error) { fmt.Println("Old IP forwarding values:") fmt.Printf("IPv4: %s, IPv6: %s\n", ipv4ForwardingVal, ipv6ForwardingVal) - iptablesForwarPolicy, err := GetIPTablesForwardPolicy() + iptablesForwardPolicy, err := GetIPTablesForwardPolicy() if err != nil { return nil, fmt.Errorf("error getting iptables forward policy: %w", err) } - fmt.Printf("Old iptables forward policy: %s\n", iptablesForwarPolicy) + fmt.Printf("Old iptables forward policy: %s\n", iptablesForwardPolicy) s.defaultNetworkInterface = defaultNetworkIfc s.defaultNetworkInterfaceIPs = defaultNetworkIfcIPs s.ipv4ForwardingVal = ipv4ForwardingVal s.ipv6ForwardingVal = ipv6ForwardingVal - s.iptablesForwardPolicy = iptablesForwarPolicy + s.iptablesForwardPolicy = iptablesForwardPolicy return s, nil } @@ -335,7 +335,7 @@ func (s *Server) shakeHands(conn net.Conn) (tunIP, tunGateway net.IP, unsecureVP if err := WriteJSON(conn, &sHello); err != nil { unsecureVPN() - return nil, nil, nil, fmt.Errorf("error finishing hadnshake: error sending server hello: %w", err) + return nil, nil, nil, fmt.Errorf("error finishing handshake: error sending server hello: %w", err) } return sTUNIP, sTUNGateway, unsecureVPN, nil @@ -363,7 +363,7 @@ func (s *Server) sendServerErrHello(conn net.Conn, status HandshakeStatus) { } } -func (s *Server) hasMutipleNetworkInterfaces(defaultNetworkInterface string) ([]string, bool) { +func (s *Server) hasMultipleNetworkInterfaces(defaultNetworkInterface string) ([]string, bool) { networkInterfaces := strings.Split(defaultNetworkInterface, "\n") if len(networkInterfaces) > 1 { return networkInterfaces, true From 1eb1e82d75a1d6c6370ec42223fe20a7d3a1c534 Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 5 Jul 2022 10:13:53 +0530 Subject: [PATCH 04/25] Fix spelling --- pkg/router/router.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 6470a90f89..72f5d8be71 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -257,7 +257,7 @@ func (r *router) DialRoutes( } // check if transports are available - ok := r.checkIfTransportAvalailable() + ok := r.checkIfTransportAvailable() if !ok { return nil, ErrNoTransportFound } @@ -1136,7 +1136,7 @@ func (r *router) removeRouteGroupOfRule(rule routing.Rule) { log.Debug("Noise route group closed.") } -func (r *router) checkIfTransportAvalailable() (ok bool) { +func (r *router) checkIfTransportAvailable() (ok bool) { r.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { ok = true return ok From bdba9ed0a1cb75510724e2120a94cd4effcbd36d Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 5 Jul 2022 10:50:57 +0530 Subject: [PATCH 05/25] Send error packet on close with error --- pkg/app/appnet/skywire_networker.go | 15 +++++++++++++-- pkg/router/route_group.go | 29 +++++++++++++++++++++++++++++ pkg/router/router.go | 1 + 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index dd89cd4996..fc04b714ce 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -3,6 +3,7 @@ package appnet import ( "context" "errors" + "fmt" "io" "net" "strings" @@ -27,6 +28,7 @@ type SkywireNetworker struct { r router.Router porter *netutil.Porter isServing int32 + err error } // NewSkywireNetworker constructs skywire networker. @@ -138,9 +140,10 @@ func (r *SkywireNetworker) serve(conn net.Conn) { lisIfc, ok := r.porter.PortValue(uint16(localAddr.Port)) if !ok { + err := errors.New(fmt.Sprintf("no listener on port %d", localAddr.Port)) + r.setErr(err) r.close(conn) - r.log.Errorf("no listener on port %d", localAddr.Port) - + r.log.Error(err) return } @@ -155,6 +158,14 @@ func (r *SkywireNetworker) serve(conn net.Conn) { lis.putConn(conn) } +func (r *SkywireNetworker) setErr(err error) { + r.err = err +} + +func (r *SkywireNetworker) Err() error { + return r.err +} + // closeRG closes router group and logs error if any. func (r *SkywireNetworker) close(closer io.Closer) { if err := closer.Close(); err != nil { diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index d5f8e3380b..4b90bf04c7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -114,6 +114,8 @@ type RouteGroup struct { // used to wait for all the `Close` packets to run through the loop and come back closeDone sync.WaitGroup once sync.Once + + closeError error } // NewRouteGroup creates a new RouteGroup. @@ -217,6 +219,7 @@ func (rg *RouteGroup) Close() error { rg.mu.Lock() defer rg.mu.Unlock() + rg.logger.Error("RouteGroup.Close") return rg.close(routing.CloseRequested) } @@ -281,6 +284,11 @@ func (rg *RouteGroup) BandwidthReceived() uint64 { return rg.networkStats.BandwidthReceived() } +// SetError sets write deadline. +func (rg *RouteGroup) SetError(err error) { + rg.closeError = err +} + // read reads incoming data. It tries to fetch the data from the internal buffer. // If buffer is empty it blocks on receiving from the data channel func (rg *RouteGroup) read(p []byte) (int, error) { @@ -529,6 +537,20 @@ func (rg *RouteGroup) sendHandshake(encrypt bool) error { return ErrNoSuitableTransport } +func (rg *RouteGroup) sendError(rule routing.Rule, tp *transport.ManagedTransport) error { + errPayload := rg.closeError + if errPayload == nil { + return nil + } + + packet, err := routing.MakeErrorPacket(rule.NextRouteID(), []byte(errPayload.Error())) + if err != nil { + return err + } + + return rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()) +} + // Close closes a RouteGroup with the specified close `code`: // - Send Close packet for all ForwardRules with the code `code`. // - Delete all rules (ForwardRules and ConsumeRules) from routing table. @@ -549,6 +571,7 @@ func (rg *RouteGroup) close(code routing.CloseCode) error { rg.closeDone.Add(len(rg.tps)) } + rg.logger.Error("RouteGroup.close") rg.broadcastClosePackets(code) if closeInitiator { @@ -673,16 +696,22 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { rg.closeDone.Done() return nil } + rg.logger.Error("RouteGroup.handleClosePacket") return rg.close(code) } func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) { + rg.logger.Error("RouteGroup.broadcastClosePackets") for i := 0; i < len(rg.tps); i++ { if rg.tps[i] == nil || rg.fwd[i] == nil { continue } + if err := rg.sendError(rg.fwd[i], rg.tps[i]); err != nil { + rg.logger.WithError(err).Errorf("Failed to send error packet to %s", rg.tps[i].Remote()) + } + packet := routing.MakeClosePacket(rg.fwd[i].NextRouteID(), code) if err := rg.writePacket(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID()); err != nil { rg.logger.WithError(err).Errorf("Failed to send close packet to %s", rg.tps[i].Remote()) diff --git a/pkg/router/router.go b/pkg/router/router.go index 72f5d8be71..f3b350ba1c 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -736,6 +736,7 @@ func (r *router) handleKeepAlivePacket(ctx context.Context, packet routing.Packe } func (r *router) handleErrorPacket(ctx context.Context, packet routing.Packet) error { + r.logger.Warn("router.handleErrorPacket") rule, err := r.GetRule(packet.RouteID()) if err != nil { return err From 315a77863cea884bcfc0b466deffe12b96a56651 Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 5 Jul 2022 13:17:11 +0530 Subject: [PATCH 06/25] Fix routingtable logging --- pkg/router/router.go | 4 ++-- pkg/visor/init.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 776216b92f..e5c529b7eb 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -171,7 +171,7 @@ type router struct { } // New constructs a new Router. -func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) (Router, error) { +func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook, logger *logging.Logger) (Router, error) { config.SetDefaults() sl, err := dmsgC.Listen(skyenv.DmsgAwaitSetupPort) @@ -193,7 +193,7 @@ func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) ( logger: config.Logger, mLogger: config.MasterLogger, tm: config.TransportManager, - rt: routing.NewTable(config.Logger), + rt: routing.NewTable(logger), sl: sl, dmsgC: dmsgC, rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup), diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 4d2f98dab1..a4638de0bc 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -583,7 +583,7 @@ func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error { routeSetupHooks := getRouteSetupHooks(ctx, v, log) - r, err := router.New(v.dmsgC, &rConf, routeSetupHooks) + r, err := router.New(v.dmsgC, &rConf, routeSetupHooks, logger) if err != nil { err := fmt.Errorf("failed to create router: %w", err) return err From 504893c4047f9cea6d62de7b319bec18aa6b43a0 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 6 Jul 2022 10:23:58 +0530 Subject: [PATCH 07/25] Fix minor print logs --- cmd/apps/skychat/skychat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/apps/skychat/skychat.go b/cmd/apps/skychat/skychat.go index e52a6b09a6..190cac0a5c 100644 --- a/cmd/apps/skychat/skychat.go +++ b/cmd/apps/skychat/skychat.go @@ -98,7 +98,7 @@ func main() { err := http.ListenAndServe(*addr, nil) if err != nil { - print(err) + print(err.Error()) setAppError(appCl, err) os.Exit(1) } From 1b88bf2d10798750c94c73faee6ec7e9ecb8bc27 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 6 Jul 2022 10:48:41 +0530 Subject: [PATCH 08/25] Send error packet on error This commit contains the logic for sending the error packet on error in SkywireNetworker. The method AcceptRoutes of router now sends back the route group which is used to set an error in the rg. If the error in the routegroup is set then the error packet is sent before broadcasting close packets. --- pkg/app/appnet/skywire_networker.go | 22 ++++++------------- pkg/router/mock_router.go | 33 +++++++++++++++++------------ pkg/router/router.go | 14 ++++++------ 3 files changed, 33 insertions(+), 36 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index fc04b714ce..0b9415caf0 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -28,7 +28,6 @@ type SkywireNetworker struct { r router.Router porter *netutil.Porter isServing int32 - err error } // NewSkywireNetworker constructs skywire networker. @@ -113,7 +112,7 @@ func (r *SkywireNetworker) serveRouteGroup(ctx context.Context) error { for { log.Debug("Awaiting to accept route group...") - conn, err := r.r.AcceptRoutes(ctx) + conn, rg, err := r.r.AcceptRoutes(ctx) if err != nil { log.WithError(err).Debug("Stopped accepting routes.") return err @@ -124,12 +123,12 @@ func (r *SkywireNetworker) serveRouteGroup(ctx context.Context) error { WithField("remote", conn.RemoteAddr()). Debug("Accepted route group.") - go r.serve(conn) + go r.serve(conn, rg) } } // serveRG passes accepted router group to the corresponding listener. -func (r *SkywireNetworker) serve(conn net.Conn) { +func (r *SkywireNetworker) serve(conn net.Conn, rg *router.RouteGroup) { localAddr, ok := conn.LocalAddr().(routing.Addr) if !ok { r.close(conn) @@ -140,10 +139,11 @@ func (r *SkywireNetworker) serve(conn net.Conn) { lisIfc, ok := r.porter.PortValue(uint16(localAddr.Port)) if !ok { - err := errors.New(fmt.Sprintf("no listener on port %d", localAddr.Port)) - r.setErr(err) - r.close(conn) + err := fmt.Errorf("no listener on port %d", localAddr.Port) r.log.Error(err) + rg.SetError(err) + r.close(conn) + return } @@ -158,14 +158,6 @@ func (r *SkywireNetworker) serve(conn net.Conn) { lis.putConn(conn) } -func (r *SkywireNetworker) setErr(err error) { - r.err = err -} - -func (r *SkywireNetworker) Err() error { - return r.err -} - // closeRG closes router group and logs error if any. func (r *SkywireNetworker) close(closer io.Closer) { if err := closer.Close(); err != nil { diff --git a/pkg/router/mock_router.go b/pkg/router/mock_router.go index 0f681a374b..56085d23bf 100644 --- a/pkg/router/mock_router.go +++ b/pkg/router/mock_router.go @@ -2,15 +2,11 @@ package router -import ( - context "context" - net "net" - - mock "github.com/stretchr/testify/mock" - - cipher "github.com/skycoin/skywire-utilities/pkg/cipher" - routing "github.com/skycoin/skywire/pkg/routing" -) +import cipher "github.com/skycoin/skywire-utilities/pkg/cipher" +import context "context" +import mock "github.com/stretchr/testify/mock" +import net "net" +import routing "github.com/skycoin/skywire/pkg/routing" // MockRouter is an autogenerated mock type for the Router type type MockRouter struct { @@ -18,7 +14,7 @@ type MockRouter struct { } // AcceptRoutes provides a mock function with given fields: _a0 -func (_m *MockRouter) AcceptRoutes(_a0 context.Context) (net.Conn, error) { +func (_m *MockRouter) AcceptRoutes(_a0 context.Context) (net.Conn, *RouteGroup, error) { ret := _m.Called(_a0) var r0 net.Conn @@ -30,14 +26,23 @@ func (_m *MockRouter) AcceptRoutes(_a0 context.Context) (net.Conn, error) { } } - var r1 error - if rf, ok := ret.Get(1).(func(context.Context) error); ok { + var r1 *RouteGroup + if rf, ok := ret.Get(1).(func(context.Context) *RouteGroup); ok { r1 = rf(_a0) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).(*RouteGroup) + } } - return r0, r1 + var r2 error + if rf, ok := ret.Get(2).(func(context.Context) error); ok { + r2 = rf(_a0) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // Close provides a mock function with given fields: diff --git a/pkg/router/router.go b/pkg/router/router.go index e5c529b7eb..ee52920124 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -132,7 +132,7 @@ type Router interface { // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. // - Return the RoutingGroup. - AcceptRoutes(context.Context) (net.Conn, error) + AcceptRoutes(context.Context) (net.Conn, *RouteGroup, error) SaveRoutingRules(rules ...routing.Rule) error ReserveKeys(n int) ([]routing.RouteID, error) IntroduceRules(rules routing.EdgeRules) error @@ -309,7 +309,7 @@ func (r *router) DialRoutes( // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. // - Return the RoutingGroup. -func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { +func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, *RouteGroup, error) { var ( rules routing.EdgeRules ok bool @@ -317,7 +317,7 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, nil, ctx.Err() case rules, ok = <-r.accept: } @@ -329,11 +329,11 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { Err: errors.New("use of closed network connection"), } - return nil, err + return nil, nil, err } if err := r.SaveRoutingRules(rules.Forward, rules.Reverse); err != nil { - return nil, fmt.Errorf("SaveRoutingRules: %w", err) + return nil, nil, fmt.Errorf("SaveRoutingRules: %w", err) } nsConf := noise.Config{ @@ -345,12 +345,12 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { nrg, err := r.saveRouteGroupRules(rules, nsConf) if err != nil { - return nil, fmt.Errorf("saveRouteGroupRules: %w", err) + return nil, nil, fmt.Errorf("saveRouteGroupRules: %w", err) } nrg.rg.startOffServiceLoops() - return nrg, nil + return nrg, nrg.rg, nil } // Serve starts transport listening loop. From ab15e69679b3127c8ea4aab905ad2efa82b24c48 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 6 Jul 2022 10:54:17 +0530 Subject: [PATCH 09/25] Add mutex lock to closeError in route group --- pkg/router/route_group.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 4b90bf04c7..c9f546ac4a 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -115,6 +115,7 @@ type RouteGroup struct { closeDone sync.WaitGroup once sync.Once + errorMu sync.RWMutex closeError error } @@ -284,11 +285,22 @@ func (rg *RouteGroup) BandwidthReceived() uint64 { return rg.networkStats.BandwidthReceived() } -// SetError sets write deadline. +// SetError sets the close error. func (rg *RouteGroup) SetError(err error) { + rg.errorMu.Lock() + defer rg.errorMu.Unlock() + rg.closeError = err } +// GetError gets the close error. +func (rg *RouteGroup) GetError() error { + rg.errorMu.RLock() + defer rg.errorMu.RUnlock() + + return rg.closeError +} + // read reads incoming data. It tries to fetch the data from the internal buffer. // If buffer is empty it blocks on receiving from the data channel func (rg *RouteGroup) read(p []byte) (int, error) { @@ -538,7 +550,7 @@ func (rg *RouteGroup) sendHandshake(encrypt bool) error { } func (rg *RouteGroup) sendError(rule routing.Rule, tp *transport.ManagedTransport) error { - errPayload := rg.closeError + errPayload := rg.GetError() if errPayload == nil { return nil } From a882fb821996051d5d8bc643137e7344516438c4 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 6 Jul 2022 13:55:30 +0530 Subject: [PATCH 10/25] Remove test logs --- pkg/router/route_group.go | 4 ---- pkg/router/router.go | 1 - 2 files changed, 5 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index c9f546ac4a..18219c0709 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -220,7 +220,6 @@ func (rg *RouteGroup) Close() error { rg.mu.Lock() defer rg.mu.Unlock() - rg.logger.Error("RouteGroup.Close") return rg.close(routing.CloseRequested) } @@ -583,7 +582,6 @@ func (rg *RouteGroup) close(code routing.CloseCode) error { rg.closeDone.Add(len(rg.tps)) } - rg.logger.Error("RouteGroup.close") rg.broadcastClosePackets(code) if closeInitiator { @@ -708,13 +706,11 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { rg.closeDone.Done() return nil } - rg.logger.Error("RouteGroup.handleClosePacket") return rg.close(code) } func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) { - rg.logger.Error("RouteGroup.broadcastClosePackets") for i := 0; i < len(rg.tps); i++ { if rg.tps[i] == nil || rg.fwd[i] == nil { continue diff --git a/pkg/router/router.go b/pkg/router/router.go index ee52920124..27b8b1d729 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -736,7 +736,6 @@ func (r *router) handleKeepAlivePacket(ctx context.Context, packet routing.Packe } func (r *router) handleErrorPacket(ctx context.Context, packet routing.Packet) error { - r.logger.Warn("router.handleErrorPacket") rule, err := r.GetRule(packet.RouteID()) if err != nil { return err From 2e8d0ee9a4e26cdf107c09093c4b5328f20d2686 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 6 Jul 2022 14:58:13 +0530 Subject: [PATCH 11/25] Fix log --- internal/vpn/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/vpn/client.go b/internal/vpn/client.go index f18f535141..f8a54a56f4 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -449,7 +449,7 @@ serveLoop: func (c *Client) dialServeConn() error { conn, err := c.dialServer(c.appCl, c.cfg.ServerPK) if err != nil { - fmt.Printf("error connecting to VPN server: %s", err) + fmt.Printf("error connecting to VPN server: %s\n", err) return err } From 42a396af5a20ec0831be865108fbff4899c30bf2 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 16:55:54 +0530 Subject: [PATCH 12/25] Revert Router changes This commit revertes the Router changes of returning *RouteGroup in the method AcceptRoutes. Insted we now use interface conversion to get the RouteGroup from conn in SkywireNetworker. --- pkg/app/appnet/skywire_networker.go | 7 ++++--- pkg/router/mock_router.go | 19 +++++-------------- pkg/router/router.go | 14 +++++++------- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index 0b9415caf0..c0c004bdd3 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -112,7 +112,7 @@ func (r *SkywireNetworker) serveRouteGroup(ctx context.Context) error { for { log.Debug("Awaiting to accept route group...") - conn, rg, err := r.r.AcceptRoutes(ctx) + conn, err := r.r.AcceptRoutes(ctx) if err != nil { log.WithError(err).Debug("Stopped accepting routes.") return err @@ -123,12 +123,13 @@ func (r *SkywireNetworker) serveRouteGroup(ctx context.Context) error { WithField("remote", conn.RemoteAddr()). Debug("Accepted route group.") - go r.serve(conn, rg) + go r.serve(conn) } } // serveRG passes accepted router group to the corresponding listener. -func (r *SkywireNetworker) serve(conn net.Conn, rg *router.RouteGroup) { +func (r *SkywireNetworker) serve(conn net.Conn) { + rg := conn.(*router.RouteGroup) localAddr, ok := conn.LocalAddr().(routing.Addr) if !ok { r.close(conn) diff --git a/pkg/router/mock_router.go b/pkg/router/mock_router.go index 56085d23bf..3217343c83 100644 --- a/pkg/router/mock_router.go +++ b/pkg/router/mock_router.go @@ -14,7 +14,7 @@ type MockRouter struct { } // AcceptRoutes provides a mock function with given fields: _a0 -func (_m *MockRouter) AcceptRoutes(_a0 context.Context) (net.Conn, *RouteGroup, error) { +func (_m *MockRouter) AcceptRoutes(_a0 context.Context) (net.Conn, error) { ret := _m.Called(_a0) var r0 net.Conn @@ -26,23 +26,14 @@ func (_m *MockRouter) AcceptRoutes(_a0 context.Context) (net.Conn, *RouteGroup, } } - var r1 *RouteGroup - if rf, ok := ret.Get(1).(func(context.Context) *RouteGroup); ok { + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(_a0) } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*RouteGroup) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context) error); ok { - r2 = rf(_a0) - } else { - r2 = ret.Error(2) + r1 = ret.Error(1) } - return r0, r1, r2 + return r0, r1 } // Close provides a mock function with given fields: diff --git a/pkg/router/router.go b/pkg/router/router.go index 27b8b1d729..63644b45e3 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -132,7 +132,7 @@ type Router interface { // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. // - Return the RoutingGroup. - AcceptRoutes(context.Context) (net.Conn, *RouteGroup, error) + AcceptRoutes(context.Context) (net.Conn, error) SaveRoutingRules(rules ...routing.Rule) error ReserveKeys(n int) ([]routing.RouteID, error) IntroduceRules(rules routing.EdgeRules) error @@ -309,7 +309,7 @@ func (r *router) DialRoutes( // Then the following should happen: // - Save to routing.Table and internal RouteGroup map. // - Return the RoutingGroup. -func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, *RouteGroup, error) { +func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { var ( rules routing.EdgeRules ok bool @@ -317,7 +317,7 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, *RouteGroup, error select { case <-ctx.Done(): - return nil, nil, ctx.Err() + return nil, ctx.Err() case rules, ok = <-r.accept: } @@ -329,11 +329,11 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, *RouteGroup, error Err: errors.New("use of closed network connection"), } - return nil, nil, err + return nil, err } if err := r.SaveRoutingRules(rules.Forward, rules.Reverse); err != nil { - return nil, nil, fmt.Errorf("SaveRoutingRules: %w", err) + return nil, fmt.Errorf("SaveRoutingRules: %w", err) } nsConf := noise.Config{ @@ -345,12 +345,12 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, *RouteGroup, error nrg, err := r.saveRouteGroupRules(rules, nsConf) if err != nil { - return nil, nil, fmt.Errorf("saveRouteGroupRules: %w", err) + return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } nrg.rg.startOffServiceLoops() - return nrg, nrg.rg, nil + return nrg, nil } // Serve starts transport listening loop. From 722ed64f97da49305ea06d74e59169794bdff9b7 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 16:56:08 +0530 Subject: [PATCH 13/25] Minor updates --- internal/vpn/client.go | 8 ++++++-- pkg/router/noise_route_group.go | 10 ++++++++++ pkg/router/route_group.go | 6 ++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/vpn/client.go b/internal/vpn/client.go index f8a54a56f4..52989045d8 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -20,6 +20,7 @@ import ( "github.com/skycoin/skywire/pkg/app" "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/app/appserver" + "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/skyenv" ) @@ -452,7 +453,6 @@ func (c *Client) dialServeConn() error { fmt.Printf("error connecting to VPN server: %s\n", err) return err } - fmt.Printf("Dialed %s\n", conn.RemoteAddr()) defer func() { @@ -679,6 +679,10 @@ func (c *Client) shakeHands(conn net.Conn) (TUNIP, TUNGateway net.IP, err error) var sHello ServerHello if err := ReadJSONWithTimeout(conn, &sHello, handshakeTimeout); err != nil { + ng := conn.(*router.NoiseRouteGroup) + if serverErr := ng.GetError(); serverErr != nil { + return nil, nil, serverErr + } return nil, nil, fmt.Errorf("error reading server hello: %w", err) } @@ -710,7 +714,7 @@ func (c *Client) dialServer(appCl *app.Client, pk cipher.PubKey) (net.Conn, erro } if c.isClosed() { - // we need to signal outer code that connection object is inalid + // we need to signal outer code that connection object is invalid // in this case return nil, errors.New("client got closed") } diff --git a/pkg/router/noise_route_group.go b/pkg/router/noise_route_group.go index d28c1019b8..27b312d460 100644 --- a/pkg/router/noise_route_group.go +++ b/pkg/router/noise_route_group.go @@ -54,6 +54,16 @@ func (nrg *NoiseRouteGroup) BandwidthReceived() uint64 { return nrg.rg.BandwidthReceived() } +// SetError sets the close error. +func (nrg *NoiseRouteGroup) SetError(err error) { + nrg.rg.SetError(err) +} + +// GetError gets the close error. +func (nrg *NoiseRouteGroup) GetError() error { + return nrg.rg.GetError() +} + func (nrg *NoiseRouteGroup) isClosed() bool { return nrg.rg.isClosed() } diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 18219c0709..bfe19c8632 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -554,6 +554,10 @@ func (rg *RouteGroup) sendError(rule routing.Rule, tp *transport.ManagedTranspor return nil } + if !rg.isCloseInitiator() { + return nil + } + packet, err := routing.MakeErrorPacket(rule.NextRouteID(), []byte(errPayload.Error())) if err != nil { return err @@ -692,6 +696,8 @@ func (rg *RouteGroup) handleErrorPacket(packet routing.Packet) error { if rg.isRemoteClosed() { return nil } + + rg.SetError(fmt.Errorf(string(packet.Payload()))) rg.logger.Error(string(packet.Payload())) return nil } From 36a001b0347eb55812522c5122bc519c298fa1c0 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 18:03:21 +0530 Subject: [PATCH 14/25] Revert vpn client changes --- internal/vpn/client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/vpn/client.go b/internal/vpn/client.go index 52989045d8..a6e919a2f8 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -20,7 +20,6 @@ import ( "github.com/skycoin/skywire/pkg/app" "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/app/appserver" - "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/skyenv" ) @@ -679,10 +678,6 @@ func (c *Client) shakeHands(conn net.Conn) (TUNIP, TUNGateway net.IP, err error) var sHello ServerHello if err := ReadJSONWithTimeout(conn, &sHello, handshakeTimeout); err != nil { - ng := conn.(*router.NoiseRouteGroup) - if serverErr := ng.GetError(); serverErr != nil { - return nil, nil, serverErr - } return nil, nil, fmt.Errorf("error reading server hello: %w", err) } From f246dcc1712b9f267846343785818f35db0aca3b Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 18:05:45 +0530 Subject: [PATCH 15/25] Change conn conversion from RG to NoiseRouteGroup --- pkg/app/appnet/skywire_networker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index c0c004bdd3..d0d6ff1e6c 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -129,7 +129,7 @@ func (r *SkywireNetworker) serveRouteGroup(ctx context.Context) error { // serveRG passes accepted router group to the corresponding listener. func (r *SkywireNetworker) serve(conn net.Conn) { - rg := conn.(*router.RouteGroup) + ng := conn.(*router.NoiseRouteGroup) localAddr, ok := conn.LocalAddr().(routing.Addr) if !ok { r.close(conn) @@ -142,7 +142,7 @@ func (r *SkywireNetworker) serve(conn net.Conn) { if !ok { err := fmt.Errorf("no listener on port %d", localAddr.Port) r.log.Error(err) - rg.SetError(err) + ng.SetError(err) r.close(conn) return From d6d186ff2b15760f27e773e6e30f04ecc0091595 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 19:55:09 +0530 Subject: [PATCH 16/25] Pass route group error to RPCIngressGateway This commit passes the error from the routegroup that is received from the vpn-server to the RPCIngressGateway method Read so that it can be passed to the app via the rpcIngressClient. --- pkg/app/appnet/skywire_conn.go | 10 ++++++++++ pkg/app/appserver/rpc_ingress_gateway.go | 6 ++++++ 2 files changed, 16 insertions(+) diff --git a/pkg/app/appnet/skywire_conn.go b/pkg/app/appnet/skywire_conn.go index e4d616d247..a75a70c747 100644 --- a/pkg/app/appnet/skywire_conn.go +++ b/pkg/app/appnet/skywire_conn.go @@ -47,6 +47,16 @@ func (c *SkywireConn) BandwidthReceived() uint64 { return c.nrg.BandwidthReceived() } +// SetError sets the close error. +func (c *SkywireConn) SetError(err error) { + c.nrg.SetError(err) +} + +// GetError gets the close error. +func (c *SkywireConn) GetError() error { + return c.nrg.GetError() +} + // Close closes connection. func (c *SkywireConn) Close() error { var err error diff --git a/pkg/app/appserver/rpc_ingress_gateway.go b/pkg/app/appserver/rpc_ingress_gateway.go index 515705af77..2c6f01fb29 100644 --- a/pkg/app/appserver/rpc_ingress_gateway.go +++ b/pkg/app/appserver/rpc_ingress_gateway.go @@ -283,6 +283,12 @@ func (r *RPCIngressGateway) Read(req *ReadReq, resp *ReadResp) error { } } + wrappedConn := conn.(*appnet.WrappedConn) + skywireConn := wrappedConn.Conn.(*appnet.SkywireConn) + if ngErr := skywireConn.GetError(); ngErr != nil { + err = ngErr + } + resp.Err = ioErrToRPCIOErr(err) // avoid error in RPC pipeline, error is included in response body From 946c157fc586fedfc026eced9080f44a5080fbeb Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 20:32:37 +0530 Subject: [PATCH 17/25] Whitelist server offline error --- internal/vpn/client.go | 6 ++++-- internal/vpn/errors.go | 5 +++++ pkg/app/appnet/errors.go | 24 ++++++++++++++++++++++++ pkg/app/appnet/skywire_networker.go | 3 +-- pkg/app/appserver/app_state.go | 2 +- 5 files changed, 35 insertions(+), 5 deletions(-) create mode 100644 pkg/app/appnet/errors.go diff --git a/internal/vpn/client.go b/internal/vpn/client.go index a6e919a2f8..59abb2e68e 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -170,7 +170,8 @@ func (c *Client) Serve() error { r := netutil.NewRetrier(nil, netutil.DefaultInitBackoff, netutil.DefaultMaxBackoff, 3, netutil.DefaultFactor). WithErrWhitelist(errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs, - errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted) + errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted, + errErrServerOffline) err := r.Do(context.Background(), func() error { if c.isClosed() { @@ -180,7 +181,8 @@ func (c *Client) Serve() error { if err := c.dialServeConn(); err != nil { switch err { case errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs, - errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted: + errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted, + errErrServerOffline: c.setAppError(err) c.resetConnDuration() return err diff --git a/internal/vpn/errors.go b/internal/vpn/errors.go index 1f35f8c884..4e18bd5120 100644 --- a/internal/vpn/errors.go +++ b/internal/vpn/errors.go @@ -3,10 +3,12 @@ package vpn import ( "errors" + "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/routefinder/rfclient" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/setup/setupclient" + "github.com/skycoin/skywire/pkg/skyenv" ) var ( @@ -29,4 +31,7 @@ var ( errErrSetupNode = appserver.RPCErr{ Err: setupclient.ErrSetupNode.Error(), } + errErrServerOffline = appserver.RPCErr{ + Err: appnet.ErrServiceOffline(skyenv.VPNServerPort).Error(), + } ) diff --git a/pkg/app/appnet/errors.go b/pkg/app/appnet/errors.go new file mode 100644 index 0000000000..a5be58de02 --- /dev/null +++ b/pkg/app/appnet/errors.go @@ -0,0 +1,24 @@ +package appnet + +import ( + "fmt" + + "github.com/skycoin/skywire/pkg/skyenv" +) + +// ErrServiceOffline is used to get a verbose error of GetListenerError +func ErrServiceOffline(port uint16) error { + switch port { + case skyenv.SkychatPort: + return fmt.Errorf("no listener on port %d, skychat offline", port) + case skyenv.SkysocksPort: + return fmt.Errorf("no listener on port %d, skysocks offline", port) + case skyenv.SkysocksClientPort: + return fmt.Errorf("no listener on port %d, skysocks-client offline", port) + case skyenv.VPNServerPort: + return fmt.Errorf("no listener on port %d, vpn-server offline", port) + case skyenv.VPNClientPort: + return fmt.Errorf("no listener on port %d, vpn-client offline", port) + } + return fmt.Errorf("no listener on port %d, service offline", port) +} diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index d0d6ff1e6c..cec181319f 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -3,7 +3,6 @@ package appnet import ( "context" "errors" - "fmt" "io" "net" "strings" @@ -140,7 +139,7 @@ func (r *SkywireNetworker) serve(conn net.Conn) { lisIfc, ok := r.porter.PortValue(uint16(localAddr.Port)) if !ok { - err := fmt.Errorf("no listener on port %d", localAddr.Port) + err := ErrServiceOffline(uint16(localAddr.Port)) r.log.Error(err) ng.SetError(err) r.close(conn) diff --git a/pkg/app/appserver/app_state.go b/pkg/app/appserver/app_state.go index e5aea25f39..632fdfe923 100644 --- a/pkg/app/appserver/app_state.go +++ b/pkg/app/appserver/app_state.go @@ -38,7 +38,7 @@ type AppState struct { type AppDetailedStatus string const ( - // AppDetailedStatusStarting is set during app initilization process. + // AppDetailedStatusStarting is set during app initialization process. AppDetailedStatusStarting = "Starting" // AppDetailedStatusRunning is set when the app is running. From f1e76ebb31e6d7e4319d6ee2c0ed19fd11c064bd Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 21:19:52 +0530 Subject: [PATCH 18/25] Fix ErrServiceOffline This commit fixes the ErrServiceOffline by wrapping the error in appserver.RPCErr so that the whitelist in retrier works correctly. --- internal/vpn/client.go | 9 ++++++++- internal/vpn/net.go | 3 ++- pkg/router/route_group.go | 1 - 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/vpn/client.go b/internal/vpn/client.go index 59abb2e68e..198d53442d 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -9,6 +9,7 @@ import ( "os" "runtime" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -680,7 +681,13 @@ func (c *Client) shakeHands(conn net.Conn) (TUNIP, TUNGateway net.IP, err error) var sHello ServerHello if err := ReadJSONWithTimeout(conn, &sHello, handshakeTimeout); err != nil { - return nil, nil, fmt.Errorf("error reading server hello: %w", err) + fmt.Printf("error reading server hello: %v\n", err) + if strings.Contains(err.Error(), appnet.ErrServiceOffline(skyenv.VPNServerPort).Error()) { + err = appserver.RPCErr{ + Err: err.Error(), + } + } + return nil, nil, err } fmt.Printf("Got server hello: %v", sHello) diff --git a/internal/vpn/net.go b/internal/vpn/net.go index e70b8f9365..476ecbb1f0 100644 --- a/internal/vpn/net.go +++ b/internal/vpn/net.go @@ -70,7 +70,8 @@ func ReadJSON(conn net.Conn, data interface{}) error { for { n, err := conn.Read(buf) if err != nil { - return fmt.Errorf("error reading data: %w", err) + fmt.Printf("error reading data: %v\n", err) + return err } dataBytes = append(dataBytes, buf[:n]...) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index bfe19c8632..61fc4bffa7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -698,7 +698,6 @@ func (rg *RouteGroup) handleErrorPacket(packet routing.Packet) error { } rg.SetError(fmt.Errorf(string(packet.Payload()))) - rg.logger.Error(string(packet.Payload())) return nil } From dfea189af8b77aec8a89927f7d4358dfa7c4ac0c Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 21:22:19 +0530 Subject: [PATCH 19/25] Fix packet test --- pkg/routing/packet_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/routing/packet_test.go b/pkg/routing/packet_test.go index c0247c072a..6dbb655997 100644 --- a/pkg/routing/packet_test.go +++ b/pkg/routing/packet_test.go @@ -43,7 +43,7 @@ func TestMakeErrorPacket(t *testing.T) { packet, err := MakeErrorPacket(2, []byte("foo")) require.NoError(t, err) - expected := []byte{0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x3, 0x66, 0x6f, 0x6f} + expected := []byte{0x5, 0x0, 0x0, 0x0, 0x2, 0x0, 0x3, 0x66, 0x6f, 0x6f} assert.Equal(t, expected, []byte(packet)) assert.Equal(t, uint16(3), packet.Size()) From 3b41bcfa454edcbbcafecea8ead919abf64177f8 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 21:26:26 +0530 Subject: [PATCH 20/25] make format --- go.sum | 3 --- pkg/router/mock_router.go | 14 +++++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/go.sum b/go.sum index 14c6ed80a5..8ed92953cf 100644 --- a/go.sum +++ b/go.sum @@ -474,15 +474,12 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/skycoin/dmsg v0.0.0-20220617100223-c17f98a92a47 h1:d/KdILjeiZOj3QFOm8KkOwIr5wwx9zWCl+oGVXPln1o= -github.com/skycoin/dmsg v0.0.0-20220617100223-c17f98a92a47/go.mod h1:7ixxeJVjbe3lxDkI4Yizj/TWoafYxs8cPJfxjlDeG+w= github.com/skycoin/dmsg v0.0.0-20220704102949-fece1bd9c40c h1:v7d+0yOp066U8FmcUdQ0Nh9Q+qshBO7+w3ZybGJlBnk= github.com/skycoin/dmsg v0.0.0-20220704102949-fece1bd9c40c/go.mod h1:7ixxeJVjbe3lxDkI4Yizj/TWoafYxs8cPJfxjlDeG+w= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o= github.com/skycoin/skycoin v0.27.1 h1:HatxsRwVSPaV4qxH6290xPBmkH/HgiuAoY2qC+e8C9I= github.com/skycoin/skycoin v0.27.1/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw= -github.com/skycoin/skywire-utilities v0.0.0-20220617085111-5c8c8d3ced14 h1:u8NYb1CX3l3I6wy6RWy2lQM6iqA6XbFex6Q/4MmEyfA= github.com/skycoin/skywire-utilities v0.0.0-20220617085111-5c8c8d3ced14/go.mod h1:B63p56igl38Ha+zjqi26d2om6XEe9jozwB6kzAWMnm0= github.com/skycoin/skywire-utilities v0.0.0-20220630144749-6ea8913bf1e8 h1:xUPi4duqObtDt4BYiNhbwssiUOFTor67Nftqx1F6/uc= github.com/skycoin/skywire-utilities v0.0.0-20220630144749-6ea8913bf1e8/go.mod h1:B63p56igl38Ha+zjqi26d2om6XEe9jozwB6kzAWMnm0= diff --git a/pkg/router/mock_router.go b/pkg/router/mock_router.go index 3217343c83..0f681a374b 100644 --- a/pkg/router/mock_router.go +++ b/pkg/router/mock_router.go @@ -2,11 +2,15 @@ package router -import cipher "github.com/skycoin/skywire-utilities/pkg/cipher" -import context "context" -import mock "github.com/stretchr/testify/mock" -import net "net" -import routing "github.com/skycoin/skywire/pkg/routing" +import ( + context "context" + net "net" + + mock "github.com/stretchr/testify/mock" + + cipher "github.com/skycoin/skywire-utilities/pkg/cipher" + routing "github.com/skycoin/skywire/pkg/routing" +) // MockRouter is an autogenerated mock type for the Router type type MockRouter struct { From 1dc03065a19b2fc8f794e8e09526711351edd7c4 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 22:10:22 +0530 Subject: [PATCH 21/25] Fix test --- pkg/app/appserver/rpc_ingress_gateway.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/app/appserver/rpc_ingress_gateway.go b/pkg/app/appserver/rpc_ingress_gateway.go index 2c6f01fb29..da9d863a4e 100644 --- a/pkg/app/appserver/rpc_ingress_gateway.go +++ b/pkg/app/appserver/rpc_ingress_gateway.go @@ -283,10 +283,14 @@ func (r *RPCIngressGateway) Read(req *ReadReq, resp *ReadResp) error { } } - wrappedConn := conn.(*appnet.WrappedConn) - skywireConn := wrappedConn.Conn.(*appnet.SkywireConn) - if ngErr := skywireConn.GetError(); ngErr != nil { - err = ngErr + wrappedConn, ok := conn.(*appnet.WrappedConn) + if ok { + skywireConn, ok := wrappedConn.Conn.(*appnet.SkywireConn) + if ok { + if ngErr := skywireConn.GetError(); ngErr != nil { + err = ngErr + } + } } resp.Err = ioErrToRPCIOErr(err) From 33d598488a5fd647575f554fc332551500bec719 Mon Sep 17 00:00:00 2001 From: ersonp Date: Thu, 7 Jul 2022 22:13:21 +0530 Subject: [PATCH 22/25] Fix interface conversion --- pkg/app/appnet/skywire_networker.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index cec181319f..52b9bc4e24 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -128,7 +128,6 @@ func (r *SkywireNetworker) serveRouteGroup(ctx context.Context) error { // serveRG passes accepted router group to the corresponding listener. func (r *SkywireNetworker) serve(conn net.Conn) { - ng := conn.(*router.NoiseRouteGroup) localAddr, ok := conn.LocalAddr().(routing.Addr) if !ok { r.close(conn) @@ -141,7 +140,10 @@ func (r *SkywireNetworker) serve(conn net.Conn) { if !ok { err := ErrServiceOffline(uint16(localAddr.Port)) r.log.Error(err) - ng.SetError(err) + ng, ok := conn.(*router.NoiseRouteGroup) + if ok { + ng.SetError(err) + } r.close(conn) return From 7224f1b6fe419d53c495d11b1bfe7c8074ad1b7a Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 8 Jul 2022 10:45:55 +0530 Subject: [PATCH 23/25] Streamline code --- pkg/app/appnet/skywire_networker.go | 3 +-- pkg/app/appserver/rpc_ingress_gateway.go | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index 52b9bc4e24..e378217b59 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -140,8 +140,7 @@ func (r *SkywireNetworker) serve(conn net.Conn) { if !ok { err := ErrServiceOffline(uint16(localAddr.Port)) r.log.Error(err) - ng, ok := conn.(*router.NoiseRouteGroup) - if ok { + if ng, ok := conn.(*router.NoiseRouteGroup); ok { ng.SetError(err) } r.close(conn) diff --git a/pkg/app/appserver/rpc_ingress_gateway.go b/pkg/app/appserver/rpc_ingress_gateway.go index da9d863a4e..95b83ad10b 100644 --- a/pkg/app/appserver/rpc_ingress_gateway.go +++ b/pkg/app/appserver/rpc_ingress_gateway.go @@ -283,10 +283,8 @@ func (r *RPCIngressGateway) Read(req *ReadReq, resp *ReadResp) error { } } - wrappedConn, ok := conn.(*appnet.WrappedConn) - if ok { - skywireConn, ok := wrappedConn.Conn.(*appnet.SkywireConn) - if ok { + if wrappedConn, ok := conn.(*appnet.WrappedConn); ok { + if skywireConn, ok := wrappedConn.Conn.(*appnet.SkywireConn); ok { if ngErr := skywireConn.GetError(); ngErr != nil { err = ngErr } From eb6b693e1230ac687c84266bf17682dc3f4e44a9 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 29 Jul 2022 09:08:49 +0530 Subject: [PATCH 24/25] Use logger from router config --- pkg/router/router.go | 4 ++-- pkg/visor/init.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 63644b45e3..1789a8c974 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -171,7 +171,7 @@ type router struct { } // New constructs a new Router. -func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook, logger *logging.Logger) (Router, error) { +func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook) (Router, error) { config.SetDefaults() sl, err := dmsgC.Listen(skyenv.DmsgAwaitSetupPort) @@ -193,7 +193,7 @@ func New(dmsgC *dmsg.Client, config *Config, routeSetupHooks []RouteSetupHook, l logger: config.Logger, mLogger: config.MasterLogger, tm: config.TransportManager, - rt: routing.NewTable(logger), + rt: routing.NewTable(config.Logger), sl: sl, dmsgC: dmsgC, rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup), diff --git a/pkg/visor/init.go b/pkg/visor/init.go index a4638de0bc..4d2f98dab1 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -583,7 +583,7 @@ func initRouter(ctx context.Context, v *Visor, log *logging.Logger) error { routeSetupHooks := getRouteSetupHooks(ctx, v, log) - r, err := router.New(v.dmsgC, &rConf, routeSetupHooks, logger) + r, err := router.New(v.dmsgC, &rConf, routeSetupHooks) if err != nil { err := fmt.Errorf("failed to create router: %w", err) return err From 7c426651dfc85a4b43078736249f9b1c95c62a40 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 29 Jul 2022 09:13:51 +0530 Subject: [PATCH 25/25] Fix comment --- pkg/router/router.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 1789a8c974..6202e38b43 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -766,8 +766,7 @@ func (r *router) handleErrorPacket(ctx context.Context, packet routing.Packet) e return nrg.handlePacket(packet) } - // we don't have nrg for this packet. it's either handshake message or - // we don't have route for this one completely + // we don't have nrg for this packet and we don't have route for this one completely rg, ok := r.initializingRouteGroup(desc) if !ok {