diff --git a/pkg/app/app.go b/pkg/app/app.go index 9a88d525c6..65822c5f64 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -41,7 +41,7 @@ type App struct { acceptChan chan [2]*routing.Addr doneChan chan struct{} - conns map[LoopAddr]io.ReadWriteCloser + conns map[routing.Loop]io.ReadWriteCloser mu sync.Mutex } @@ -73,7 +73,7 @@ func SetupFromPipe(config *Config, inFD, outFD uintptr) (*App, error) { proto: NewProtocol(pipeConn), acceptChan: make(chan [2]*routing.Addr), doneChan: make(chan struct{}), - conns: make(map[LoopAddr]io.ReadWriteCloser), + conns: make(map[routing.Loop]io.ReadWriteCloser), } go app.handleProto() @@ -120,7 +120,7 @@ func (app *App) Accept() (net.Conn, error) { laddr := addrs[0] raddr := addrs[1] - addr := &LoopAddr{laddr.Port, *raddr} + addr := &routing.Loop{Local: *laddr, Remote: *raddr} conn, out := net.Pipe() app.mu.Lock() app.conns[*addr] = conn @@ -136,7 +136,7 @@ func (app *App) Dial(raddr *routing.Addr) (net.Conn, error) { if err != nil { return nil, err } - addr := &LoopAddr{laddr.Port, *raddr} + addr := &routing.Loop{Local: *laddr, Remote: *raddr} conn, out := net.Pipe() app.mu.Lock() app.conns[*addr] = conn @@ -171,7 +171,7 @@ func (app *App) handleProto() { } } -func (app *App) serveConn(addr *LoopAddr, conn io.ReadWriteCloser) { +func (app *App) serveConn(addr *routing.Loop, conn io.ReadWriteCloser) { defer conn.Close() for { @@ -214,14 +214,14 @@ func (app *App) forwardPacket(data []byte) error { } func (app *App) closeConn(data []byte) error { - addr := &LoopAddr{} - if err := json.Unmarshal(data, addr); err != nil { + var loop routing.Loop + if err := json.Unmarshal(data, &loop); err != nil { return err } app.mu.Lock() - conn := app.conns[*addr] - delete(app.conns, *addr) + conn := app.conns[loop] + delete(app.conns, loop) app.mu.Unlock() return conn.Close() @@ -237,7 +237,7 @@ func (app *App) confirmLoop(data []byte) error { raddr := addrs[1] app.mu.Lock() - conn := app.conns[LoopAddr{laddr.Port, *raddr}] + conn := app.conns[routing.Loop{Local: *laddr, Remote: *raddr}] app.mu.Unlock() if conn != nil { diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index c9476e9b73..b6b3577cf6 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -39,7 +39,7 @@ func TestAppDial(t *testing.T) { in, out := net.Pipe() proto := NewProtocol(out) - app := &App{proto: NewProtocol(in), conns: make(map[LoopAddr]io.ReadWriteCloser)} + app := &App{proto: NewProtocol(in), conns: make(map[routing.Loop]io.ReadWriteCloser)} go app.handleProto() dataCh := make(chan []byte) @@ -61,17 +61,18 @@ func TestAppDial(t *testing.T) { assert.Equal(t, rpk.Hex()+":3", conn.RemoteAddr().String()) assert.Equal(t, lpk.Hex()+":2", conn.LocalAddr().String()) - require.NotNil(t, app.conns[LoopAddr{Port: 2, Remote: routing.Addr{PubKey: rpk, Port: 3}}]) + require.NotNil(t, app.conns[routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}]) require.NoError(t, conn.Close()) // Justified. Attempt to remove produces: FAIL time.Sleep(100 * time.Millisecond) - addr := &LoopAddr{} - require.NoError(t, json.Unmarshal(<-dataCh, addr)) - assert.Equal(t, routing.Port(2), addr.Port) - assert.Equal(t, rpk, addr.Remote.PubKey) - assert.Equal(t, routing.Port(3), addr.Remote.Port) + var loop routing.Loop + require.NoError(t, json.Unmarshal(<-dataCh, &loop)) + assert.Equal(t, lpk, loop.Local.PubKey) + assert.Equal(t, routing.Port(2), loop.Local.Port) + assert.Equal(t, rpk, loop.Remote.PubKey) + assert.Equal(t, routing.Port(3), loop.Remote.Port) app.mu.Lock() require.Len(t, app.conns, 0) @@ -83,7 +84,7 @@ func TestAppAccept(t *testing.T) { lpk, _ := cipher.GenerateKeyPair() rpk, _ := cipher.GenerateKeyPair() in, out := net.Pipe() - app := &App{proto: NewProtocol(in), acceptChan: make(chan [2]*routing.Addr), conns: make(map[LoopAddr]io.ReadWriteCloser)} + app := &App{proto: NewProtocol(in), acceptChan: make(chan [2]*routing.Addr), conns: make(map[routing.Loop]io.ReadWriteCloser)} go app.handleProto() proto := NewProtocol(out) @@ -123,12 +124,13 @@ func TestAppAccept(t *testing.T) { } func TestAppWrite(t *testing.T) { + lpk, _ := cipher.GenerateKeyPair() rpk, _ := cipher.GenerateKeyPair() in, out := net.Pipe() appIn, appOut := net.Pipe() app := &App{proto: NewProtocol(in)} go app.handleProto() - go app.serveConn(&LoopAddr{Port: 2, Remote: routing.Addr{PubKey: rpk, Port: 3}}, appIn) + go app.serveConn(&routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}, appIn) proto := NewProtocol(out) dataCh := make(chan []byte) @@ -151,7 +153,8 @@ func TestAppWrite(t *testing.T) { require.NoError(t, json.Unmarshal(<-dataCh, packet)) assert.Equal(t, rpk, packet.Addr.Remote.PubKey) assert.Equal(t, routing.Port(3), packet.Addr.Remote.Port) - assert.Equal(t, routing.Port(2), packet.Addr.Port) + assert.Equal(t, routing.Port(2), packet.Addr.Local.Port) + assert.Equal(t, lpk, packet.Addr.Local.PubKey) assert.Equal(t, []byte("foo"), packet.Payload) require.NoError(t, proto.Close()) @@ -159,10 +162,11 @@ func TestAppWrite(t *testing.T) { } func TestAppRead(t *testing.T) { + lpk, _ := cipher.GenerateKeyPair() pk, _ := cipher.GenerateKeyPair() in, out := net.Pipe() appIn, appOut := net.Pipe() - app := &App{proto: NewProtocol(in), conns: map[LoopAddr]io.ReadWriteCloser{LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}} + app := &App{proto: NewProtocol(in), conns: map[routing.Loop]io.ReadWriteCloser{routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}} go app.handleProto() proto := NewProtocol(out) @@ -170,7 +174,7 @@ func TestAppRead(t *testing.T) { errCh := make(chan error) go func() { - errCh <- proto.Send(FrameSend, &Packet{&LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}, []byte("foo")}, nil) + errCh <- proto.Send(FrameSend, &Packet{&routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: pk, Port: 3}}, []byte("foo")}, nil) }() buf := make([]byte, 3) @@ -217,10 +221,11 @@ func TestAppSetup(t *testing.T) { } func TestAppCloseConn(t *testing.T) { - pk, _ := cipher.GenerateKeyPair() + lpk, _ := cipher.GenerateKeyPair() + rpk, _ := cipher.GenerateKeyPair() in, out := net.Pipe() appIn, appOut := net.Pipe() - app := &App{proto: NewProtocol(in), conns: map[LoopAddr]io.ReadWriteCloser{LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}} + app := &App{proto: NewProtocol(in), conns: map[routing.Loop]io.ReadWriteCloser{routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}: appIn}} go app.handleProto() proto := NewProtocol(out) @@ -228,7 +233,7 @@ func TestAppCloseConn(t *testing.T) { errCh := make(chan error) go func() { - errCh <- proto.Send(FrameClose, &LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}, nil) + errCh <- proto.Send(FrameClose, &routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}, nil) }() _, err := appOut.Read(make([]byte, 3)) @@ -237,10 +242,11 @@ func TestAppCloseConn(t *testing.T) { } func TestAppClose(t *testing.T) { - pk, _ := cipher.GenerateKeyPair() + lpk, _ := cipher.GenerateKeyPair() + rpk, _ := cipher.GenerateKeyPair() in, out := net.Pipe() appIn, appOut := net.Pipe() - app := &App{proto: NewProtocol(in), conns: map[LoopAddr]io.ReadWriteCloser{LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}, doneChan: make(chan struct{})} + app := &App{proto: NewProtocol(in), conns: map[routing.Loop]io.ReadWriteCloser{routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}: appIn}, doneChan: make(chan struct{})} go app.handleProto() proto := NewProtocol(out) @@ -259,11 +265,12 @@ func TestAppClose(t *testing.T) { _, err := appOut.Read(make([]byte, 3)) require.Equal(t, io.EOF, err) - addr := &LoopAddr{} - require.NoError(t, json.Unmarshal(<-dataCh, addr)) - assert.Equal(t, routing.Port(2), addr.Port) - assert.Equal(t, pk, addr.Remote.PubKey) - assert.Equal(t, routing.Port(3), addr.Remote.Port) + var loop routing.Loop + require.NoError(t, json.Unmarshal(<-dataCh, &loop)) + assert.Equal(t, lpk, loop.Local.PubKey) + assert.Equal(t, routing.Port(2), loop.Local.Port) + assert.Equal(t, rpk, loop.Remote.PubKey) + assert.Equal(t, routing.Port(3), loop.Remote.Port) } func TestAppCommand(t *testing.T) { diff --git a/pkg/app/packet.go b/pkg/app/packet.go index 5fdc96e9a7..b2daa4043e 100644 --- a/pkg/app/packet.go +++ b/pkg/app/packet.go @@ -1,23 +1,9 @@ package app -import ( - "fmt" - - "github.com/skycoin/skywire/pkg/routing" -) - -// LoopAddr stores addressing parameters of a loop packets. -type LoopAddr struct { - Port routing.Port `json:"port"` - Remote routing.Addr `json:"remote"` -} - -func (l *LoopAddr) String() string { - return fmt.Sprintf(":%d <-> %s:%d", l.Port, l.Remote.PubKey, l.Remote.Port) -} +import "github.com/skycoin/skywire/pkg/routing" // Packet represents message exchanged between App and Node. type Packet struct { - Addr *LoopAddr `json:"addr"` - Payload []byte `json:"payload"` + Addr *routing.Loop `json:"addr"` + Payload []byte `json:"payload"` } diff --git a/pkg/app/packet_test.go b/pkg/app/packet_test.go index 1dabe98b03..6e917bf20d 100644 --- a/pkg/app/packet_test.go +++ b/pkg/app/packet_test.go @@ -9,13 +9,14 @@ import ( ) func ExamplePacket() { - pk := cipher.PubKey{} - addr := routing.Addr{PubKey: pk, Port: 0} - loopAddr := LoopAddr{0, addr} + var lpk, rpk cipher.PubKey + laddr := routing.Addr{Port: 0, PubKey: lpk} + raddr := routing.Addr{Port: 0, PubKey: rpk} + loop := routing.Loop{Local: laddr, Remote: raddr} - fmt.Println(addr.Network()) - fmt.Printf("%v\n", addr) - fmt.Printf("%v\n", loopAddr) + fmt.Println(raddr.Network()) + fmt.Printf("%v\n", raddr) + fmt.Printf("%v\n", loop) //Output: skywire // {000000000000000000000000000000000000000000000000000000000000000000 0} diff --git a/pkg/router/app_manager.go b/pkg/router/app_manager.go index f9b452628a..bda22442b1 100644 --- a/pkg/router/app_manager.go +++ b/pkg/router/app_manager.go @@ -14,7 +14,7 @@ const supportedProtocolVersion = "0.0.1" type appCallbacks struct { CreateLoop func(conn *app.Protocol, raddr *routing.Addr) (laddr *routing.Addr, err error) - CloseLoop func(conn *app.Protocol, addr *app.LoopAddr) error + CloseLoop func(conn *app.Protocol, addr *routing.Loop) error Forward func(conn *app.Protocol, packet *app.Packet) error } @@ -51,8 +51,8 @@ func (am *appManager) Serve() error { } func (am *appManager) initApp(payload []byte) error { - config := &app.Config{} - if err := json.Unmarshal(payload, config); err != nil { + var config app.Config + if err := json.Unmarshal(payload, &config); err != nil { return errors.New("invalid Init payload") } @@ -73,21 +73,21 @@ func (am *appManager) initApp(payload []byte) error { } func (am *appManager) setupLoop(payload []byte) (*routing.Addr, error) { - raddr := &routing.Addr{} - if err := json.Unmarshal(payload, raddr); err != nil { + var raddr routing.Addr + if err := json.Unmarshal(payload, &raddr); err != nil { return nil, err } - return am.callbacks.CreateLoop(am.proto, raddr) + return am.callbacks.CreateLoop(am.proto, &raddr) } func (am *appManager) handleCloseLoop(payload []byte) error { - addr := &app.LoopAddr{} - if err := json.Unmarshal(payload, addr); err != nil { + var loop routing.Loop + if err := json.Unmarshal(payload, &loop); err != nil { return err } - return am.callbacks.CloseLoop(am.proto, addr) + return am.callbacks.CloseLoop(am.proto, &loop) } func (am *appManager) forwardAppPacket(payload []byte) error { diff --git a/pkg/routing/loop.go b/pkg/routing/loop.go index 623054b161..e09de0c3b4 100644 --- a/pkg/routing/loop.go +++ b/pkg/routing/loop.go @@ -7,6 +7,17 @@ import ( "github.com/skycoin/dmsg/cipher" ) +// Loop defines a loop over a pair of addresses. +type Loop struct { + Local Addr + Remote Addr +} + +// TODO: discuss if we should add local PK to the output +func (l *Loop) String() string { + return fmt.Sprintf(":%d <-> %s:%d", l.Local.Port, l.Remote.PubKey, l.Remote.Port) +} + // LoopDescriptor defines a loop over a pair of routes. type LoopDescriptor struct { Local Addr