Skip to content

Commit

Permalink
Merge branch 'milestone2' into fix/dmsgpty-ui-https-#206
Browse files Browse the repository at this point in the history
  • Loading branch information
志宇 authored Mar 6, 2020
2 parents 5aec4b7 + fa385fa commit 86d3d2b
Show file tree
Hide file tree
Showing 25 changed files with 337 additions and 184 deletions.
7 changes: 4 additions & 3 deletions pkg/app/appcommon/key.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package appcommon

import "github.com/SkycoinProject/dmsg/cipher"
import (
"github.com/google/uuid"
)

// Key is an app key to authenticate within the
// app server.
type Key string

// GenerateAppKey generates new app key.
func GenerateAppKey() Key {
raw, _ := cipher.GenerateKeyPair()
return Key(raw.Hex())
return Key(uuid.New().String())
}
16 changes: 11 additions & 5 deletions pkg/app/appnet/skywire_networker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"net"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -87,8 +88,8 @@ func (r *SkywireNetworker) ListenContext(ctx context.Context, addr Addr) (net.Li

if atomic.CompareAndSwapInt32(&r.isServing, 0, 1) {
go func() {
if err := r.serveLoop(ctx); err != nil {
r.log.WithError(err).Error("error serving")
if err := r.serveLoop(ctx); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
r.log.WithError(err).Error("serveLoop stopped unexpectedly.")
}
}()
}
Expand All @@ -98,16 +99,21 @@ func (r *SkywireNetworker) ListenContext(ctx context.Context, addr Addr) (net.Li

// serveLoop accepts and serves routes.
func (r *SkywireNetworker) serveLoop(ctx context.Context) error {
log := r.log.WithField("func", "serveLoop")

for {
r.log.Infoln("Trying to accept routing group...")
log.Debug("Awaiting to accept route group...")

rg, err := r.r.AcceptRoutes(ctx)
if err != nil {
r.log.Infof("Error accepting routing group: %v", err)
log.WithError(err).Info("Stopped accepting routes.")
return err
}

r.log.Infoln("Accepted routing group")
log.
WithField("local", rg.LocalAddr()).
WithField("remote", rg.RemoteAddr()).
Info("Accepted route group.")

go r.serve(rg)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os/exec"
"strings"
"sync"

"github.com/SkycoinProject/skycoin/src/util/logging"
Expand Down Expand Up @@ -140,11 +141,12 @@ func (m *procManager) StopAll() {
defer m.mx.Unlock()

for name, proc := range m.procs {
if err := proc.Stop(); err != nil {
m.log.WithError(err).Errorf("(%s) failed to stop app", name)
} else {
m.log.Infof("(%s) app stopped successfully", name)
log := m.log.WithField("app_name", name)
if err := proc.Stop(); err != nil && strings.Contains(err.Error(), "process already finished") {
log.WithError(err).Error("Failed to stop app.")
continue
}
log.Infof("App stopped successfully.")
}

m.procs = make(map[string]*Proc)
Expand Down
56 changes: 27 additions & 29 deletions pkg/app/appserver/rpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/SkycoinProject/skywire-mainnet/pkg/app/appnet"
"github.com/SkycoinProject/skywire-mainnet/pkg/app/idmanager"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/util/rpcutil"
)

// RPCIOErr is used to return an error coming from network stack.
Expand Down Expand Up @@ -77,7 +78,9 @@ type DialResp struct {
}

// Dial dials to the remote.
func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) error {
func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) (err error) {
defer rpcutil.LogCall(r.log, "Dial", remote)(resp, &err)

reservedConnID, free, err := r.cm.ReserveNextID()
if err != nil {
return err
Expand All @@ -96,12 +99,10 @@ func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) error {
}

if err := r.cm.Set(*reservedConnID, wrappedConn); err != nil {
if err := wrappedConn.Close(); err != nil {
r.log.WithError(err).Error("error closing conn")
if cErr := wrappedConn.Close(); cErr != nil {
r.log.WithError(cErr).Error("Error closing wrappedConn.")
}

free()

return err
}

Expand All @@ -114,7 +115,9 @@ func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) error {
}

// Listen starts listening.
func (r *RPCGateway) Listen(local *appnet.Addr, lisID *uint16) error {
func (r *RPCGateway) Listen(local *appnet.Addr, lisID *uint16) (err error) {
defer rpcutil.LogCall(r.log, "Listen", local)(lisID, &err)

nextLisID, free, err := r.lm.ReserveNextID()
if err != nil {
return err
Expand All @@ -127,17 +130,14 @@ func (r *RPCGateway) Listen(local *appnet.Addr, lisID *uint16) error {
}

if err := r.lm.Set(*nextLisID, l); err != nil {
if err := l.Close(); err != nil {
r.log.WithError(err).Error("error closing listener")
if cErr := l.Close(); cErr != nil {
r.log.WithError(cErr).Error("Error closing listener.")
}

free()

return err
}

*lisID = *nextLisID

return nil
}

Expand All @@ -148,48 +148,42 @@ type AcceptResp struct {
}

// Accept accepts connection from the listener specified by `lisID`.
func (r *RPCGateway) Accept(lisID *uint16, resp *AcceptResp) error {
r.log.Infoln("Inside RPC Accept on server side")
func (r *RPCGateway) Accept(lisID *uint16, resp *AcceptResp) (err error) {
defer rpcutil.LogCall(r.log, "Accept", lisID)(resp, &err)

log := r.log.WithField("func", "Accept")

log.Debug("Getting listener...")
lis, err := r.getListener(*lisID)
if err != nil {
r.log.Infoln("Error getting listener on RPC Accept server side")
return err
}

r.log.Infoln("Reserving next ID on RPC Accept server side")

log.Debug("Reserving next ID...")
connID, free, err := r.cm.ReserveNextID()
if err != nil {
r.log.Infoln("Error reserving next ID on RPC Accept server side")
return err
}

r.log.Infoln("Accepting conn on RPC Accept server side")

log.Debug("Accepting conn...")
conn, err := lis.Accept()
if err != nil {
r.log.Warnf("Error accepting conn on RPC Accept server side: %v", err)
free()

return err
}

r.log.Infoln("Wrapping conn on RPC Accept server side")

log.Debug("Wrapping conn...")
wrappedConn, err := appnet.WrapConn(conn)
if err != nil {
free()
return err
}

if err := r.cm.Set(*connID, wrappedConn); err != nil {
if err := wrappedConn.Close(); err != nil {
r.log.WithError(err).Error("error closing DMSG transport")
if cErr := wrappedConn.Close(); cErr != nil {
r.log.WithError(cErr).Error("Failed to close wrappedConn.")
}

free()

return err
}

Expand Down Expand Up @@ -264,7 +258,9 @@ func (r *RPCGateway) Read(req *ReadReq, resp *ReadResp) error {
}

// CloseConn closes connection specified by `connID`.
func (r *RPCGateway) CloseConn(connID *uint16, _ *struct{}) error {
func (r *RPCGateway) CloseConn(connID *uint16, _ *struct{}) (err error) {
defer rpcutil.LogCall(r.log, "CloseConn", connID)(nil, &err)

conn, err := r.popConn(*connID)
if err != nil {
return err
Expand All @@ -274,7 +270,9 @@ func (r *RPCGateway) CloseConn(connID *uint16, _ *struct{}) error {
}

// CloseListener closes listener specified by `lisID`.
func (r *RPCGateway) CloseListener(lisID *uint16, _ *struct{}) error {
func (r *RPCGateway) CloseListener(lisID *uint16, _ *struct{}) (err error) {
defer rpcutil.LogCall(r.log, "CloseConn", lisID)(nil, &err)

lis, err := r.popListener(*lisID)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions pkg/app/appserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net"
"net/rpc"
"strings"
"sync"

"github.com/SkycoinProject/skycoin/src/util/logging"
Expand Down Expand Up @@ -33,7 +34,7 @@ func New(log *logging.Logger, sockFile string) *Server {

// Register registers an app key in RPC server.
func (s *Server) Register(appKey appcommon.Key) error {
logger := logging.MustGetLogger(fmt.Sprintf("rpc_server_%s", appKey))
logger := logging.MustGetLogger(fmt.Sprintf("app_gateway:%s", appKey))
gateway := NewRPCGateway(logger)

return s.rpcS.RegisterName(string(appKey), gateway)
Expand Down Expand Up @@ -81,8 +82,8 @@ func (s *Server) serveConn(conn net.Conn) {

<-s.stopCh

if err := conn.Close(); err != nil {
s.log.WithError(err).Error("error closing conn")
if err := conn.Close(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.log.WithError(err).Error("Unexpected error while closing conn.")
}

s.done.Done()
Expand Down
11 changes: 6 additions & 5 deletions pkg/app/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"net/rpc"
"os"
"strings"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/skycoin/src/util/logging"
Expand Down Expand Up @@ -115,8 +116,8 @@ func (c *Client) Dial(remote appnet.Addr) (net.Conn, error) {
if err != nil {
conn.freeConnMx.Unlock()

if err := conn.Close(); err != nil {
c.log.WithError(err).Error("error closing conn")
if err := conn.Close(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
c.log.WithError(err).Error("Unexpected error while closing conn.")
}

return nil, err
Expand Down Expand Up @@ -201,13 +202,13 @@ func (c *Client) Close() {

for _, lis := range listeners {
if err := lis.Close(); err != nil {
c.log.WithError(err).Error("error closing listener")
c.log.WithError(err).Error("Error closing listener.")
}
}

for _, conn := range conns {
if err := conn.Close(); err != nil {
c.log.WithError(err).Error("error closing conn")
if err := conn.Close(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
c.log.WithError(err).Error("Unexpected error while closing conn.")
}
}
}
7 changes: 3 additions & 4 deletions pkg/app/log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newBoltDB(path, appName string) (_ LogStore, err error) {
}

// Write implements io.Writer
func (l *boltDBappLogs) Write(p []byte) (int, error) {
func (l *boltDBappLogs) Write(p []byte) (n int, err error) {
// ensure there is at least timestamp long bytes
if len(p) < 37 {
return 0, io.ErrShortBuffer
Expand All @@ -79,9 +79,8 @@ func (l *boltDBappLogs) Write(p []byte) (int, error) {
}

defer func() {
err := db.Close()
if err != nil {
panic(err)
if closeErr := db.Close(); err == nil {
err = closeErr
}
}()

Expand Down
5 changes: 4 additions & 1 deletion pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ func (rg *RouteGroup) read(p []byte) (int, error) {
}

func (rg *RouteGroup) write(data []byte, tp *transport.ManagedTransport, rule routing.Rule) (int, error) {
packet := routing.MakeDataPacket(rule.NextRouteID(), data)
packet, err := routing.MakeDataPacket(rule.NextRouteID(), data)
if err != nil {
return 0, err
}

rg.logger.Debugf("Writing packet of type %s, route ID %d and next ID %d", packet.Type(),
rule.KeyRouteID(), rule.NextRouteID())
Expand Down
22 changes: 19 additions & 3 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/SkycoinProject/dmsg"
"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/skycoin/src/util/logging"

Expand Down Expand Up @@ -286,7 +287,11 @@ func (r *router) serveTransportManager(ctx context.Context) {
for {
packet, err := r.tm.ReadPacket()
if err != nil {
r.logger.WithError(err).Errorf("Failed to read packet")
if err == transport.ErrNotServing {
r.logger.WithError(err).Info("Stopped reading packets")
return
}
r.logger.WithError(err).Error("Stopped reading packets due to unexpected error.")
return
}

Expand All @@ -305,7 +310,12 @@ func (r *router) serveSetup() {
for {
conn, err := r.sl.AcceptConn()
if err != nil {
r.logger.WithError(err).Warnf("setup client stopped serving")
log := r.logger.WithError(err)
if err == dmsg.ErrEntityClosed {
log.Info("Setup client stopped serving.")
} else {
log.Error("Setup client stopped serving due to unexpected error.")
}
return
}

Expand Down Expand Up @@ -553,9 +563,15 @@ func (r *router) forwardPacket(ctx context.Context, packet routing.Packet, rule
}

var p routing.Packet

switch packet.Type() {
case routing.DataPacket:
p = routing.MakeDataPacket(rule.NextRouteID(), packet.Payload())
var err error

p, err = routing.MakeDataPacket(rule.NextRouteID(), packet.Payload())
if err != nil {
return err
}
case routing.KeepAlivePacket:
p = routing.MakeKeepAlivePacket(rule.NextRouteID())
case routing.ClosePacket:
Expand Down
Loading

0 comments on commit 86d3d2b

Please sign in to comment.