Skip to content

Commit

Permalink
Merge branch 'feature/dmsg' into feature/dmsg-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
志宇 authored Jun 20, 2019
2 parents 40ecf2e + 32bab81 commit 95e9469
Show file tree
Hide file tree
Showing 160 changed files with 26,136 additions and 4,460 deletions.
10 changes: 6 additions & 4 deletions internal/ioutil/ack_waiter_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ioutil
package ioutil_test

import (
"context"
"sync"
"testing"

"github.com/skycoin/skywire/internal/ioutil"
)

func TestUint16AckWaiter_Wait(t *testing.T) {
Expand All @@ -12,10 +14,10 @@ func TestUint16AckWaiter_Wait(t *testing.T) {
// each concurrent call to 'Uint16AckWaiter.Wait()' is met with
// multiple concurrent calls to 'Uint16AckWaiter.Done()' with the same seq.
t.Run("ensure_no_race_conditions", func(*testing.T) {
w := new(Uint16AckWaiter)
w := new(ioutil.Uint16AckWaiter)
defer w.StopAll()

seqChan := make(chan Uint16Seq)
seqChan := make(chan ioutil.Uint16Seq)
defer close(seqChan)

wg := new(sync.WaitGroup)
Expand All @@ -24,7 +26,7 @@ func TestUint16AckWaiter_Wait(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_ = w.Wait(context.TODO(), func(seq Uint16Seq) error { //nolint:errcheck,unparam
_ = w.Wait(context.TODO(), func(seq ioutil.Uint16Seq) error { //nolint:errcheck,unparam
seqChan <- seq
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion internal/ioutil/atomic_bool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (b *AtomicBool) Set(v bool) bool {
if v {
newF = 1
}
return newF != atomic.SwapInt32(&b.flag, newF)
return atomic.CompareAndSwapInt32(&b.flag, b.flag, newF)
}

// Get obtains the current boolean value.
Expand Down
5 changes: 4 additions & 1 deletion internal/noise/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (

// HandshakeKK is the KK handshake pattern.
HandshakeKK = noise.HandshakeKK

// AcceptHandshakeTimeout determines how long a noise hs should take.
AcceptHandshakeTimeout = time.Second * 10
)

// RPCClientDialer attempts to redial to a remotely served RPCClient.
Expand Down Expand Up @@ -219,7 +222,7 @@ func (ml *Listener) Accept() (net.Conn, error) {
return nil, err
}
rw := NewReadWriter(conn, ns)
if err := rw.Handshake(time.Second * 10); err != nil {
if err := rw.Handshake(AcceptHandshakeTimeout); err != nil {
log.WithError(err).Warn("accept: noise handshake failed.")
continue
}
Expand Down
26 changes: 23 additions & 3 deletions pkg/dmsg/TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Note that even though `messaging-discovery` is also considered to be an entity o
**`capped_transport_buffer_should_not_result_in_hang`**

- Given:
- A transport is establised between clientA and clientB.
- A transport is established between clientA and clientB.
- clientA writes to clientB until clientB's buffer is capped (or in other words, clientA's write blocks).
- When:
- clientB dials to clientA and begins reading/writing to/from the newly established transport.
Expand All @@ -45,8 +45,28 @@ Note that even though `messaging-discovery` is also considered to be an entity o

- TODO

**`self_dial_should_work`**

- TODO

### Fuzz testing

We should test the robustness of the system under different conditions and random order of events. These tests should be written consisiting of x-number of servers, clients and a single discovery.
We should test the robustness of the system under different conditions and random order of events. These tests should be written consisting of x-number of servers, clients and a single discovery.

The tests can be event based, with a probability value for each event.

Possible events:
1. Start random server.
2. Stop random server.
3. Start random client.
1. With or without `Accept()` handling.
2. With or without `transport.Read()` handling.
4. Stop random client.
5. Random client dials to another random client.
6. Random write (in len/count) from random established transport.

Notes:
1. We have a set number of possible servers and we are to start all these servers prior to running the test. This way the discovery has entries of the servers which the clients can access when starting.
2. We may need to log the "events" that happen to calculate the expected state of the system
and run the check every x "events".

TODO
33 changes: 23 additions & 10 deletions pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ func (c *ClientConn) setNextInitID(nextInitID uint16) {
c.mx.Unlock()
}

func (c *ClientConn) readOK() error {
fr, err := readFrame(c.Conn)
if err != nil {
return errors.New("failed to get OK from server")
}

ft, _, _ := fr.Disassemble()
if ft != OkType {
return fmt.Errorf("wrong frame from server: %v", ft)
}

return nil
}

func (c *ClientConn) handleRequestFrame(accept chan<- *Transport, id uint16, p []byte) (cipher.PubKey, error) {
// remotely-initiated tps should:
// - have a payload structured as 'init_pk:resp_pk'.
Expand Down Expand Up @@ -242,9 +256,7 @@ func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey)

// Close closes the connection to dms_server.
func (c *ClientConn) Close() error {
closed := false
c.once.Do(func() {
closed = true
c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection")
close(c.done)
c.mx.Lock()
Expand All @@ -257,10 +269,6 @@ func (c *ClientConn) Close() error {
c.mx.Unlock()
c.wg.Wait()
})

if !closed {
return ErrClientClosed
}
return nil
}

Expand Down Expand Up @@ -288,7 +296,7 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc client.APIClient) *Client
sk: sk,
dc: dc,
conns: make(map[cipher.PubKey]*ClientConn),
accept: make(chan *Transport, acceptChSize),
accept: make(chan *Transport, AcceptBufferSize),
done: make(chan struct{}),
}
}
Expand Down Expand Up @@ -316,9 +324,9 @@ func (c *Client) updateDiscEntry(ctx context.Context) error {
return c.dc.UpdateEntry(ctx, c.sk, entry)
}

func (c *Client) setConn(ctx context.Context, l *ClientConn) {
func (c *Client) setConn(ctx context.Context, conn *ClientConn) {
c.mx.Lock()
c.conns[l.remoteSrv] = l
c.conns[conn.remoteSrv] = conn
if err := c.updateDiscEntry(ctx); err != nil {
c.log.WithError(err).Warn("updateEntry: failed")
}
Expand Down Expand Up @@ -423,13 +431,18 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey)
if err != nil {
return nil, err
}
nc, err := noise.WrapConn(tcpConn, ns, hsTimeout)
nc, err := noise.WrapConn(tcpConn, ns, TransportHandshakeTimeout)
if err != nil {
return nil, err
}

conn := NewClientConn(c.log, nc, c.pk, srvPK)
if err := conn.readOK(); err != nil {
return nil, err
}

c.setConn(ctx, conn)

go func() {
err := conn.Serve(ctx, c.accept)
conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("connected with server closed")
Expand Down
16 changes: 8 additions & 8 deletions pkg/dmsg/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ func TestClient(t *testing.T) {
conn1 := NewClientConn(logger, p1, pk1, pk2)
conn2 := NewClientConn(logger, p2, pk2, pk1)

conn2.setNextInitID(randID(false))

ch1 := make(chan *Transport, acceptChSize)
ch2 := make(chan *Transport, acceptChSize)
ch1 := make(chan *Transport, AcceptBufferSize)
ch2 := make(chan *Transport, AcceptBufferSize)

ctx := context.TODO()

Expand Down Expand Up @@ -110,10 +108,10 @@ func TestClient(t *testing.T) {
conn2.setNextInitID(randID(false))
conn4.setNextInitID(randID(false))

ch1 := make(chan *Transport, acceptChSize)
ch2 := make(chan *Transport, acceptChSize)
ch3 := make(chan *Transport, acceptChSize)
ch4 := make(chan *Transport, acceptChSize)
ch1 := make(chan *Transport, AcceptBufferSize)
ch2 := make(chan *Transport, AcceptBufferSize)
ch3 := make(chan *Transport, AcceptBufferSize)
ch4 := make(chan *Transport, AcceptBufferSize)

ctx := context.TODO()

Expand Down Expand Up @@ -277,10 +275,12 @@ func isReadChannelOpen(ch chan Frame) bool {
}
}

// used so that we can get two 'ClientConn's directly communicating with one another.
type invertedIDConn struct {
net.Conn
}

// Write ensures odd IDs turn even, and even IDs turn odd on write.
func (c invertedIDConn) Write(b []byte) (n int, err error) {
frame := Frame(b)
newFrame := MakeFrame(frame.Type(), frame.TpID()^1, frame.Pay())
Expand Down
12 changes: 10 additions & 2 deletions pkg/dmsg/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ const (
// Type returns the transport type string.
Type = "dmsg"

hsTimeout = time.Second * 10
tpBufCap = math.MaxUint16
tpBufFrameCap = math.MaxUint8
tpAckCap = math.MaxUint8
acceptChSize = 20
headerLen = 5 // fType(1 byte), chID(2 byte), payLen(2 byte)
)

var (
// TransportHandshakeTimeout defines the duration a transport handshake should take.
TransportHandshakeTimeout = time.Second * 10

// AcceptBufferSize defines the size of the accepts buffer.
AcceptBufferSize = 20
)

func isInitiatorID(tpID uint16) bool { return tpID%2 == 0 }

func randID(initiator bool) uint16 {
Expand Down Expand Up @@ -52,6 +58,7 @@ func (ft FrameType) String() string {
CloseType: "CLOSE",
FwdType: "FWD",
AckType: "ACK",
OkType: "OK",
}
if int(ft) >= len(names) {
return fmt.Sprintf("UNKNOWN:%d", ft)
Expand All @@ -61,6 +68,7 @@ func (ft FrameType) String() string {

// Frame types.
const (
OkType = FrameType(0x0)
RequestType = FrameType(0x1)
AcceptType = FrameType(0x2)
CloseType = FrameType(0x3)
Expand Down
6 changes: 3 additions & 3 deletions pkg/dmsg/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ func TestFrameType_String(t *testing.T) {
want: "ACK",
},
{
name: "Empty type",
ft: 0,
want: "",
name: "Ok type",
ft: OkType,
want: "OK",
},
{
name: "Unknown type",
Expand Down
35 changes: 21 additions & 14 deletions pkg/dmsg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ var ErrListenerAlreadyWrappedToNoise = errors.New("listener is already wrapped t

// NextConn provides information on the next connection.
type NextConn struct {
l *ServerConn
id uint16
conn *ServerConn
id uint16
}

func (r *NextConn) writeFrame(ft FrameType, p []byte) error {
if err := writeFrame(r.l.Conn, MakeFrame(ft, r.id, p)); err != nil {
go r.l.Close()
if err := writeFrame(r.conn.Conn, MakeFrame(ft, r.id, p)); err != nil {
go r.conn.Close()
return err
}
return nil
Expand Down Expand Up @@ -114,6 +114,11 @@ func (c *ServerConn) Serve(ctx context.Context, getConn getConnFunc) (err error)
}()
log.WithField("connCount", incrementServeCount()).Infoln("ServingConn")

err = c.writeOK()
if err != nil {
return fmt.Errorf("sending OK failed: %s", err)
}

for {
f, err := readFrame(c.Conn)
if err != nil {
Expand All @@ -125,7 +130,7 @@ func (c *ServerConn) Serve(ctx context.Context, getConn getConnFunc) (err error)

switch ft {
case RequestType:
ctx, cancel := context.WithTimeout(ctx, hsTimeout)
ctx, cancel := context.WithTimeout(ctx, TransportHandshakeTimeout)
_, why, ok := c.handleRequest(ctx, getConn, id, p)
cancel()
if !ok {
Expand All @@ -151,7 +156,7 @@ func (c *ServerConn) Serve(ctx context.Context, getConn getConnFunc) (err error)
// On success, if Close frame, delete the associations.
if ft == CloseType {
c.delNext(id)
next.l.delNext(next.id)
next.conn.delNext(next.id)
}

default:
Expand All @@ -170,6 +175,13 @@ func (c *ServerConn) delChan(id uint16, why byte) error {
return nil
}

func (c *ServerConn) writeOK() error {
if err := writeFrame(c.Conn, MakeFrame(OkType, 0, nil)); err != nil {
return err
}
return nil
}

func (c *ServerConn) forwardFrame(ft FrameType, id uint16, p []byte) (*NextConn, byte, bool) { //nolint:unparam
next, ok := c.getNext(id)
if !ok {
Expand All @@ -192,11 +204,11 @@ func (c *ServerConn) handleRequest(ctx context.Context, getLink getConnFunc, id
}

// set next relations.
respID, err := respL.addNext(ctx, &NextConn{l: c, id: id})
respID, err := respL.addNext(ctx, &NextConn{conn: c, id: id})
if err != nil {
return nil, 0, false
}
next := &NextConn{l: respL, id: respID}
next := &NextConn{conn: respL, id: respID}
c.setNext(id, next)

// forward to responding client.
Expand Down Expand Up @@ -281,11 +293,6 @@ func (s *Server) connCount() int {

// Close closes the dms_server.
func (s *Server) Close() (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.New("listener has not started")
}
}()
if err = s.lis.Close(); err != nil {
return err
}
Expand All @@ -303,7 +310,7 @@ func (s *Server) Serve() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := s.retryUpdateEntry(ctx, hsTimeout); err != nil {
if err := s.retryUpdateEntry(ctx, TransportHandshakeTimeout); err != nil {
return fmt.Errorf("updating server's discovery entry failed with: %s", err)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/dmsg/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ func (tp *Transport) Serve() {

// notify of new data via 'bufCh'
select {
case <-tp.done:
case tp.bufCh <- struct{}{}:
default:
}
Expand Down
Loading

0 comments on commit 95e9469

Please sign in to comment.