Skip to content

Commit

Permalink
Merge branch 'feature/dmsg' of https://github.com/evanlinjin/skywire
Browse files Browse the repository at this point in the history
…into fix/dmsg_request_rejected
  • Loading branch information
Darkren committed Jun 20, 2019
2 parents 6894e9f + 8866fd4 commit 81ff6c4
Show file tree
Hide file tree
Showing 155 changed files with 26,123 additions and 4,451 deletions.
4 changes: 2 additions & 2 deletions internal/ioutil/ack_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (w *Uint16AckWaiter) StopAll() {

// Wait performs the given action, and waits for given seq to be Done.
func (w *Uint16AckWaiter) Wait(ctx context.Context, action func(seq Uint16Seq) error) (err error) {
ackCh := make(chan struct{})
ackCh := make(chan struct{}, 1)

w.mx.Lock()
seq := w.nextSeq
Expand All @@ -78,7 +78,7 @@ func (w *Uint16AckWaiter) Wait(ctx context.Context, action func(seq Uint16Seq) e
case _, ok := <-ackCh:
if !ok {
// waiter stopped manually.
return io.ErrClosedPipe
err = io.ErrClosedPipe
}
case <-ctx.Done():
err = ctx.Err()
Expand Down
49 changes: 35 additions & 14 deletions internal/ioutil/ack_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,44 @@ package ioutil

import (
"context"
"sync"
"testing"
)

// Ensure that no race conditions occurs.
func TestUint16AckWaiter_Wait(t *testing.T) {
w := new(Uint16AckWaiter)

seqChan := make(chan Uint16Seq)
defer close(seqChan)
for i := 0; i < 64; i++ {
go w.Wait(context.TODO(), func(seq Uint16Seq) error { //nolint:errcheck,unparam
seqChan <- seq
return nil
})
seq := <-seqChan
for j := 0; j < i; j++ {
go w.Done(seq)

// Ensure that no race conditions occurs when
// each concurrent call to 'Uint16AckWaiter.Wait()' is met with
// multiple concurrent calls to 'Uint16AckWaiter.Done()' with the same seq.
t.Run("ensure_no_race_conditions", func(*testing.T) {
w := new(Uint16AckWaiter)
defer w.StopAll()

seqChan := make(chan Uint16Seq)
defer close(seqChan)

wg := new(sync.WaitGroup)

for i := 0; i < 64; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = w.Wait(context.TODO(), func(seq Uint16Seq) error { //nolint:errcheck,unparam
seqChan <- seq
return nil
})
}()

seq := <-seqChan
for j := 0; j <= i; j++ {
wg.Add(1)
go func() {
defer wg.Done()
w.Done(seq)
}()
}
}
}

wg.Wait()
})
}
26 changes: 23 additions & 3 deletions pkg/dmsg/TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Note that even though `messaging-discovery` is also considered to be an entity o
**`capped_transport_buffer_should_not_result_in_hang`**

- Given:
- A transport is establised between clientA and clientB.
- A transport is established between clientA and clientB.
- clientA writes to clientB until clientB's buffer is capped (or in other words, clientA's write blocks).
- When:
- clientB dials to clientA and begins reading/writing to/from the newly established transport.
Expand All @@ -45,8 +45,28 @@ Note that even though `messaging-discovery` is also considered to be an entity o

- TODO

**`self_dial_should_work`**

- TODO

### Fuzz testing

We should test the robustness of the system under different conditions and random order of events. These tests should be written consisiting of x-number of servers, clients and a single discovery.
We should test the robustness of the system under different conditions and random order of events. These tests should be written consisting of x-number of servers, clients and a single discovery.

The tests can be event based, with a probability value for each event.

Possible events:
1. Start random server.
2. Stop random server.
3. Start random client.
1. With or without `Accept()` handling.
2. With or without `transport.Read()` handling.
4. Stop random client.
5. Random client dials to another random client.
6. Random write (in len/count) from random established transport.

Notes:
1. We have a set number of possible servers and we are to start all these servers prior to running the test. This way the discovery has entries of the servers which the clients can access when starting.
2. We may need to log the "events" that happen to calculate the expected state of the system
and run the check every x "events".

TODO
8 changes: 7 additions & 1 deletion pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var (
ErrNoSrv = errors.New("remote has no DelegatedServers")
// ErrClientClosed indicates that client is closed and not accepting new connections.
ErrClientClosed = errors.New("client closed")
// ErrClientAcceptMaxed indicates that the client cannot take in more accepts.
ErrClientAcceptMaxed = errors.New("client accepts buffer maxed")
)

// ClientConn represents a connection between a dmsg.Client and dmsg.Server from a client's perspective.
Expand Down Expand Up @@ -137,19 +139,23 @@ func (c *ClientConn) handleRequestFrame(accept chan<- *Transport, id uint16, p [
}

tp := NewTransport(c.Conn, c.log, c.local, initPK, id, c.delTp)
c.setTp(tp)

select {
case <-c.done:
_ = tp.Close() //nolint:errcheck
return initPK, ErrClientClosed

case accept <- tp:
c.setTp(tp)
if err := tp.WriteAccept(); err != nil {
return initPK, err
}
go tp.Serve()
return initPK, nil

default:
_ = tp.Close() //nolint:errcheck
return initPK, ErrClientAcceptMaxed
}
}

Expand Down
37 changes: 19 additions & 18 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transport
import (
"context"
"errors"
"io"
"math/big"
"strings"
"sync"
Expand Down Expand Up @@ -209,21 +210,17 @@ func (tm *Manager) CreateTransport(ctx context.Context, remote cipher.PubKey, tp
// DeleteTransport disconnects and removes the Transport of Transport ID.
func (tm *Manager) DeleteTransport(id uuid.UUID) error {
tm.mu.Lock()
tr := tm.transports[id]
delete(tm.transports, id)
if tr, ok := tm.transports[id]; ok {
delete(tm.transports, id)
_ = tr.Close() //nolint:errcheck
}
tm.mu.Unlock()

tr.Close()

if _, err := tm.config.DiscoveryClient.UpdateStatuses(context.Background(), &Status{ID: id, IsUp: false}); err != nil {
tm.Logger.Warnf("Failed to change transport status: %s", err)
}

tm.Logger.Infof("Unregistered transport %s", id)
if tr != nil {
return tr.Close()
}

return nil
}

Expand Down Expand Up @@ -298,11 +295,13 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp
tm.transports[entry.ID] = mTr
tm.mu.Unlock()

tm.TrChan <- mTr

go tm.manageTransport(ctx, mTr, factory, remote, public, false)

return mTr, nil
select {
case <-tm.doneChan:
return nil, io.ErrClosedPipe
case tm.TrChan <- mTr:
go tm.manageTransport(ctx, mTr, factory, remote, public, false)
return mTr, nil
}
}

func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*ManagedTransport, error) {
Expand Down Expand Up @@ -338,11 +337,13 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
tm.transports[entry.ID] = mTr
tm.mu.Unlock()

tm.TrChan <- mTr

go tm.manageTransport(ctx, mTr, factory, remote, true, true)

return mTr, nil
select {
case <-tm.doneChan:
return nil, io.ErrClosedPipe
case tm.TrChan <- mTr:
go tm.manageTransport(ctx, mTr, factory, remote, true, true)
return mTr, nil
}
}

func (tm *Manager) addEntry(entry *Entry) {
Expand Down
69 changes: 36 additions & 33 deletions vendor/golang.org/x/crypto/blake2b/blake2b_generic.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 81ff6c4

Please sign in to comment.