From 085b9de83532c685644ff58ce0d4fb9dc35aead2 Mon Sep 17 00:00:00 2001 From: ivcosla Date: Mon, 26 Aug 2019 18:19:07 +0200 Subject: [PATCH] log in apps and further polishment --- .../therealproxy-client.go | 4 +- cmd/apps/therealproxy/therealproxy.go | 6 ++- .../therealssh-client/therealssh-client.go | 6 ++- cmd/apps/therealssh/therealssh.go | 8 +++- internal/therealproxy/client.go | 9 +++-- internal/therealproxy/server.go | 8 ++-- internal/therealproxy/server_test.go | 2 +- pkg/hypervisor/hypervisor.go | 39 +++++++++++++------ pkg/therealssh/auth.go | 2 +- pkg/therealssh/channel.go | 32 +++++++-------- pkg/therealssh/client.go | 28 ++++++------- pkg/therealssh/server.go | 14 +++---- pkg/therealssh/server_test.go | 2 +- pkg/therealssh/session.go | 9 +++-- pkg/transport/manager_test.go | 1 - pkg/visor/rpc_client.go | 5 ++- 16 files changed, 103 insertions(+), 72 deletions(-) diff --git a/cmd/apps/therealproxy-client/therealproxy-client.go b/cmd/apps/therealproxy-client/therealproxy-client.go index 26366ea4f8..044e001a4a 100644 --- a/cmd/apps/therealproxy-client/therealproxy-client.go +++ b/cmd/apps/therealproxy-client/therealproxy-client.go @@ -5,7 +5,6 @@ package main import ( "flag" - "log" "net" "time" @@ -22,6 +21,9 @@ const socksPort = 3 var r = netutil.NewRetrier(time.Second, 0, 1) func main() { + log := app.NewLogger("socksproxy-client") + therealproxy.Log = log.PackageLogger("therealproxy") + var addr = flag.String("addr", ":1080", "Client address to listen on") var serverPK = flag.String("srv", "", "PubKey of the server to connect to") flag.Parse() diff --git a/cmd/apps/therealproxy/therealproxy.go b/cmd/apps/therealproxy/therealproxy.go index f3f1fb0383..9f74777857 100644 --- a/cmd/apps/therealproxy/therealproxy.go +++ b/cmd/apps/therealproxy/therealproxy.go @@ -5,13 +5,15 @@ package main import ( "flag" - "log" "github.com/skycoin/skywire/internal/therealproxy" "github.com/skycoin/skywire/pkg/app" ) func main() { + log := app.NewLogger("socksproxy") + therealproxy.Log = log.PackageLogger("therealproxy") + var passcode = flag.String("passcode", "", "Authorize user against this passcode") flag.Parse() @@ -26,7 +28,7 @@ func main() { } }() - srv, err := therealproxy.NewServer(*passcode) + srv, err := therealproxy.NewServer(*passcode, log) if err != nil { log.Fatal("Failed to create a new server: ", err) } diff --git a/cmd/apps/therealssh-client/therealssh-client.go b/cmd/apps/therealssh-client/therealssh-client.go index 88c547b982..aac5642730 100644 --- a/cmd/apps/therealssh-client/therealssh-client.go +++ b/cmd/apps/therealssh-client/therealssh-client.go @@ -5,7 +5,6 @@ package main import ( "flag" - "log" "net/http" "github.com/sirupsen/logrus" @@ -15,7 +14,12 @@ import ( ssh "github.com/skycoin/skywire/pkg/therealssh" ) +var log *logging.MasterLogger + func main() { + log = app.NewLogger("SSH-client") + ssh.Log = log.PackageLogger("therealssh") + var rpcAddr = flag.String("rpc", ":2222", "Client RPC address to listen on") var debug = flag.Bool("debug", false, "enable debug messages") flag.Parse() diff --git a/cmd/apps/therealssh/therealssh.go b/cmd/apps/therealssh/therealssh.go index 59e244e8a3..fb273a15a0 100644 --- a/cmd/apps/therealssh/therealssh.go +++ b/cmd/apps/therealssh/therealssh.go @@ -5,7 +5,6 @@ package main import ( "flag" - "log" "github.com/mitchellh/go-homedir" "github.com/sirupsen/logrus" @@ -15,7 +14,12 @@ import ( ssh "github.com/skycoin/skywire/pkg/therealssh" ) +var log *logging.MasterLogger + func main() { + log = app.NewLogger("SSH") + ssh.Log = log.PackageLogger("therealssh") + var authFile = flag.String("auth", "~/.therealssh/authorized_keys", "Auth file location. Should contain one PubKey per line.") var debug = flag.Bool("debug", false, "enable debug messages") @@ -47,7 +51,7 @@ func main() { log.Fatal("Failed to setup Authorizer: ", err) } - server := ssh.NewServer(auth) + server := ssh.NewServer(auth, log) defer func() { if err := server.Close(); err != nil { log.Println("Failed to close server:", err) diff --git a/internal/therealproxy/client.go b/internal/therealproxy/client.go index c7f6efbf99..a0747f9f9c 100644 --- a/internal/therealproxy/client.go +++ b/internal/therealproxy/client.go @@ -9,7 +9,8 @@ import ( "github.com/skycoin/skycoin/src/util/logging" ) -var log = logging.MustGetLogger("therealproxy") +// Log is therealproxy package level logger, it can be replaced with a different one from outside the package +var Log = logging.MustGetLogger("therealproxy") // Client implement multiplexing proxy client using yamux. type Client struct { @@ -64,14 +65,14 @@ func (c *Client) ListenAndServe(addr string) error { for err := range errCh { if err := conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") + Log.WithError(err).Warn("Failed to close connection") } if err := stream.Close(); err != nil { - log.WithError(err).Warn("Failed to close stream") + Log.WithError(err).Warn("Failed to close stream") } if err != nil { - log.Error("Copy error:", err) + Log.Error("Copy error:", err) } } }() diff --git a/internal/therealproxy/server.go b/internal/therealproxy/server.go index 866abad52d..eeecc20191 100644 --- a/internal/therealproxy/server.go +++ b/internal/therealproxy/server.go @@ -6,16 +6,18 @@ import ( "github.com/armon/go-socks5" "github.com/hashicorp/yamux" + "github.com/skycoin/skycoin/src/util/logging" ) // Server implements multiplexing proxy server using yamux. type Server struct { socks *socks5.Server listener net.Listener + log *logging.MasterLogger } // NewServer constructs a new Server. -func NewServer(passcode string) (*Server, error) { +func NewServer(passcode string, l *logging.MasterLogger) (*Server, error) { var credentials socks5.CredentialStore if passcode != "" { credentials = passcodeCredentials(passcode) @@ -26,7 +28,7 @@ func NewServer(passcode string) (*Server, error) { return nil, fmt.Errorf("socks5: %s", err) } - return &Server{socks: s}, nil + return &Server{socks: s, log: l}, nil } // Serve accept connections from listener and serves socks5 proxy for @@ -46,7 +48,7 @@ func (s *Server) Serve(l net.Listener) error { go func() { if err := s.socks.Serve(session); err != nil { - log.Error("Failed to start SOCKS5 server:", err) + s.log.Error("Failed to start SOCKS5 server:", err) } }() } diff --git a/internal/therealproxy/server_test.go b/internal/therealproxy/server_test.go index 1fd51ab468..c11e4c45ef 100644 --- a/internal/therealproxy/server_test.go +++ b/internal/therealproxy/server_test.go @@ -22,7 +22,7 @@ func TestMain(m *testing.M) { if ok { lvl, err := logging.LevelFromString(loggingLevel) if err != nil { - log.Fatal(err) + Log.Fatal(err) } logging.SetLevel(lvl) } else { diff --git a/pkg/hypervisor/hypervisor.go b/pkg/hypervisor/hypervisor.go index 1a577a0ecf..5e94448e74 100644 --- a/pkg/hypervisor/hypervisor.go +++ b/pkg/hypervisor/hypervisor.go @@ -27,7 +27,8 @@ import ( ) var ( - log = logging.MustGetLogger("hypervisor") + log = logging.MustGetLogger("hypervisor") + healthTimeout = 5 * time.Second ) type appNodeConn struct { @@ -155,7 +156,7 @@ func (m *Node) ServeHTTP(w http.ResponseWriter, req *http.Request) { r.ServeHTTP(w, req) } -// VisorHealth represents a node's health report attached to it's pk for identification +// VisorHealth represents a node's health report attached to hypervisor to visor request status type VisorHealth struct { Status int `json:"status"` *visor.HealthInfo @@ -163,23 +164,37 @@ type VisorHealth struct { // provides summary of health information for every visor func (m *Node) getHealth() http.HandlerFunc { - healthStatuses := make([]*VisorHealth, 0, len(m.nodes)) - return m.withCtx(m.nodeCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { vh := &VisorHealth{} - hi, err := ctx.RPC.Health() - if err != nil { - vh.Status = http.StatusInternalServerError - } else { - vh.HealthInfo = hi - vh.Status = http.StatusOK - healthStatuses = append(healthStatuses, vh) + type healthRes struct { + h *visor.HealthInfo + err error + } + + resCh := make(chan healthRes) + tCh := time.After(healthTimeout) + go func() { + hi, err := ctx.RPC.Health() + resCh <- healthRes{hi, err} + }() + select { + case res := <-resCh: + if res.err != nil { + vh.Status = http.StatusInternalServerError + } else { + vh.HealthInfo = res.h + vh.Status = http.StatusOK + } + httputil.WriteJSON(w, r, http.StatusOK, vh) + return + case <-tCh: + httputil.WriteJSON(w, r, http.StatusRequestTimeout, &VisorHealth{Status: http.StatusRequestTimeout}) } - httputil.WriteJSON(w, r, http.StatusOK, healthStatuses) }) } +// getUptime gets given node's uptime func (m *Node) getUptime() http.HandlerFunc { return m.withCtx(m.nodeCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { u, err := ctx.RPC.Uptime() diff --git a/pkg/therealssh/auth.go b/pkg/therealssh/auth.go index 45cfd8fdea..a49852e519 100644 --- a/pkg/therealssh/auth.go +++ b/pkg/therealssh/auth.go @@ -72,7 +72,7 @@ func (auth *FileAuthorizer) Close() error { func (auth *FileAuthorizer) Authorize(remotePK cipher.PubKey) error { defer func() { if _, err := auth.authFile.Seek(0, 0); err != nil { - log.WithError(err).Warn("Failed to seek to the beginning of auth file") + Log.WithError(err).Warn("Failed to seek to the beginning of auth file") } }() diff --git a/pkg/therealssh/channel.go b/pkg/therealssh/channel.go index a6e129a0f3..35af51d3a7 100644 --- a/pkg/therealssh/channel.go +++ b/pkg/therealssh/channel.go @@ -79,7 +79,7 @@ func (sshCh *SSHChannel) Write(p []byte) (n int, err error) { // Request sends request message and waits for response. func (sshCh *SSHChannel) Request(requestType RequestType, payload []byte) ([]byte, error) { - log.Debugf("sending request %x", requestType) + Log.Debugf("sending request %x", requestType) req := append([]byte{byte(requestType)}, payload...) if err := sshCh.Send(CmdChannelRequest, req); err != nil { @@ -98,7 +98,7 @@ func (sshCh *SSHChannel) Request(requestType RequestType, payload []byte) ([]byt func (sshCh *SSHChannel) Serve() error { for data := range sshCh.msgCh { var err error - log.Debugf("new request %x", data[0]) + Log.Debugf("new request %x", data[0]) switch RequestType(data[0]) { case RequestPTY: var u *user.User @@ -151,10 +151,10 @@ func (sshCh *SSHChannel) SocketPath() string { // ServeSocket starts socket handling loop. func (sshCh *SSHChannel) ServeSocket() error { if err := os.Remove(sshCh.SocketPath()); err != nil { - log.WithError(err).Warn("Failed to remove SSH channel socket file") + Log.WithError(err).Warn("Failed to remove SSH channel socket file") } - log.Debugf("waiting for new socket connections on: %s", sshCh.SocketPath()) + Log.Debugf("waiting for new socket connections on: %s", sshCh.SocketPath()) l, err := net.ListenUnix("unix", &net.UnixAddr{Name: sshCh.SocketPath(), Net: "unix"}) if err != nil { return fmt.Errorf("failed to open unix socket: %s", err) @@ -168,22 +168,22 @@ func (sshCh *SSHChannel) ServeSocket() error { return fmt.Errorf("failed to accept connection: %s", err) } - log.Debugln("got new socket connection") + Log.Debugln("got new socket connection") defer func() { if err := conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") + Log.WithError(err).Warn("Failed to close connection") } if err := sshCh.closeListener(); err != nil { - log.WithError(err).Warn("Failed to close listener") + Log.WithError(err).Warn("Failed to close listener") } if err := os.Remove(sshCh.SocketPath()); err != nil { - log.WithError(err).Warn("Failed to close SSH channel socket file") + Log.WithError(err).Warn("Failed to close SSH channel socket file") } }() go func() { if _, err := io.Copy(sshCh, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - log.Errorf("failed to write to server:", err) + Log.Errorf("failed to write to server:", err) return } }() @@ -201,7 +201,7 @@ func (sshCh *SSHChannel) OpenPTY(user *user.User, sz *pty.Winsize) (err error) { return errors.New("session is already started") } - log.Debugf("starting new session for %s with %#v", user.Username, sz) + Log.Debugf("starting new session for %s with %#v", user.Username, sz) sshCh.session, err = OpenSession(user, sz) if err != nil { sshCh.session = nil @@ -224,11 +224,11 @@ func (sshCh *SSHChannel) Start(command string) error { go func() { if err := sshCh.serveSession(); err != nil { - log.Error("Session failure:", err) + Log.Error("Session failure:", err) } }() - log.Debugf("starting new pty process %s", command) + Log.Debugf("starting new pty process %s", command) return sshCh.session.Start(command) } @@ -246,7 +246,7 @@ func (sshCh *SSHChannel) Run(command string) error { go func() { _, err := sshCh.Write(out) if err != nil { - log.Warn("error writing to channel: ", err) + Log.Warn("error writing to channel: ", err) } }() return err @@ -255,16 +255,16 @@ func (sshCh *SSHChannel) Run(command string) error { func (sshCh *SSHChannel) serveSession() error { defer func() { if err := sshCh.Send(CmdChannelServerClose, nil); err != nil { - log.WithError(err).Warn("Failed to send to SSH channel") + Log.WithError(err).Warn("Failed to send to SSH channel") } if err := sshCh.Close(); err != nil { - log.WithError(err).Warn("Failed to close SSH channel") + Log.WithError(err).Warn("Failed to close SSH channel") } }() go func() { if _, err := io.Copy(sshCh.session, sshCh); err != nil { - log.Error("PTY copy: ", err) + Log.Error("PTY copy: ", err) return } }() diff --git a/pkg/therealssh/client.go b/pkg/therealssh/client.go index 86a5207d82..41365f891b 100644 --- a/pkg/therealssh/client.go +++ b/pkg/therealssh/client.go @@ -57,7 +57,7 @@ func (c *Client) OpenChannel(remotePK cipher.PubKey) (localID uint32, sshCh *SSH } sshCh = OpenClientChannel(0, remotePK, conn) - log.Debugln("sending channel open command") + Log.Debugln("sending channel open command") localID = c.chans.add(sshCh) req := appendU32([]byte{byte(CmdChannelOpen)}, localID) if _, err := conn.Write(req); err != nil { @@ -67,13 +67,13 @@ func (c *Client) OpenChannel(remotePK cipher.PubKey) (localID uint32, sshCh *SSH go func() { if err := c.serveConn(conn); err != nil { - log.Error(err) + Log.Error(err) } }() - log.Debugln("waiting for channel open response") + Log.Debugln("waiting for channel open response") data := <-sshCh.msgCh - log.Debugln("got channel open response") + Log.Debugln("got channel open response") if data[0] == ResponseFail { cErr = fmt.Errorf("failed to open channel: %s", string(data[1:])) return @@ -121,7 +121,7 @@ func (c *Client) serveConn(conn net.Conn) error { } data := payload[5:] - log.Debugf("got new command: %x", payload[0]) + Log.Debugf("got new command: %x", payload[0]) switch CommandType(payload[0]) { case CmdChannelOpenResponse, CmdChannelResponse: sshCh.msgCh <- data @@ -151,7 +151,7 @@ func (c *Client) Close() error { for _, sshCh := range c.chans.dropAll() { if err := sshCh.Close(); err != nil { - log.WithError(err).Warn("Failed to close SSH channel") + Log.WithError(err).Warn("Failed to close SSH channel") } } @@ -165,13 +165,13 @@ type RPCClient struct { // RequestPTY defines RPC request for a new PTY session. func (rpc *RPCClient) RequestPTY(args *RequestPTYArgs, channelID *uint32) error { - log.Debugln("requesting SSH channel") + Log.Debugln("requesting SSH channel") localID, channel, err := rpc.c.OpenChannel(args.RemotePK) if err != nil { return err } - log.Debugln("requesting PTY session") + Log.Debugln("requesting PTY session") if _, err := channel.Request(RequestPTY, args.ToBinary()); err != nil { return fmt.Errorf("PTY request failure: %s", err) } @@ -187,7 +187,7 @@ func (rpc *RPCClient) Exec(args *ExecArgs, socketPath *string) error { return errors.New("unknown channel") } - log.Debugln("requesting shell process") + Log.Debugln("requesting shell process") if args.CommandWithArgs == nil { if _, err := sshCh.Request(RequestShell, nil); err != nil { return fmt.Errorf("shell request failure: %s", err) @@ -200,10 +200,10 @@ func (rpc *RPCClient) Exec(args *ExecArgs, socketPath *string) error { waitCh := make(chan bool) go func() { - log.Debugln("starting socket listener") + Log.Debugln("starting socket listener") waitCh <- true if err := sshCh.ServeSocket(); err != nil { - log.Error("Session failure:", err) + Log.Error("Session failure:", err) } }() @@ -219,17 +219,17 @@ func (rpc *RPCClient) Run(args *ExecArgs, socketPath *string) error { return errors.New("unknown channel") } - log.Debugln("requesting shell-less process execution") + Log.Debugln("requesting shell-less process execution") if _, err := sshCh.Request(RequestExecWithoutShell, args.ToBinary()); err != nil { return fmt.Errorf("run command request failure: %s", err) } waitCh := make(chan bool) go func() { - log.Debugln("starting socket listener") + Log.Debugln("starting socket listener") waitCh <- true if err := sshCh.ServeSocket(); err != nil { - log.Error("Session failure:", err) + Log.Error("Session failure:", err) } }() diff --git a/pkg/therealssh/server.go b/pkg/therealssh/server.go index 1602d5c153..3d28e1edbb 100644 --- a/pkg/therealssh/server.go +++ b/pkg/therealssh/server.go @@ -64,13 +64,13 @@ type Server struct { } // NewServer constructs new Server. -func NewServer(auth Authorizer) *Server { - return &Server{logging.MustGetLogger("therealssh_server"), auth, newChanList()} +func NewServer(auth Authorizer, log *logging.MasterLogger) *Server { + return &Server{log.PackageLogger("therealssh_server"), auth, newChanList()} } // OpenChannel opens new client channel. func (s *Server) OpenChannel(remoteAddr routing.Addr, remoteID uint32, conn net.Conn) error { - log.Debugln("opening new channel") + Log.Debugln("opening new channel") channel := OpenChannel(remoteID, remoteAddr, conn) var res []byte @@ -83,7 +83,7 @@ func (s *Server) OpenChannel(remoteAddr routing.Addr, remoteID uint32, conn net. s.log.Debugln("sending response") if err := channel.Send(CmdChannelOpenResponse, res); err != nil { if err := channel.Close(); err != nil { - log.WithError(err).Warn("Failed to close channel") + Log.WithError(err).Warn("Failed to close channel") } return fmt.Errorf("channel response failure: %s", err) } @@ -91,7 +91,7 @@ func (s *Server) OpenChannel(remoteAddr routing.Addr, remoteID uint32, conn net. go func() { s.log.Debugln("listening for channel requests") if err := channel.Serve(); err != nil { - log.Error("channel failure:", err) + Log.Error("channel failure:", err) } }() @@ -107,7 +107,7 @@ func (s *Server) HandleRequest(remotePK cipher.PubKey, localID uint32, data []by if s.auth.Authorize(remotePK) != nil || channel.RemoteAddr.PubKey != remotePK { if err := channel.Send(CmdChannelResponse, responseUnauthorized); err != nil { - log.Error("failed to send response: ", err) + Log.Error("failed to send response: ", err) } return nil } @@ -188,7 +188,7 @@ func (s *Server) Close() error { for _, channel := range s.chans.dropAll() { if err := channel.Close(); err != nil { - log.WithError(err).Warn("Failed to close channel") + Log.WithError(err).Warn("Failed to close channel") } } diff --git a/pkg/therealssh/server_test.go b/pkg/therealssh/server_test.go index 1c9f2b65e3..64ec0187ee 100644 --- a/pkg/therealssh/server_test.go +++ b/pkg/therealssh/server_test.go @@ -19,7 +19,7 @@ func TestMain(m *testing.M) { if ok { lvl, err := logging.LevelFromString(loggingLevel) if err != nil { - log.Fatal(err) + Log.Fatal(err) } logging.SetLevel(lvl) } else { diff --git a/pkg/therealssh/session.go b/pkg/therealssh/session.go index 06bd1bd3ca..74a70ec8cb 100644 --- a/pkg/therealssh/session.go +++ b/pkg/therealssh/session.go @@ -14,7 +14,8 @@ import ( "github.com/skycoin/skycoin/src/util/logging" ) -var log = logging.MustGetLogger("therealssh") +// Log is the package level logger, which can be replaced from outside +var Log = logging.MustGetLogger("therealssh") // Session represents PTY sessions. Channel normally handles Session's lifecycle. type Session struct { @@ -39,7 +40,7 @@ func OpenSession(user *user.User, sz *pty.Winsize) (s *Session, err error) { if err = pty.Setsize(s.pty, sz); err != nil { if closeErr := s.Close(); closeErr != nil { - log.WithError(closeErr).Warn("Failed to close session") + Log.WithError(closeErr).Warn("Failed to close session") } err = fmt.Errorf("failed to set PTY size: %s", err) } @@ -51,7 +52,7 @@ func OpenSession(user *user.User, sz *pty.Winsize) (s *Session, err error) { func (s *Session) Start(command string) (err error) { defer func() { if err := s.tty.Close(); err != nil { - log.WithError(err).Warn("Failed to close TTY") + Log.WithError(err).Warn("Failed to close TTY") } }() @@ -100,7 +101,7 @@ func (s *Session) Run(command string) ([]byte, error) { defer func() { err = ptmx.Close() if err != nil { - log.Warn("unable to close pty") + Log.Warn("unable to close pty") } }() // Best effort. diff --git a/pkg/transport/manager_test.go b/pkg/transport/manager_test.go index 3ed210f86b..6c4ba723cd 100644 --- a/pkg/transport/manager_test.go +++ b/pkg/transport/manager_test.go @@ -14,7 +14,6 @@ import ( "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/transport" - "github.com/skycoin/skywire/pkg/transport/dmsg" "github.com/skycoin/dmsg/cipher" "github.com/stretchr/testify/assert" diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4c8b246224..80510ddc43 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -288,8 +288,9 @@ func NewMockRPCClient(r *rand.Rand, maxTps int, maxRules int) (cipher.PubKey, RP Transports: tps, RoutesCount: rt.Count(), }, - tpTypes: types, - rt: rt, + tpTypes: types, + rt: rt, + startedAt: time.Now(), } return localPK, client, nil }