diff --git a/cmd/apps/skychat/skychat.go b/cmd/apps/skychat/skychat.go index e52a6b09a6..190cac0a5c 100644 --- a/cmd/apps/skychat/skychat.go +++ b/cmd/apps/skychat/skychat.go @@ -98,7 +98,7 @@ func main() { err := http.ListenAndServe(*addr, nil) if err != nil { - print(err) + print(err.Error()) setAppError(appCl, err) os.Exit(1) } diff --git a/internal/vpn/client.go b/internal/vpn/client.go index f18f535141..198d53442d 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -9,6 +9,7 @@ import ( "os" "runtime" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -170,7 +171,8 @@ func (c *Client) Serve() error { r := netutil.NewRetrier(nil, netutil.DefaultInitBackoff, netutil.DefaultMaxBackoff, 3, netutil.DefaultFactor). WithErrWhitelist(errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs, - errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted) + errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted, + errErrServerOffline) err := r.Do(context.Background(), func() error { if c.isClosed() { @@ -180,7 +182,8 @@ func (c *Client) Serve() error { if err := c.dialServeConn(); err != nil { switch err { case errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs, - errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted: + errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound, errErrSetupNode, errNotPermitted, + errErrServerOffline: c.setAppError(err) c.resetConnDuration() return err @@ -449,10 +452,9 @@ serveLoop: func (c *Client) dialServeConn() error { conn, err := c.dialServer(c.appCl, c.cfg.ServerPK) if err != nil { - fmt.Printf("error connecting to VPN server: %s", err) + fmt.Printf("error connecting to VPN server: %s\n", err) return err } - fmt.Printf("Dialed %s\n", conn.RemoteAddr()) defer func() { @@ -679,7 +681,13 @@ func (c *Client) shakeHands(conn net.Conn) (TUNIP, TUNGateway net.IP, err error) var sHello ServerHello if err := ReadJSONWithTimeout(conn, &sHello, handshakeTimeout); err != nil { - return nil, nil, fmt.Errorf("error reading server hello: %w", err) + fmt.Printf("error reading server hello: %v\n", err) + if strings.Contains(err.Error(), appnet.ErrServiceOffline(skyenv.VPNServerPort).Error()) { + err = appserver.RPCErr{ + Err: err.Error(), + } + } + return nil, nil, err } fmt.Printf("Got server hello: %v", sHello) @@ -710,7 +718,7 @@ func (c *Client) dialServer(appCl *app.Client, pk cipher.PubKey) (net.Conn, erro } if c.isClosed() { - // we need to signal outer code that connection object is inalid + // we need to signal outer code that connection object is invalid // in this case return nil, errors.New("client got closed") } diff --git a/internal/vpn/errors.go b/internal/vpn/errors.go index 1f35f8c884..4e18bd5120 100644 --- a/internal/vpn/errors.go +++ b/internal/vpn/errors.go @@ -3,10 +3,12 @@ package vpn import ( "errors" + "github.com/skycoin/skywire/pkg/app/appnet" "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/routefinder/rfclient" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/setup/setupclient" + "github.com/skycoin/skywire/pkg/skyenv" ) var ( @@ -29,4 +31,7 @@ var ( errErrSetupNode = appserver.RPCErr{ Err: setupclient.ErrSetupNode.Error(), } + errErrServerOffline = appserver.RPCErr{ + Err: appnet.ErrServiceOffline(skyenv.VPNServerPort).Error(), + } ) diff --git a/internal/vpn/net.go b/internal/vpn/net.go index e70b8f9365..476ecbb1f0 100644 --- a/internal/vpn/net.go +++ b/internal/vpn/net.go @@ -70,7 +70,8 @@ func ReadJSON(conn net.Conn, data interface{}) error { for { n, err := conn.Read(buf) if err != nil { - return fmt.Errorf("error reading data: %w", err) + fmt.Printf("error reading data: %v\n", err) + return err } dataBytes = append(dataBytes, buf[:n]...) diff --git a/internal/vpn/server.go b/internal/vpn/server.go index f0d7de6f23..336d99cb33 100644 --- a/internal/vpn/server.go +++ b/internal/vpn/server.go @@ -41,7 +41,7 @@ func NewServer(cfg ServerConfig, appCl *app.Client) (*Server, error) { if err != nil { return nil, fmt.Errorf("error getting default network interface: %w", err) } - ifcs, hasMultiple := s.hasMutipleNetworkInterfaces(defaultNetworkIfcs) + ifcs, hasMultiple := s.hasMultipleNetworkInterfaces(defaultNetworkIfcs) if hasMultiple { if cfg.NetworkInterface == "" { return nil, fmt.Errorf("multiple default network interfaces detected...set a default one for VPN server or remove one: %v", ifcs) @@ -74,18 +74,18 @@ func NewServer(cfg ServerConfig, appCl *app.Client) (*Server, error) { fmt.Println("Old IP forwarding values:") fmt.Printf("IPv4: %s, IPv6: %s\n", ipv4ForwardingVal, ipv6ForwardingVal) - iptablesForwarPolicy, err := GetIPTablesForwardPolicy() + iptablesForwardPolicy, err := GetIPTablesForwardPolicy() if err != nil { return nil, fmt.Errorf("error getting iptables forward policy: %w", err) } - fmt.Printf("Old iptables forward policy: %s\n", iptablesForwarPolicy) + fmt.Printf("Old iptables forward policy: %s\n", iptablesForwardPolicy) s.defaultNetworkInterface = defaultNetworkIfc s.defaultNetworkInterfaceIPs = defaultNetworkIfcIPs s.ipv4ForwardingVal = ipv4ForwardingVal s.ipv6ForwardingVal = ipv6ForwardingVal - s.iptablesForwardPolicy = iptablesForwarPolicy + s.iptablesForwardPolicy = iptablesForwardPolicy return s, nil } @@ -335,7 +335,7 @@ func (s *Server) shakeHands(conn net.Conn) (tunIP, tunGateway net.IP, unsecureVP if err := WriteJSON(conn, &sHello); err != nil { unsecureVPN() - return nil, nil, nil, fmt.Errorf("error finishing hadnshake: error sending server hello: %w", err) + return nil, nil, nil, fmt.Errorf("error finishing handshake: error sending server hello: %w", err) } return sTUNIP, sTUNGateway, unsecureVPN, nil @@ -363,7 +363,7 @@ func (s *Server) sendServerErrHello(conn net.Conn, status HandshakeStatus) { } } -func (s *Server) hasMutipleNetworkInterfaces(defaultNetworkInterface string) ([]string, bool) { +func (s *Server) hasMultipleNetworkInterfaces(defaultNetworkInterface string) ([]string, bool) { networkInterfaces := strings.Split(defaultNetworkInterface, "\n") if len(networkInterfaces) > 1 { return networkInterfaces, true diff --git a/pkg/app/appnet/errors.go b/pkg/app/appnet/errors.go new file mode 100644 index 0000000000..a5be58de02 --- /dev/null +++ b/pkg/app/appnet/errors.go @@ -0,0 +1,24 @@ +package appnet + +import ( + "fmt" + + "github.com/skycoin/skywire/pkg/skyenv" +) + +// ErrServiceOffline is used to get a verbose error of GetListenerError +func ErrServiceOffline(port uint16) error { + switch port { + case skyenv.SkychatPort: + return fmt.Errorf("no listener on port %d, skychat offline", port) + case skyenv.SkysocksPort: + return fmt.Errorf("no listener on port %d, skysocks offline", port) + case skyenv.SkysocksClientPort: + return fmt.Errorf("no listener on port %d, skysocks-client offline", port) + case skyenv.VPNServerPort: + return fmt.Errorf("no listener on port %d, vpn-server offline", port) + case skyenv.VPNClientPort: + return fmt.Errorf("no listener on port %d, vpn-client offline", port) + } + return fmt.Errorf("no listener on port %d, service offline", port) +} diff --git a/pkg/app/appnet/skywire_conn.go b/pkg/app/appnet/skywire_conn.go index e4d616d247..a75a70c747 100644 --- a/pkg/app/appnet/skywire_conn.go +++ b/pkg/app/appnet/skywire_conn.go @@ -47,6 +47,16 @@ func (c *SkywireConn) BandwidthReceived() uint64 { return c.nrg.BandwidthReceived() } +// SetError sets the close error. +func (c *SkywireConn) SetError(err error) { + c.nrg.SetError(err) +} + +// GetError gets the close error. +func (c *SkywireConn) GetError() error { + return c.nrg.GetError() +} + // Close closes connection. func (c *SkywireConn) Close() error { var err error diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index dd89cd4996..e378217b59 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -138,8 +138,12 @@ func (r *SkywireNetworker) serve(conn net.Conn) { lisIfc, ok := r.porter.PortValue(uint16(localAddr.Port)) if !ok { + err := ErrServiceOffline(uint16(localAddr.Port)) + r.log.Error(err) + if ng, ok := conn.(*router.NoiseRouteGroup); ok { + ng.SetError(err) + } r.close(conn) - r.log.Errorf("no listener on port %d", localAddr.Port) return } diff --git a/pkg/app/appserver/app_state.go b/pkg/app/appserver/app_state.go index e5aea25f39..632fdfe923 100644 --- a/pkg/app/appserver/app_state.go +++ b/pkg/app/appserver/app_state.go @@ -38,7 +38,7 @@ type AppState struct { type AppDetailedStatus string const ( - // AppDetailedStatusStarting is set during app initilization process. + // AppDetailedStatusStarting is set during app initialization process. AppDetailedStatusStarting = "Starting" // AppDetailedStatusRunning is set when the app is running. diff --git a/pkg/app/appserver/rpc_ingress_gateway.go b/pkg/app/appserver/rpc_ingress_gateway.go index 515705af77..95b83ad10b 100644 --- a/pkg/app/appserver/rpc_ingress_gateway.go +++ b/pkg/app/appserver/rpc_ingress_gateway.go @@ -283,6 +283,14 @@ func (r *RPCIngressGateway) Read(req *ReadReq, resp *ReadResp) error { } } + if wrappedConn, ok := conn.(*appnet.WrappedConn); ok { + if skywireConn, ok := wrappedConn.Conn.(*appnet.SkywireConn); ok { + if ngErr := skywireConn.GetError(); ngErr != nil { + err = ngErr + } + } + } + resp.Err = ioErrToRPCIOErr(err) // avoid error in RPC pipeline, error is included in response body diff --git a/pkg/router/noise_route_group.go b/pkg/router/noise_route_group.go index d28c1019b8..27b312d460 100644 --- a/pkg/router/noise_route_group.go +++ b/pkg/router/noise_route_group.go @@ -54,6 +54,16 @@ func (nrg *NoiseRouteGroup) BandwidthReceived() uint64 { return nrg.rg.BandwidthReceived() } +// SetError sets the close error. +func (nrg *NoiseRouteGroup) SetError(err error) { + nrg.rg.SetError(err) +} + +// GetError gets the close error. +func (nrg *NoiseRouteGroup) GetError() error { + return nrg.rg.GetError() +} + func (nrg *NoiseRouteGroup) isClosed() bool { return nrg.rg.isClosed() } diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index b8b65b3c68..61fc4bffa7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -114,6 +114,9 @@ type RouteGroup struct { // used to wait for all the `Close` packets to run through the loop and come back closeDone sync.WaitGroup once sync.Once + + errorMu sync.RWMutex + closeError error } // NewRouteGroup creates a new RouteGroup. @@ -281,6 +284,22 @@ func (rg *RouteGroup) BandwidthReceived() uint64 { return rg.networkStats.BandwidthReceived() } +// SetError sets the close error. +func (rg *RouteGroup) SetError(err error) { + rg.errorMu.Lock() + defer rg.errorMu.Unlock() + + rg.closeError = err +} + +// GetError gets the close error. +func (rg *RouteGroup) GetError() error { + rg.errorMu.RLock() + defer rg.errorMu.RUnlock() + + return rg.closeError +} + // read reads incoming data. It tries to fetch the data from the internal buffer. // If buffer is empty it blocks on receiving from the data channel func (rg *RouteGroup) read(p []byte) (int, error) { @@ -529,6 +548,24 @@ func (rg *RouteGroup) sendHandshake(encrypt bool) error { return ErrNoSuitableTransport } +func (rg *RouteGroup) sendError(rule routing.Rule, tp *transport.ManagedTransport) error { + errPayload := rg.GetError() + if errPayload == nil { + return nil + } + + if !rg.isCloseInitiator() { + return nil + } + + packet, err := routing.MakeErrorPacket(rule.NextRouteID(), []byte(errPayload.Error())) + if err != nil { + return err + } + + return rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()) +} + // Close closes a RouteGroup with the specified close `code`: // - Send Close packet for all ForwardRules with the code `code`. // - Delete all rules (ForwardRules and ConsumeRules) from routing table. @@ -601,6 +638,8 @@ func (rg *RouteGroup) handlePacket(packet routing.Packet) error { close(rg.handshakeProcessed) }) + case routing.ErrorPacket: + return rg.handleErrorPacket(packet) } return nil @@ -649,6 +688,19 @@ func (rg *RouteGroup) handleDataPacket(packet routing.Packet) error { return nil } +func (rg *RouteGroup) handleErrorPacket(packet routing.Packet) error { + + // in this case remote is already closed, and `readCh` is closed too, + // but some packets may still reach the rg causing panic on writing + // to `readCh`, so we simple omit such packets + if rg.isRemoteClosed() { + return nil + } + + rg.SetError(fmt.Errorf(string(packet.Payload()))) + return nil +} + func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { rg.logger.Debugf("Got close packet with code %d", code) @@ -669,6 +721,10 @@ func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) { continue } + if err := rg.sendError(rg.fwd[i], rg.tps[i]); err != nil { + rg.logger.WithError(err).Errorf("Failed to send error packet to %s", rg.tps[i].Remote()) + } + packet := routing.MakeClosePacket(rg.fwd[i].NextRouteID(), code) if err := rg.writePacket(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID()); err != nil { rg.logger.WithError(err).Errorf("Failed to send close packet to %s", rg.tps[i].Remote()) diff --git a/pkg/router/router.go b/pkg/router/router.go index 59fc2f576f..6202e38b43 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -257,7 +257,7 @@ func (r *router) DialRoutes( } // check if transports are available - ok := r.checkIfTransportAvalailable() + ok := r.checkIfTransportAvailable() if !ok { return nil, ErrNoTransportFound } @@ -532,6 +532,8 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe return r.handleKeepAlivePacket(ctx, packet) case routing.NetworkProbePacket: return r.handleNetworkProbePacket(ctx, packet) + case routing.ErrorPacket: + return r.handleErrorPacket(ctx, packet) default: return ErrUnknownPacketType } @@ -733,6 +735,57 @@ func (r *router) handleKeepAlivePacket(ctx context.Context, packet routing.Packe return nil } +func (r *router) handleErrorPacket(ctx context.Context, packet routing.Packet) error { + rule, err := r.GetRule(packet.RouteID()) + if err != nil { + return err + } + log := r.logger.WithField("func", "router.handleErrorPacket") + if rt := rule.Type(); rt == routing.RuleForward || rt == routing.RuleIntermediary { + log.Tracef("Handling packet of type %s with route ID %d and next ID %d", packet.Type(), + packet.RouteID(), rule.NextRouteID()) + return r.forwardPacket(ctx, packet, rule) + } + + log.Tracef("Handling packet of type %s with route ID %d", packet.Type(), packet.RouteID()) + + desc := rule.RouteDescriptor() + nrg, ok := r.noiseRouteGroup(desc) + + log.Tracef("Handling packet with descriptor %s", &desc) + + if ok { + if nrg == nil { + return errors.New("noiseRouteGroup is nil") + } + + // in this case we have already initialized nrg and may use it straightforward + log.Tracef("Got new remote packet with size %d and route ID %d. Using rule: %s", + len(packet.Payload()), packet.RouteID(), rule) + + return nrg.handlePacket(packet) + } + + // we don't have nrg for this packet and we don't have route for this one completely + + rg, ok := r.initializingRouteGroup(desc) + if !ok { + // no route, just return error + log.Tracef("Descriptor not found for rule with type %s, descriptor: %s", rule.Type(), &desc) + return errors.New("route descriptor does not exist") + } + + if rg == nil { + return errors.New("initializing RouteGroup is nil") + } + + // handshake packet, handling with the raw rg + log.Tracef("Got new remote packet with size %d and route ID %d. Using rule: %s", + len(packet.Payload()), packet.RouteID(), rule) + + return rg.handlePacket(packet) +} + // GetRule gets routing rule. func (r *router) GetRule(routeID routing.RouteID) (routing.Rule, error) { rule, err := r.rt.Rule(routeID) @@ -815,6 +868,13 @@ func (r *router) forwardPacket(ctx context.Context, packet routing.Packet, rule p = routing.MakeKeepAlivePacket(rule.NextRouteID()) case routing.ClosePacket: p = routing.MakeClosePacket(rule.NextRouteID(), routing.CloseCode(packet.Payload()[0])) + case routing.ErrorPacket: + var err error + + p, err = routing.MakeErrorPacket(rule.NextRouteID(), packet.Payload()) + if err != nil { + return err + } default: return fmt.Errorf("packet of type %s can't be forwarded", packet.Type()) } @@ -1075,7 +1135,7 @@ func (r *router) removeRouteGroupOfRule(rule routing.Rule) { log.Debug("Noise route group closed.") } -func (r *router) checkIfTransportAvalailable() (ok bool) { +func (r *router) checkIfTransportAvailable() (ok bool) { r.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { ok = true return ok diff --git a/pkg/routing/packet.go b/pkg/routing/packet.go index 74b5c2194e..da069756d0 100644 --- a/pkg/routing/packet.go +++ b/pkg/routing/packet.go @@ -41,10 +41,12 @@ func (t PacketType) String() string { return "ClosePacket" case KeepAlivePacket: return "KeepAlivePacket" - case NetworkProbePacket: - return "NetworkProbe" case HandshakePacket: return "Handshake" + case NetworkProbePacket: + return "NetworkProbe" + case ErrorPacket: + return "Error" default: return fmt.Sprintf("Unknown(%d)", t) } @@ -60,6 +62,7 @@ const ( KeepAlivePacket HandshakePacket NetworkProbePacket + ErrorPacket ) // CloseCode represents close code for ClosePacket. @@ -152,6 +155,23 @@ func MakeHandshakePacket(id RouteID, supportEncryption bool) Packet { return packet } +// MakeErrorPacket constructs a new ErrorPacket. +// If payload size is more than uint16, MakeErrorPacket returns an error. +func MakeErrorPacket(id RouteID, errPayload []byte) (Packet, error) { + if len(errPayload) > math.MaxUint16 { + return Packet{}, ErrPayloadTooBig + } + + packet := make([]byte, PacketHeaderSize+len(errPayload)) + + packet[PacketTypeOffset] = byte(ErrorPacket) + binary.BigEndian.PutUint32(packet[PacketRouteIDOffset:], uint32(id)) + binary.BigEndian.PutUint16(packet[PacketPayloadSizeOffset:], uint16(len(errPayload))) + copy(packet[PacketPayloadOffset:], errPayload) + + return packet, nil +} + // Type returns Packet's type. func (p Packet) Type() PacketType { return PacketType(p[PacketTypeOffset]) diff --git a/pkg/routing/packet_test.go b/pkg/routing/packet_test.go index 6fa39b5bee..6dbb655997 100644 --- a/pkg/routing/packet_test.go +++ b/pkg/routing/packet_test.go @@ -38,3 +38,15 @@ func TestMakeKeepAlivePacket(t *testing.T) { assert.Equal(t, RouteID(4), packet.RouteID()) assert.Equal(t, []byte{}, packet.Payload()) } + +func TestMakeErrorPacket(t *testing.T) { + packet, err := MakeErrorPacket(2, []byte("foo")) + require.NoError(t, err) + + expected := []byte{0x5, 0x0, 0x0, 0x0, 0x2, 0x0, 0x3, 0x66, 0x6f, 0x6f} + + assert.Equal(t, expected, []byte(packet)) + assert.Equal(t, uint16(3), packet.Size()) + assert.Equal(t, RouteID(2), packet.RouteID()) + assert.Equal(t, []byte("foo"), packet.Payload()) +} diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 4164f39877..262138d189 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -91,7 +91,7 @@ var ( trs vinit.Module // Routing system rt vinit.Module - // Application launcer + // Application launcher launch vinit.Module // CLI cli vinit.Module @@ -460,7 +460,7 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) error { ctx, cancel := context.WithCancel(ctx) - // To remove the block set by NewTransportListener if dmsg is not initilized + // To remove the block set by NewTransportListener if dmsg is not initialized go func() { ts, err := ts.NewTransportListener(ctx, v.conf, v.dmsgC, v.tpM, v.MasterLogger()) if err != nil { @@ -474,7 +474,7 @@ func initTransportSetup(ctx context.Context, v *Visor, log *logging.Logger) erro } }() - // waiting for atleast one transport to initilize + // waiting for at least one transport to initialize <-v.tpM.Ready() v.pushCloseStack("transport_setup.rpc", func() error {