Skip to content

Commit

Permalink
Move app.LoopAddr -> routing.Loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Jul 7, 2019
1 parent c981b03 commit 3ab1c80
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 64 deletions.
20 changes: 10 additions & 10 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type App struct {
acceptChan chan [2]*routing.Addr
doneChan chan struct{}

conns map[LoopAddr]io.ReadWriteCloser
conns map[routing.Loop]io.ReadWriteCloser
mu sync.Mutex
}

Expand Down Expand Up @@ -73,7 +73,7 @@ func SetupFromPipe(config *Config, inFD, outFD uintptr) (*App, error) {
proto: NewProtocol(pipeConn),
acceptChan: make(chan [2]*routing.Addr),
doneChan: make(chan struct{}),
conns: make(map[LoopAddr]io.ReadWriteCloser),
conns: make(map[routing.Loop]io.ReadWriteCloser),
}

go app.handleProto()
Expand Down Expand Up @@ -120,7 +120,7 @@ func (app *App) Accept() (net.Conn, error) {
laddr := addrs[0]
raddr := addrs[1]

addr := &LoopAddr{laddr.Port, *raddr}
addr := &routing.Loop{Local: *laddr, Remote: *raddr}
conn, out := net.Pipe()
app.mu.Lock()
app.conns[*addr] = conn
Expand All @@ -136,7 +136,7 @@ func (app *App) Dial(raddr *routing.Addr) (net.Conn, error) {
if err != nil {
return nil, err
}
addr := &LoopAddr{laddr.Port, *raddr}
addr := &routing.Loop{Local: *laddr, Remote: *raddr}
conn, out := net.Pipe()
app.mu.Lock()
app.conns[*addr] = conn
Expand Down Expand Up @@ -171,7 +171,7 @@ func (app *App) handleProto() {
}
}

func (app *App) serveConn(addr *LoopAddr, conn io.ReadWriteCloser) {
func (app *App) serveConn(addr *routing.Loop, conn io.ReadWriteCloser) {
defer conn.Close()

for {
Expand Down Expand Up @@ -214,14 +214,14 @@ func (app *App) forwardPacket(data []byte) error {
}

func (app *App) closeConn(data []byte) error {
addr := &LoopAddr{}
if err := json.Unmarshal(data, addr); err != nil {
var loop routing.Loop
if err := json.Unmarshal(data, &loop); err != nil {
return err
}

app.mu.Lock()
conn := app.conns[*addr]
delete(app.conns, *addr)
conn := app.conns[loop]
delete(app.conns, loop)
app.mu.Unlock()

return conn.Close()
Expand All @@ -237,7 +237,7 @@ func (app *App) confirmLoop(data []byte) error {
raddr := addrs[1]

app.mu.Lock()
conn := app.conns[LoopAddr{laddr.Port, *raddr}]
conn := app.conns[routing.Loop{Local: *laddr, Remote: *raddr}]
app.mu.Unlock()

if conn != nil {
Expand Down
51 changes: 29 additions & 22 deletions pkg/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestAppDial(t *testing.T) {

in, out := net.Pipe()
proto := NewProtocol(out)
app := &App{proto: NewProtocol(in), conns: make(map[LoopAddr]io.ReadWriteCloser)}
app := &App{proto: NewProtocol(in), conns: make(map[routing.Loop]io.ReadWriteCloser)}
go app.handleProto()

dataCh := make(chan []byte)
Expand All @@ -61,17 +61,18 @@ func TestAppDial(t *testing.T) {
assert.Equal(t, rpk.Hex()+":3", conn.RemoteAddr().String())
assert.Equal(t, lpk.Hex()+":2", conn.LocalAddr().String())

require.NotNil(t, app.conns[LoopAddr{Port: 2, Remote: routing.Addr{PubKey: rpk, Port: 3}}])
require.NotNil(t, app.conns[routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}])
require.NoError(t, conn.Close())

// Justified. Attempt to remove produces: FAIL
time.Sleep(100 * time.Millisecond)

addr := &LoopAddr{}
require.NoError(t, json.Unmarshal(<-dataCh, addr))
assert.Equal(t, routing.Port(2), addr.Port)
assert.Equal(t, rpk, addr.Remote.PubKey)
assert.Equal(t, routing.Port(3), addr.Remote.Port)
var loop routing.Loop
require.NoError(t, json.Unmarshal(<-dataCh, &loop))
assert.Equal(t, lpk, loop.Local.PubKey)
assert.Equal(t, routing.Port(2), loop.Local.Port)
assert.Equal(t, rpk, loop.Remote.PubKey)
assert.Equal(t, routing.Port(3), loop.Remote.Port)

app.mu.Lock()
require.Len(t, app.conns, 0)
Expand All @@ -83,7 +84,7 @@ func TestAppAccept(t *testing.T) {
lpk, _ := cipher.GenerateKeyPair()
rpk, _ := cipher.GenerateKeyPair()
in, out := net.Pipe()
app := &App{proto: NewProtocol(in), acceptChan: make(chan [2]*routing.Addr), conns: make(map[LoopAddr]io.ReadWriteCloser)}
app := &App{proto: NewProtocol(in), acceptChan: make(chan [2]*routing.Addr), conns: make(map[routing.Loop]io.ReadWriteCloser)}
go app.handleProto()

proto := NewProtocol(out)
Expand Down Expand Up @@ -123,12 +124,13 @@ func TestAppAccept(t *testing.T) {
}

func TestAppWrite(t *testing.T) {
lpk, _ := cipher.GenerateKeyPair()
rpk, _ := cipher.GenerateKeyPair()
in, out := net.Pipe()
appIn, appOut := net.Pipe()
app := &App{proto: NewProtocol(in)}
go app.handleProto()
go app.serveConn(&LoopAddr{Port: 2, Remote: routing.Addr{PubKey: rpk, Port: 3}}, appIn)
go app.serveConn(&routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}, appIn)

proto := NewProtocol(out)
dataCh := make(chan []byte)
Expand All @@ -151,26 +153,28 @@ func TestAppWrite(t *testing.T) {
require.NoError(t, json.Unmarshal(<-dataCh, packet))
assert.Equal(t, rpk, packet.Addr.Remote.PubKey)
assert.Equal(t, routing.Port(3), packet.Addr.Remote.Port)
assert.Equal(t, routing.Port(2), packet.Addr.Port)
assert.Equal(t, routing.Port(2), packet.Addr.Local.Port)
assert.Equal(t, lpk, packet.Addr.Local.PubKey)
assert.Equal(t, []byte("foo"), packet.Payload)

require.NoError(t, proto.Close())
require.NoError(t, appOut.Close())
}

func TestAppRead(t *testing.T) {
lpk, _ := cipher.GenerateKeyPair()
pk, _ := cipher.GenerateKeyPair()
in, out := net.Pipe()
appIn, appOut := net.Pipe()
app := &App{proto: NewProtocol(in), conns: map[LoopAddr]io.ReadWriteCloser{LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}}
app := &App{proto: NewProtocol(in), conns: map[routing.Loop]io.ReadWriteCloser{routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}}
go app.handleProto()

proto := NewProtocol(out)
go proto.Serve(nil) // nolint: errcheck

errCh := make(chan error)
go func() {
errCh <- proto.Send(FrameSend, &Packet{&LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}, []byte("foo")}, nil)
errCh <- proto.Send(FrameSend, &Packet{&routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: pk, Port: 3}}, []byte("foo")}, nil)
}()

buf := make([]byte, 3)
Expand Down Expand Up @@ -217,18 +221,19 @@ func TestAppSetup(t *testing.T) {
}

func TestAppCloseConn(t *testing.T) {
pk, _ := cipher.GenerateKeyPair()
lpk, _ := cipher.GenerateKeyPair()
rpk, _ := cipher.GenerateKeyPair()
in, out := net.Pipe()
appIn, appOut := net.Pipe()
app := &App{proto: NewProtocol(in), conns: map[LoopAddr]io.ReadWriteCloser{LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}}
app := &App{proto: NewProtocol(in), conns: map[routing.Loop]io.ReadWriteCloser{routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}: appIn}}
go app.handleProto()

proto := NewProtocol(out)
go proto.Serve(nil) // nolint: errcheck

errCh := make(chan error)
go func() {
errCh <- proto.Send(FrameClose, &LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}, nil)
errCh <- proto.Send(FrameClose, &routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}, nil)
}()

_, err := appOut.Read(make([]byte, 3))
Expand All @@ -237,10 +242,11 @@ func TestAppCloseConn(t *testing.T) {
}

func TestAppClose(t *testing.T) {
pk, _ := cipher.GenerateKeyPair()
lpk, _ := cipher.GenerateKeyPair()
rpk, _ := cipher.GenerateKeyPair()
in, out := net.Pipe()
appIn, appOut := net.Pipe()
app := &App{proto: NewProtocol(in), conns: map[LoopAddr]io.ReadWriteCloser{LoopAddr{Port: 2, Remote: routing.Addr{PubKey: pk, Port: 3}}: appIn}, doneChan: make(chan struct{})}
app := &App{proto: NewProtocol(in), conns: map[routing.Loop]io.ReadWriteCloser{routing.Loop{Local: routing.Addr{PubKey: lpk, Port: 2}, Remote: routing.Addr{PubKey: rpk, Port: 3}}: appIn}, doneChan: make(chan struct{})}
go app.handleProto()

proto := NewProtocol(out)
Expand All @@ -259,11 +265,12 @@ func TestAppClose(t *testing.T) {
_, err := appOut.Read(make([]byte, 3))
require.Equal(t, io.EOF, err)

addr := &LoopAddr{}
require.NoError(t, json.Unmarshal(<-dataCh, addr))
assert.Equal(t, routing.Port(2), addr.Port)
assert.Equal(t, pk, addr.Remote.PubKey)
assert.Equal(t, routing.Port(3), addr.Remote.Port)
var loop routing.Loop
require.NoError(t, json.Unmarshal(<-dataCh, &loop))
assert.Equal(t, lpk, loop.Local.PubKey)
assert.Equal(t, routing.Port(2), loop.Local.Port)
assert.Equal(t, rpk, loop.Remote.PubKey)
assert.Equal(t, routing.Port(3), loop.Remote.Port)
}

func TestAppCommand(t *testing.T) {
Expand Down
20 changes: 3 additions & 17 deletions pkg/app/packet.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
package app

import (
"fmt"

"github.com/skycoin/skywire/pkg/routing"
)

// LoopAddr stores addressing parameters of a loop packets.
type LoopAddr struct {
Port routing.Port `json:"port"`
Remote routing.Addr `json:"remote"`
}

func (l *LoopAddr) String() string {
return fmt.Sprintf(":%d <-> %s:%d", l.Port, l.Remote.PubKey, l.Remote.Port)
}
import "github.com/skycoin/skywire/pkg/routing"

// Packet represents message exchanged between App and Node.
type Packet struct {
Addr *LoopAddr `json:"addr"`
Payload []byte `json:"payload"`
Addr *routing.Loop `json:"addr"`
Payload []byte `json:"payload"`
}
13 changes: 7 additions & 6 deletions pkg/app/packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
)

func ExamplePacket() {
pk := cipher.PubKey{}
addr := routing.Addr{PubKey: pk, Port: 0}
loopAddr := LoopAddr{0, addr}
var lpk, rpk cipher.PubKey
laddr := routing.Addr{Port: 0, PubKey: lpk}
raddr := routing.Addr{Port: 0, PubKey: rpk}
loop := routing.Loop{Local: laddr, Remote: raddr}

fmt.Println(addr.Network())
fmt.Printf("%v\n", addr)
fmt.Printf("%v\n", loopAddr)
fmt.Println(raddr.Network())
fmt.Printf("%v\n", raddr)
fmt.Printf("%v\n", loop)

//Output: skywire
// {000000000000000000000000000000000000000000000000000000000000000000 0}
Expand Down
18 changes: 9 additions & 9 deletions pkg/router/app_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const supportedProtocolVersion = "0.0.1"

type appCallbacks struct {
CreateLoop func(conn *app.Protocol, raddr *routing.Addr) (laddr *routing.Addr, err error)
CloseLoop func(conn *app.Protocol, addr *app.LoopAddr) error
CloseLoop func(conn *app.Protocol, addr *routing.Loop) error
Forward func(conn *app.Protocol, packet *app.Packet) error
}

Expand Down Expand Up @@ -51,8 +51,8 @@ func (am *appManager) Serve() error {
}

func (am *appManager) initApp(payload []byte) error {
config := &app.Config{}
if err := json.Unmarshal(payload, config); err != nil {
var config app.Config
if err := json.Unmarshal(payload, &config); err != nil {
return errors.New("invalid Init payload")
}

Expand All @@ -73,21 +73,21 @@ func (am *appManager) initApp(payload []byte) error {
}

func (am *appManager) setupLoop(payload []byte) (*routing.Addr, error) {
raddr := &routing.Addr{}
if err := json.Unmarshal(payload, raddr); err != nil {
var raddr routing.Addr
if err := json.Unmarshal(payload, &raddr); err != nil {
return nil, err
}

return am.callbacks.CreateLoop(am.proto, raddr)
return am.callbacks.CreateLoop(am.proto, &raddr)
}

func (am *appManager) handleCloseLoop(payload []byte) error {
addr := &app.LoopAddr{}
if err := json.Unmarshal(payload, addr); err != nil {
var loop routing.Loop
if err := json.Unmarshal(payload, &loop); err != nil {
return err
}

return am.callbacks.CloseLoop(am.proto, addr)
return am.callbacks.CloseLoop(am.proto, &loop)
}

func (am *appManager) forwardAppPacket(payload []byte) error {
Expand Down
11 changes: 11 additions & 0 deletions pkg/routing/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import (
"github.com/skycoin/dmsg/cipher"
)

// Loop defines a loop over a pair of addresses.
type Loop struct {
Local Addr
Remote Addr
}

// TODO: discuss if we should add local PK to the output
func (l *Loop) String() string {
return fmt.Sprintf(":%d <-> %s:%d", l.Local.Port, l.Remote.PubKey, l.Remote.Port)
}

// LoopDescriptor defines a loop over a pair of routes.
type LoopDescriptor struct {
Local Addr
Expand Down

0 comments on commit 3ab1c80

Please sign in to comment.