diff --git a/pkg/app/app.go b/pkg/app/app.go deleted file mode 100644 index 3c13e7970c..0000000000 --- a/pkg/app/app.go +++ /dev/null @@ -1,316 +0,0 @@ -/* -Package app implements app to node communication interface. -*/ -package app - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "net" - "os" - "os/exec" - "path/filepath" - "sync" - - "github.com/skycoin/skycoin/src/util/logging" - - "github.com/skycoin/skywire/pkg/routing" -) - -const ( - // DefaultIn holds value of inFd for Apps setup via Node - DefaultIn = uintptr(3) - - // DefaultOut holds value of outFd for Apps setup via Node - DefaultOut = uintptr(4) -) - -var ( - log = logging.MustGetLogger("app") -) - -// Config defines configuration parameters for App -type Config struct { - AppName string `json:"app-name"` - AppVersion string `json:"app-version"` - ProtocolVersion string `json:"protocol-version"` -} - -// App represents client side in app's client-server communication -// interface. -type App struct { - config Config - proto *Protocol - - acceptChan chan routing.RouteDescriptor - doneChan chan struct{} - - conns map[routing.RouteDescriptor]net.Conn - mu sync.Mutex -} - -// Command setups pipe connection and returns *exec.Cmd for an App -// with initialized connection. -func Command(config *Config, appsPath string, args []string, env []string) (net.Conn, *exec.Cmd, error) { - srvConn, clientConn, err := OpenPipeConn() - if err != nil { - return nil, nil, fmt.Errorf("failed to open piped connection: %s", err) - } - - binaryPath := filepath.Join(appsPath, fmt.Sprintf("%s.v%s", config.AppName, config.AppVersion)) - cmd := exec.Command(binaryPath, args...) // nolint:gosec - cmd.ExtraFiles = []*os.File{clientConn.inFile, clientConn.outFile} - cmd.Env = env - - return srvConn, cmd, nil -} - -// SetupFromPipe connects to a pipe, starts protocol loop and performs -// initialization request with the Server. -func SetupFromPipe(config *Config, inFD, outFD uintptr) (*App, error) { - pipeConn, err := NewPipeConn(inFD, outFD) - if err != nil { - return nil, fmt.Errorf("failed to open pipe: %s", err) - } - - app := &App{ - config: *config, - proto: NewProtocol(pipeConn), - acceptChan: make(chan routing.RouteDescriptor), - doneChan: make(chan struct{}), - conns: make(map[routing.RouteDescriptor]net.Conn), - } - - go app.handleProto() - - if err := app.proto.Send(FrameInit, config, nil); err != nil { - if err := app.Close(); err != nil { - log.WithError(err).Warn("Failed to close app") - } - return nil, fmt.Errorf("INIT handshake failed: %s", err) - } - - return app, nil -} - -// New creates a new App directly from a `net.Conn` implementation. -func New(conn net.Conn, conf *Config) (*App, error) { - app := &App{ - config: *conf, - proto: NewProtocol(conn), - acceptChan: make(chan routing.RouteDescriptor), - doneChan: make(chan struct{}), - conns: make(map[routing.RouteDescriptor]net.Conn), - } - - go app.handleProto() - - if err := app.proto.Send(FrameInit, conf, nil); err != nil { - if err := app.Close(); err != nil { - log.WithError(err).Warn("Failed to close app") - } - return nil, fmt.Errorf("INIT handshake failed: %s", err) - } - - return app, nil -} - -// Setup setups app using default pair of pipes -func Setup(config *Config) (*App, error) { - return SetupFromPipe(config, DefaultIn, DefaultOut) -} - -// Close implements io.Closer for an App. -func (app *App) Close() error { - if app == nil { - return nil - } - - select { - case <-app.doneChan: // already closed - default: - close(app.doneChan) - } - - app.mu.Lock() - for addr, conn := range app.conns { - connAddr := addr - if err := app.proto.Send(FrameClose, &connAddr, nil); err != nil { - log.WithError(err).Warn("Failed to send command frame") - } - if err := conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") - } - } - app.mu.Unlock() - - return app.proto.Close() -} - -// Accept awaits for incoming loop confirmation request from a Node and -// returns net.Conn for received loop. -func (app *App) Accept() (net.Conn, error) { - desc := <-app.acceptChan - - conn, out := net.Pipe() - app.mu.Lock() - app.conns[desc] = conn - app.mu.Unlock() - go app.serveConn(desc, conn) - return newAppConn(out, desc.Src(), desc.Dst()), nil -} - -// Dial sends create loop request to a Node and returns net.Conn for created loop. -func (app *App) Dial(raddr routing.Addr) (net.Conn, error) { - var laddr routing.Addr - err := app.proto.Send(FrameCreateRoutes, raddr, &laddr) - if err != nil { - return nil, err - } - - desc := routing.NewRouteDescriptor(laddr.PubKey, raddr.PubKey, laddr.Port, raddr.Port) - conn, out := net.Pipe() - - app.mu.Lock() - app.conns[desc] = conn - app.mu.Unlock() - - go app.serveConn(desc, conn) - return newAppConn(out, laddr, raddr), nil -} - -// Addr returns empty Addr, implements net.Listener. -func (app *App) Addr() net.Addr { - return routing.Addr{} -} - -func (app *App) handleProto() { - err := app.proto.Serve(func(frame Frame, payload []byte) (res interface{}, err error) { - switch frame { - case FrameRoutesCreated: - var routes []routing.Route - err = json.Unmarshal(payload, &routes) - if err != nil { - break - } - - err = app.confirmRoutes(routes) - case FrameSend: - err = app.forwardPacket(payload) - case FrameClose: - err = app.closeConn(payload) - default: - err = errors.New("unexpected frame") - } - - return res, err - }) - - if err != nil { - return - } -} - -func (app *App) serveConn(desc routing.RouteDescriptor, conn io.ReadWriteCloser) { - defer func() { - if err := conn.Close(); err != nil { - log.WithError(err).Warn("failed to close connection") - } - }() - - for { - buf := make([]byte, 32*1024) - n, err := conn.Read(buf) - if err != nil { - break - } - - packet := &Packet{Desc: desc, Payload: buf[:n]} - if err := app.proto.Send(FrameSend, packet, nil); err != nil { - break - } - } - - app.mu.Lock() - if _, ok := app.conns[desc]; ok { - if err := app.proto.Send(FrameClose, &desc, nil); err != nil { - log.WithError(err).Warn("Failed to send command frame") - } - } - delete(app.conns, desc) - app.mu.Unlock() -} - -func (app *App) forwardPacket(data []byte) error { - packet := &Packet{} - if err := json.Unmarshal(data, packet); err != nil { - return err - } - - app.mu.Lock() - conn := app.conns[packet.Desc] - app.mu.Unlock() - - if conn == nil { - return errors.New("no listeners") - } - - _, err := conn.Write(packet.Payload) - return err -} - -func (app *App) closeConn(data []byte) error { - var route routing.Route - if err := json.Unmarshal(data, &route); err != nil { - return err - } - - app.mu.Lock() - conn := app.conns[route.Desc] - delete(app.conns, route.Desc) - app.mu.Unlock() - - if conn != nil { - return conn.Close() - } - return nil -} - -func (app *App) confirmRoutes(routes []routing.Route) error { - for _, route := range routes { - app.mu.Lock() - conn := app.conns[route.Desc] - app.mu.Unlock() - - if conn != nil { - return errors.New("loop is already created") - } - - select { - case app.acceptChan <- route.Desc: - default: - } - } - - return nil -} - -type appConn struct { - net.Conn - laddr routing.Addr - raddr routing.Addr -} - -func newAppConn(conn net.Conn, laddr, raddr routing.Addr) *appConn { - return &appConn{conn, laddr, raddr} -} - -func (conn *appConn) LocalAddr() net.Addr { - return conn.laddr -} - -func (conn *appConn) RemoteAddr() net.Addr { - return conn.raddr -} diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go deleted file mode 100644 index 2aafc86cfb..0000000000 --- a/pkg/app/app_test.go +++ /dev/null @@ -1,332 +0,0 @@ -package app - -import ( - "encoding/json" - "errors" - "io" - "net" - "os" - "testing" - "time" - - "github.com/skycoin/dmsg/cipher" - "github.com/skycoin/skycoin/src/util/logging" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/skycoin/skywire/internal/testhelpers" - "github.com/skycoin/skywire/pkg/routing" -) - -func TestMain(m *testing.M) { - loggingLevel, ok := os.LookupEnv("TEST_LOGGING_LEVEL") - if ok { - lvl, err := logging.LevelFromString(loggingLevel) - if err != nil { - log.Fatal(err) - } - logging.SetLevel(lvl) - } else { - logging.Disable() - } - - os.Exit(m.Run()) -} - -// func TestAppDial(t *testing.T) { -// lpk, _ := cipher.GenerateKeyPair() -// rpk, _ := cipher.GenerateKeyPair() -// -// in, out := net.Pipe() -// proto := NewProtocol(out) -// app := &App{proto: NewProtocol(in), conns: make(map[routing.RouteDescriptor]net.Conn)} -// go app.handleProto() -// -// dataCh := make(chan []byte) -// serveErrCh := make(chan error, 1) -// go func() { -// f := func(f Frame, p []byte) (interface{}, error) { -// if f == FrameCreateRoutes { -// return &routing.Addr{PubKey: lpk, Port: 2}, nil -// } -// -// if f == FrameClose { -// go func() { dataCh <- p }() -// return nil, nil -// } -// -// return nil, errors.New("unexpected frame") -// } -// serveErrCh <- proto.Serve(f) -// }() -// conn, err := app.Dial(routing.Addr{PubKey: rpk, Port: 3}) -// require.NoError(t, err) -// require.NotNil(t, conn) -// assert.Equal(t, rpk.Hex()+":3", conn.RemoteAddr().String()) -// assert.Equal(t, lpk.Hex()+":2", conn.LocalAddr().String()) -// -// desc := routing.NewRouteDescriptor(lpk, rpk, 2, 3) -// require.NotNil(t, app.conns[desc]) -// require.NoError(t, conn.Close()) -// -// // Justified. Attempt to remove produces: FAIL -// time.Sleep(100 * time.Millisecond) -// -// var loop routing.Loop -// require.NoError(t, json.Unmarshal(<-dataCh, &loop)) -// assert.Equal(t, routing.Port(2), loop.Local.Port) -// assert.Equal(t, rpk, loop.Remote.PubKey) -// assert.Equal(t, routing.Port(3), loop.Remote.Port) -// -// app.mu.Lock() -// require.Len(t, app.conns, 0) -// app.mu.Unlock() -// require.NoError(t, proto.Close()) -// require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) -// } - -func TestAppAccept(t *testing.T) { - lpk, _ := cipher.GenerateKeyPair() - rpk, _ := cipher.GenerateKeyPair() - in, out := net.Pipe() - app := &App{ - proto: NewProtocol(in), - acceptChan: make(chan routing.RouteDescriptor, 2), - conns: make(map[routing.RouteDescriptor]net.Conn), - } - go app.handleProto() - - proto := NewProtocol(out) - serveErrCh := make(chan error, 1) - go func() { - serveErrCh <- proto.Serve(nil) - }() - - connCh := make(chan net.Conn) - errCh := make(chan error) - go func() { - conn, err := app.Accept() - errCh <- err - connCh <- conn - }() - - require.NoError(t, proto.Send(FrameRoutesCreated, [2]routing.Addr{{PubKey: lpk, Port: 2}, {PubKey: rpk, Port: 3}}, nil)) - - require.NoError(t, <-errCh) - conn := <-connCh - require.NotNil(t, conn) - assert.Equal(t, rpk.Hex()+":3", conn.RemoteAddr().String()) - assert.Equal(t, lpk.Hex()+":2", conn.LocalAddr().String()) - require.Len(t, app.conns, 1) - - go func() { - conn, err := app.Accept() - errCh <- err - connCh <- conn - }() - - require.NoError(t, proto.Send(FrameRoutesCreated, [2]routing.Addr{{PubKey: lpk, Port: 2}, {PubKey: rpk, Port: 2}}, nil)) - - require.NoError(t, <-errCh) - conn = <-connCh - require.NotNil(t, conn) - assert.Equal(t, rpk.Hex()+":2", conn.RemoteAddr().String()) - assert.Equal(t, lpk.Hex()+":2", conn.LocalAddr().String()) - require.Len(t, app.conns, 2) - require.NoError(t, proto.Close()) - require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) -} - -func TestAppWrite(t *testing.T) { - lpk, _ := cipher.GenerateKeyPair() - rpk, _ := cipher.GenerateKeyPair() - in, out := net.Pipe() - appIn, appOut := net.Pipe() - app := &App{proto: NewProtocol(in)} - go app.handleProto() - - desc := routing.NewRouteDescriptor(lpk, rpk, 2, 3) - go app.serveConn(desc, appIn) - - proto := NewProtocol(out) - dataCh := make(chan []byte) - serveErrCh := make(chan error, 1) - go func() { - f := func(f Frame, p []byte) (interface{}, error) { - if f != FrameSend { - return nil, errors.New("unexpected frame") - } - - go func() { dataCh <- p }() - return nil, nil - } - serveErrCh <- proto.Serve(f) - }() - - n, err := appOut.Write([]byte("foo")) - require.NoError(t, err) - assert.Equal(t, 3, n) - - packet := &Packet{} - require.NoError(t, json.Unmarshal(<-dataCh, packet)) - assert.Equal(t, rpk, packet.Desc.DstPK()) - assert.Equal(t, routing.Port(3), packet.Desc.DstPort()) - assert.Equal(t, routing.Port(2), packet.Desc.SrcPort()) - assert.Equal(t, lpk, packet.Desc.SrcPK()) - assert.Equal(t, []byte("foo"), packet.Payload) - - require.NoError(t, proto.Close()) - require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) - require.NoError(t, appOut.Close()) -} - -func TestAppRead(t *testing.T) { - lpk, _ := cipher.GenerateKeyPair() - pk, _ := cipher.GenerateKeyPair() - in, out := net.Pipe() - appIn, appOut := net.Pipe() - - desc := routing.NewRouteDescriptor(lpk, pk, 2, 3) - conns := map[routing.RouteDescriptor]net.Conn{ - desc: appIn, - } - app := &App{proto: NewProtocol(in), conns: conns} - go app.handleProto() - - proto := NewProtocol(out) - serveErrCh := make(chan error, 1) - go func() { - serveErrCh <- proto.Serve(nil) - }() - - errCh := make(chan error) - go func() { - errCh <- proto.Send(FrameSend, &Packet{desc, []byte("foo")}, nil) - }() - - buf := make([]byte, 3) - n, err := appOut.Read(buf) - require.NoError(t, err) - assert.Equal(t, 3, n) - assert.Equal(t, []byte("foo"), buf) - - require.NoError(t, <-errCh) - - require.NoError(t, proto.Close()) - require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) - require.NoError(t, appOut.Close()) -} - -func TestAppSetup(t *testing.T) { - srvConn, clientConn, err := OpenPipeConn() - require.NoError(t, err) - - require.NoError(t, srvConn.SetDeadline(time.Now().Add(time.Second))) - require.NoError(t, clientConn.SetDeadline(time.Now().Add(time.Second))) - - proto := NewProtocol(srvConn) - dataCh := make(chan []byte) - serveErrCh := make(chan error, 1) - go func() { - f := func(f Frame, p []byte) (interface{}, error) { - if f != FrameInit { - return nil, errors.New("unexpected frame") - } - - go func() { dataCh <- p }() - return nil, nil - } - serveErrCh <- proto.Serve(f) - }() - - inFd, outFd := clientConn.Fd() - _, err = SetupFromPipe(&Config{AppName: "foo", AppVersion: "0.0.1", ProtocolVersion: "0.0.1"}, inFd, outFd) - require.NoError(t, err) - - config := &Config{} - require.NoError(t, json.Unmarshal(<-dataCh, config)) - assert.Equal(t, "foo", config.AppName) - assert.Equal(t, "0.0.1", config.AppVersion) - assert.Equal(t, "0.0.1", config.ProtocolVersion) - - require.NoError(t, proto.Close()) - require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) -} - -func TestAppCloseConn(t *testing.T) { - lpk, _ := cipher.GenerateKeyPair() - rpk, _ := cipher.GenerateKeyPair() - in, out := net.Pipe() - appIn, appOut := net.Pipe() - desc := routing.NewRouteDescriptor(lpk, rpk, 2, 3) - conns := map[routing.RouteDescriptor]net.Conn{desc: appIn} - app := &App{proto: NewProtocol(in), conns: conns} - go app.handleProto() - - proto := NewProtocol(out) - serveErrCh := make(chan error, 1) - go func() { - serveErrCh <- proto.Serve(nil) - }() - - errCh := make(chan error) - go func() { - errCh <- proto.Send(FrameClose, desc, nil) - }() - - _, err := appOut.Read(make([]byte, 3)) - require.Equal(t, io.EOF, err) - require.Len(t, app.conns, 0) - - require.NoError(t, proto.Close()) - require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) -} - -// func TestAppClose(t *testing.T) { -// lpk, _ := cipher.GenerateKeyPair() -// rpk, _ := cipher.GenerateKeyPair() -// in, out := net.Pipe() -// appIn, appOut := net.Pipe() -// -// desc := routing.NewRouteDescriptor(lpk, rpk, 2, 3) -// conns := map[routing.RouteDescriptor]net.Conn{desc: appIn} -// app := &App{proto: NewProtocol(in), conns: conns, doneChan: make(chan struct{})} -// go app.handleProto() -// -// proto := NewProtocol(out) -// dataCh := make(chan []byte) -// serveErrCh := make(chan error, 1) -// go func() { -// f := func(f Frame, p []byte) (interface{}, error) { -// if f != FrameClose { -// return nil, errors.New("unexpected frame") -// } -// -// go func() { dataCh <- p }() -// return nil, nil -// } -// -// serveErrCh <- proto.Serve(f) -// }() -// require.NoError(t, app.Close()) -// -// _, err := appOut.Read(make([]byte, 3)) -// require.Equal(t, io.EOF, err) -// -// var loop routing.Loop -// require.NoError(t, json.Unmarshal(<-dataCh, &loop)) -// assert.Equal(t, lpk, loop.Local.PubKey) -// assert.Equal(t, routing.Port(2), loop.Local.Port) -// assert.Equal(t, rpk, loop.Remote.PubKey) -// assert.Equal(t, routing.Port(3), loop.Remote.Port) -// -// require.NoError(t, proto.Close()) -// require.NoError(t, testhelpers.WithinTimeout(serveErrCh)) -// } - -func TestAppCommand(t *testing.T) { - conn, cmd, err := Command(&Config{}, "/apps", nil) - require.NoError(t, err) - assert.NotNil(t, conn) - assert.NotNil(t, cmd) -} diff --git a/pkg/app/conn.go b/pkg/app/conn.go deleted file mode 100644 index 43eb4e5dfa..0000000000 --- a/pkg/app/conn.go +++ /dev/null @@ -1,130 +0,0 @@ -package app - -import ( - "fmt" - "net" - "os" - "time" -) - -// PipeAddr implements net.Addr for PipeConn. -type PipeAddr struct { - pipePath string -} - -// Network returns custom pipe Network type. -func (pa *PipeAddr) Network() string { - return "pipe" -} - -func (pa *PipeAddr) String() string { - return pa.pipePath -} - -// PipeConn implements net.Conn interface over a pair of unix pipes. -type PipeConn struct { - inFile *os.File - outFile *os.File -} - -// OpenPipeConn creates a pair of unix pipe and setups PipeConn over -// that pair. -func OpenPipeConn() (srvConn *PipeConn, clientConn *PipeConn, err error) { - srvIn, clientOut, err := os.Pipe() - if err != nil { - err = fmt.Errorf("failed to open server pipe: %s", err) - return - } - - clientIn, srvOut, err := os.Pipe() - if err != nil { - err = fmt.Errorf("failed to open client pipe: %s", err) - return - } - - clientConn = &PipeConn{clientIn, clientOut} - srvConn = &PipeConn{srvIn, srvOut} - return srvConn, clientConn, err -} - -// NewPipeConn constructs new PipeConn from already opened pipe fds. -func NewPipeConn(inFd, outFd uintptr) (*PipeConn, error) { - inFile := os.NewFile(inFd, "|0") - if _, err := inFile.Stat(); os.IsNotExist(err) { - return nil, fmt.Errorf("inFile does not exist") - } - - outFile := os.NewFile(outFd, "|1") - if _, err := outFile.Stat(); os.IsNotExist(err) { - return nil, fmt.Errorf("outFile does not exist") - } - - return &PipeConn{inFile, outFile}, nil -} - -func (conn *PipeConn) Read(b []byte) (n int, err error) { - return conn.inFile.Read(b) -} - -func (conn *PipeConn) Write(b []byte) (n int, err error) { - return conn.outFile.Write(b) -} - -// Close closes the connection. -func (conn *PipeConn) Close() error { - if conn == nil { - return nil - } - - inErr := conn.inFile.Close() - outErr := conn.outFile.Close() - if inErr != nil { - return fmt.Errorf("failed to close input pipe: %s", inErr) - } - - if outErr != nil { - return fmt.Errorf("failed to close output pipe: %s", outErr) - } - - return nil -} - -// LocalAddr returns the local network address. -func (conn *PipeConn) LocalAddr() net.Addr { - return &PipeAddr{conn.inFile.Name()} -} - -// RemoteAddr returns the remote network address. -func (conn *PipeConn) RemoteAddr() net.Addr { - return &PipeAddr{conn.outFile.Name()} -} - -// SetDeadline implements the Conn SetDeadline method. -func (conn *PipeConn) SetDeadline(t time.Time) error { - if err := conn.inFile.SetDeadline(t); err != nil { - return fmt.Errorf("failed to set input pipe deadline: %s", err) - } - - if err := conn.outFile.SetDeadline(t); err != nil { - return fmt.Errorf("failed to set out pipe deadline: %s", err) - } - - return nil -} - -// SetReadDeadline implements the Conn SetReadDeadline method. -func (conn *PipeConn) SetReadDeadline(t time.Time) error { - return conn.inFile.SetDeadline(t) -} - -// SetWriteDeadline implements the Conn SetWriteDeadline method. -func (conn *PipeConn) SetWriteDeadline(t time.Time) error { - return conn.outFile.SetDeadline(t) -} - -// Fd returns file descriptors for a pipe pair -func (conn *PipeConn) Fd() (inFd uintptr, outFd uintptr) { - inFd = conn.inFile.Fd() - outFd = conn.outFile.Fd() - return -} diff --git a/pkg/app/conn_test.go b/pkg/app/conn_test.go deleted file mode 100644 index 4d4e580afe..0000000000 --- a/pkg/app/conn_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package app - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestPipeConn(t *testing.T) { - srv, client, err := OpenPipeConn() - require.NoError(t, err) - - t.Run("server can communicate with client", func(t *testing.T) { - n, err := srv.Write([]byte("foo")) - require.NoError(t, err) - assert.Equal(t, 3, n) - - buf := make([]byte, 3) - n, err = client.Read(buf) - require.NoError(t, err) - assert.Equal(t, 3, n) - assert.Equal(t, []byte("foo"), buf) - }) - - t.Run("client can communicate with server", func(t *testing.T) { - n, err := client.Write([]byte("foo")) - require.NoError(t, err) - assert.Equal(t, 3, n) - - buf := make([]byte, 3) - n, err = srv.Read(buf) - require.NoError(t, err) - assert.Equal(t, 3, n) - assert.Equal(t, []byte("foo"), buf) - }) - - t.Run("returns valid addresses", func(t *testing.T) { - require.NotNil(t, srv.LocalAddr()) - require.NotNil(t, srv.RemoteAddr()) - - require.NotNil(t, client.LocalAddr()) - require.NotNil(t, client.RemoteAddr()) - }) - - t.Run("can set deadlines", func(t *testing.T) { - deadline := time.Now().Add(500 * time.Millisecond) - - require.NoError(t, srv.SetDeadline(deadline)) - require.NoError(t, srv.SetReadDeadline(deadline)) - require.NoError(t, srv.SetWriteDeadline(deadline)) - - require.NoError(t, client.SetDeadline(deadline)) - require.NoError(t, client.SetReadDeadline(deadline)) - require.NoError(t, client.SetWriteDeadline(deadline)) - }) - - t.Run("deadline failures", func(t *testing.T) { - buf := make([]byte, 4) - _, err := srv.Read(buf) - require.Error(t, err) - assert.Contains(t, err.Error(), "i/o timeout") - - _, err = client.Read(buf) - require.Error(t, err) - assert.Contains(t, err.Error(), "i/o timeout") - }) - - t.Run("returns Fds", func(t *testing.T) { - in, out := srv.Fd() - assert.NotNil(t, in) - assert.NotNil(t, out) - - in, out = client.Fd() - assert.NotNil(t, in) - assert.NotNil(t, out) - }) - - t.Run("can re-init from Fds", func(t *testing.T) { - _, err := NewPipeConn(srv.Fd()) - assert.NoError(t, err) - - _, err = NewPipeConn(client.Fd()) - assert.NoError(t, err) - }) - - t.Run("can close", func(t *testing.T) { - require.NoError(t, srv.Close()) - require.NoError(t, client.Close()) - }) -} diff --git a/pkg/app/log.go b/pkg/app/log.go deleted file mode 100644 index 63e72caa62..0000000000 --- a/pkg/app/log.go +++ /dev/null @@ -1,49 +0,0 @@ -package app - -import ( - "io" - "os" - "time" - - "github.com/skycoin/skycoin/src/util/logging" -) - -// NewLogger returns a logger which persists app logs. This logger should be passed down -// for use on any other function used by the app. It's configured from an additional app argument. -// It modifies os.Args stripping from it such value. Should be called before using os.Args inside the app -func NewLogger(appName string) *logging.MasterLogger { - db, err := newBoltDB(os.Args[1], appName) - if err != nil { - panic(err) - } - - l := newAppLogger() - l.SetOutput(io.MultiWriter(l.Out, db)) - os.Args = append([]string{os.Args[0]}, os.Args[2:]...) - - return l -} - -// TimestampFromLog is an utility function for retrieving the timestamp from a log. This function should be modified -// if the time layout is changed -func TimestampFromLog(log string) string { - return log[1:36] -} - -func (app *App) newPersistentLogger(path string) (*logging.MasterLogger, LogStore, error) { - db, err := newBoltDB(path, app.config.AppName) - if err != nil { - return nil, nil, err - } - - l := newAppLogger() - l.SetOutput(io.MultiWriter(l.Out, db)) - - return l, db, nil -} - -func newAppLogger() *logging.MasterLogger { - l := logging.NewMasterLogger() - l.Logger.Formatter.(*logging.TextFormatter).TimestampFormat = time.RFC3339Nano - return l -} diff --git a/pkg/app/log_store.go b/pkg/app/log_store.go deleted file mode 100644 index d9d88cb7d7..0000000000 --- a/pkg/app/log_store.go +++ /dev/null @@ -1,167 +0,0 @@ -package app - -import ( - "bytes" - "fmt" - "io" - "strings" - "time" - - "go.etcd.io/bbolt" -) - -// LogStore stores logs from apps, for later consumption from the hypervisor -type LogStore interface { - // Write implements io.Writer - Write(p []byte) (n int, err error) - - // Store saves given log in db - Store(t time.Time, s string) error - - // LogSince returns the logs since given timestamp. For optimal performance, - // the timestamp should exist in the store (you can get it from previous logs), - // otherwise the DB will be sequentially iterated until finding entries older than given timestamp - LogsSince(t time.Time) ([]string, error) -} - -// NewLogStore returns a LogStore with path and app name of the given kind -func NewLogStore(path, appName, kind string) (LogStore, error) { - switch kind { - case "bbolt": - return newBoltDB(path, appName) - default: - return nil, fmt.Errorf("no LogStore of type %s", kind) - } -} - -type boltDBappLogs struct { - dbpath string - bucket []byte -} - -func newBoltDB(path, appName string) (_ LogStore, err error) { - db, err := bbolt.Open(path, 0600, nil) - if err != nil { - return nil, err - } - defer func() { - cErr := db.Close() - err = cErr - }() - - b := []byte(appName) - err = db.Update(func(tx *bbolt.Tx) error { - if _, err := tx.CreateBucketIfNotExists(b); err != nil { - return fmt.Errorf("failed to create bucket: %s", err) - } - - return nil - }) - if err != nil && !strings.Contains(err.Error(), bbolt.ErrBucketExists.Error()) { - return nil, err - } - - return &boltDBappLogs{path, b}, nil -} - -// Write implements io.Writer -func (l *boltDBappLogs) Write(p []byte) (int, error) { - // ensure there is at least timestamp long bytes - if len(p) < 37 { - return 0, io.ErrShortBuffer - } - - db, err := bbolt.Open(l.dbpath, 0600, nil) - if err != nil { - return 0, err - } - defer func() { - err := db.Close() - if err != nil { - panic(err) - } - }() - - // time in RFC3339Nano is between the bytes 1 and 36. This will change if other time layout is in use - t := p[1:36] - - err = db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(l.bucket) - return b.Put(t, p) - }) - - if err != nil { - return 0, err - } - - return len(p), nil -} - -// Store implements LogStore -func (l *boltDBappLogs) Store(t time.Time, s string) (err error) { - db, err := bbolt.Open(l.dbpath, 0600, nil) - if err != nil { - return err - } - defer func() { - cErr := db.Close() - err = cErr - }() - - parsedTime := []byte(t.Format(time.RFC3339Nano)) - return db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(l.bucket) - return b.Put(parsedTime, []byte(s)) - }) -} - -// LogSince implements LogStore -func (l *boltDBappLogs) LogsSince(t time.Time) (logs []string, err error) { - db, err := bbolt.Open(l.dbpath, 0600, nil) - if err != nil { - return nil, err - } - defer func() { - cErr := db.Close() - err = cErr - }() - - logs = make([]string, 0) - - err = db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(l.bucket) - parsedTime := []byte(t.Format(time.RFC3339Nano)) - c := b.Cursor() - - v := b.Get(parsedTime) - if v == nil { - logs = iterateFromBeginning(c, parsedTime) - return nil - } - c.Seek(parsedTime) - logs = iterateFromKey(c) - return nil - }) - - return logs, err -} - -func iterateFromKey(c *bbolt.Cursor) []string { - logs := make([]string, 0) - for k, v := c.Next(); k != nil; k, v = c.Next() { - logs = append(logs, string(v)) - } - return logs -} - -func iterateFromBeginning(c *bbolt.Cursor, parsedTime []byte) []string { - logs := make([]string, 0) - for k, v := c.First(); k != nil; k, v = c.Next() { - if bytes.Compare(k, parsedTime) < 0 { - continue - } - logs = append(logs, string(v)) - } - - return logs -} diff --git a/pkg/app/log_store_test.go b/pkg/app/log_store_test.go deleted file mode 100644 index 8ff172d1fb..0000000000 --- a/pkg/app/log_store_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package app - -import ( - "fmt" - "io/ioutil" - "os" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestLogStore(t *testing.T) { - p, err := ioutil.TempFile("", "test-db") - require.NoError(t, err) - - defer os.Remove(p.Name()) // nolint - - ls, err := newBoltDB(p.Name(), "foo") - require.NoError(t, err) - - t3, err := time.Parse(time.RFC3339, "2000-03-01T00:00:00Z") - require.NoError(t, err) - - err = ls.Store(t3, "foo") - require.NoError(t, err) - - t1, err := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") - require.NoError(t, err) - - err = ls.Store(t1, "bar") - fmt.Println("original: ", t1.Format(time.RFC3339Nano)) - require.NoError(t, err) - - t2, err := time.Parse(time.RFC3339, "2000-02-01T00:00:00Z") - require.NoError(t, err) - - err = ls.Store(t2, "middle") - require.NoError(t, err) - - res, err := ls.LogsSince(t1) - require.NoError(t, err) - require.Len(t, res, 2) - require.Contains(t, res[0], "middle") - require.Contains(t, res[1], "foo") - - t4, err := time.Parse(time.RFC3339, "1999-02-01T00:00:00Z") - require.NoError(t, err) - res, err = ls.LogsSince(t4) - require.NoError(t, err) - require.Len(t, res, 3) - require.Contains(t, res[0], "bar") - fmt.Println("b_ :", res[0]) - require.Contains(t, res[1], "middle") - require.Contains(t, res[2], "foo") -} diff --git a/pkg/app/log_test.go b/pkg/app/log_test.go deleted file mode 100644 index d098020338..0000000000 --- a/pkg/app/log_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package app - -import ( - "io/ioutil" - "os" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -// TestNewLogger tests that after the new logger is created logs with it are persisted into storage -func TestNewLogger(t *testing.T) { - p, err := ioutil.TempFile("", "test-db") - require.NoError(t, err) - - defer os.Remove(p.Name()) // nolint - - a := &App{ - config: Config{ - AppName: "foo", - }, - } - - l, _, err := a.newPersistentLogger(p.Name()) - require.NoError(t, err) - - dbl, err := newBoltDB(p.Name(), a.config.AppName) - require.NoError(t, err) - - l.Info("bar") - - beginning := time.Unix(0, 0) - res, err := dbl.(*boltDBappLogs).LogsSince(beginning) - require.NoError(t, err) - require.Len(t, res, 1) - require.Contains(t, res[0], "bar") -} diff --git a/pkg/app/packet.go b/pkg/app/packet.go deleted file mode 100644 index 81c5f13c0a..0000000000 --- a/pkg/app/packet.go +++ /dev/null @@ -1,9 +0,0 @@ -package app - -import "github.com/skycoin/skywire/pkg/routing" - -// Packet represents message exchanged between App and Node. -type Packet struct { - Desc routing.RouteDescriptor `json:"desc"` - Payload []byte `json:"payload"` -} diff --git a/pkg/app/protocol.go b/pkg/app/protocol.go deleted file mode 100644 index 6958837edc..0000000000 --- a/pkg/app/protocol.go +++ /dev/null @@ -1,218 +0,0 @@ -package app - -import ( - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "io" - "net" - "strings" - "sync" -) - -// Frame defines type for all App frames. -type Frame byte - -func (f Frame) String() string { - switch f { - case FrameInit: - return "Init" - case FrameCreateRoutes: - return "CreateRoutes" - case FrameRoutesCreated: - return "OnRoutesCreated" - case FrameSend: - return "Send" - case FrameClose: - return "Close" - } - - return fmt.Sprintf("Unknown(%d)", f) -} - -const ( - // FrameInit represents Init frame type. - FrameInit Frame = iota - // FrameCreateRoutes represents CreateRoutes request frame type. - FrameCreateRoutes - // FrameRoutesCreated represents visorRoutesCreated request frame type. - FrameRoutesCreated - // FrameSend represents Send frame type. - FrameSend - // FrameClose represents Close frame type - FrameClose // TODO(nkryuchkov): decide whether this needs to be removed - - // FrameFailure represents frame type for failed requests. - FrameFailure = 0xfe - // FrameSuccess represents frame type for successful requests. - FrameSuccess = 0xff -) - -// Protocol implements full-duplex protocol for App to Node communication. -type Protocol struct { - conn net.Conn - chans *chanList -} - -// NewProtocol constructs a new Protocol. -func NewProtocol(conn net.Conn) *Protocol { - return &Protocol{conn, &chanList{chans: map[byte]chan []byte{}}} -} - -// Send sends command Frame with payload and awaits for response. -func (p *Protocol) Send(cmd Frame, payload, res interface{}) error { - id, resChan := p.chans.add() - if err := p.writeFrame(cmd, id, payload); err != nil { - return err - } - - frame, more := <-resChan - if !more { - return io.EOF - } - - if Frame(frame[0]) == FrameFailure { - return errors.New(string(frame[2:])) - } - - if res == nil { - return nil - } - - return json.Unmarshal(frame[2:], res) -} - -// Serve reads incoming frame, passes it to the handleFunc and writes results. -func (p *Protocol) Serve(handleFunc func(Frame, []byte) (interface{}, error)) error { - for { - frame, err := p.readFrame() - if err != nil { - if err == io.EOF || strings.Contains(err.Error(), "closed") { - return nil - } - - return err - } - - fType := Frame(frame[0]) - id := frame[1] - - var resChan chan []byte - if fType == FrameFailure || fType == FrameSuccess { - resChan = p.chans.pull(id) - if resChan == nil { - continue - } - resChan <- frame - continue - } - - go func() { - if handleFunc == nil { - if err := p.writeFrame(FrameSuccess, id, nil); err != nil { - log.WithError(err).Warn("Failed to write frame") - } - return - } - - res, err := handleFunc(fType, frame[2:]) - if err != nil { - if err := p.writeFrame(FrameFailure, id, err); err != nil { - log.WithError(err).Warn("Failed to write frame") - } - return - } - - if err := p.writeFrame(FrameSuccess, id, res); err != nil { - log.WithError(err).Warn("Failed to write frame") - } - }() - } -} - -// Close closes underlying ReadWriter. -func (p *Protocol) Close() error { - if p == nil { - return nil - } - p.chans.closeAll() - return p.conn.Close() -} - -func (p *Protocol) writeFrame(frame Frame, id byte, payload interface{}) (err error) { - var data []byte - if err, ok := payload.(error); ok { - data = []byte(err.Error()) - } else { - data, err = json.Marshal(payload) - if err != nil { - return err - } - } - - packet := append([]byte{byte(frame), id}, data...) - buf := make([]byte, 2) - binary.BigEndian.PutUint16(buf, uint16(len(packet))) - _, err = p.conn.Write(append(buf, packet...)) - return err -} - -func (p *Protocol) readFrame() (frame []byte, err error) { - size := make([]byte, 2) - if _, err = io.ReadFull(p.conn, size); err != nil { - return - } - - frame = make([]byte, binary.BigEndian.Uint16(size)) - if _, err = io.ReadFull(p.conn, frame); err != nil { - return - } - - return frame, nil -} - -type chanList struct { - sync.Mutex - - chans map[byte]chan []byte -} - -func (c *chanList) add() (byte, chan []byte) { - c.Lock() - defer c.Unlock() - - ch := make(chan []byte) - for i := byte(0); i < 255; i++ { - if c.chans[i] == nil { - c.chans[i] = ch - return i, ch - } - } - - panic("no free channels") -} - -func (c *chanList) pull(id byte) chan []byte { - c.Lock() - ch := c.chans[id] - delete(c.chans, id) - c.Unlock() - - return ch -} - -func (c *chanList) closeAll() { - c.Lock() - defer c.Unlock() - - for _, ch := range c.chans { - if ch == nil { - continue - } - - close(ch) - } - - c.chans = make(map[byte]chan []byte) -} diff --git a/pkg/app/protocol_test.go b/pkg/app/protocol_test.go deleted file mode 100644 index 401bf9eb25..0000000000 --- a/pkg/app/protocol_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package app - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestProtocol(t *testing.T) { - rw1, rw2, err := OpenPipeConn() - require.NoError(t, err) - proto1 := NewProtocol(rw1) - proto2 := NewProtocol(rw2) - - errCh1 := make(chan error) - go func() { - errCh1 <- proto1.Serve(func(f Frame, _ []byte) (interface{}, error) { - if f != FrameSend { - return nil, errors.New("unexpected frame") - } - - return nil, nil - }) - }() - - errCh2 := make(chan error) - go func() { - errCh2 <- proto2.Serve(func(f Frame, _ []byte) (interface{}, error) { - if f != FrameCreateRoutes { - return nil, errors.New("unexpected frame") - } - - return nil, nil - }) - }() - - errCh3 := make(chan error) - go func() { - errCh3 <- proto1.Send(FrameCreateRoutes, "foo", nil) - }() - - errCh4 := make(chan error) - go func() { - errCh4 <- proto2.Send(FrameSend, "foo", nil) - }() - - errCh5 := make(chan error) - go func() { - errCh5 <- proto1.Send(FrameSend, "foo", nil) - }() - - require.NoError(t, <-errCh3) - require.NoError(t, <-errCh4) - err = <-errCh5 - require.Error(t, err) - assert.Equal(t, "unexpected frame", err.Error()) - - require.NoError(t, proto1.Close()) - require.NoError(t, proto2.Close()) - - require.NoError(t, <-errCh1) - require.NoError(t, <-errCh2) -} - -func TestProtocolParallel(t *testing.T) { - rw1, rw2, err := OpenPipeConn() - require.NoError(t, err) - proto1 := NewProtocol(rw1) - proto2 := NewProtocol(rw2) - - errCh1 := make(chan error) - go func() { - errCh1 <- proto1.Serve(func(f Frame, _ []byte) (interface{}, error) { - if f != FrameCreateRoutes { - return nil, errors.New("unexpected frame") - } - - return nil, proto1.Send(FrameRoutesCreated, "foo", nil) - }) - }() - - errCh2 := make(chan error) - go func() { - errCh2 <- proto2.Serve(func(f Frame, _ []byte) (interface{}, error) { - if f != FrameRoutesCreated { - return nil, errors.New("unexpected frame") - } - - return nil, nil - }) - }() - - require.NoError(t, proto2.Send(FrameCreateRoutes, "foo", nil)) - - require.NoError(t, proto1.Close()) - require.NoError(t, proto2.Close()) - - require.NoError(t, <-errCh1) - require.NoError(t, <-errCh2) -}