Skip to content

Commit

Permalink
Merge pull request #386 from ayuryshev/bug/remove-multiple-manageTran…
Browse files Browse the repository at this point in the history
…sport

Further improvements in ManagedTransport concurrency
  • Loading branch information
志宇 authored Jun 9, 2019
2 parents bcae8da + dd820d6 commit 2975549
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 148 deletions.
2 changes: 2 additions & 0 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func (r *Router) Serve(ctx context.Context) error {
isAccepted, isSetup := tp.Accepted, r.IsSetupTransport(tp)
r.mu.Unlock()

r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", isAccepted, isSetup)

var serve func(io.ReadWriter) error
switch {
case isAccepted && isSetup:
Expand Down
11 changes: 6 additions & 5 deletions pkg/transport/discovery_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package transport
package transport_test

import (
"context"
"fmt"

"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/transport"
)

func ExampleNewDiscoveryMock() {
dc := NewDiscoveryMock()
dc := transport.NewDiscoveryMock()
pk1, _ := cipher.GenerateKeyPair()
pk2, _ := cipher.GenerateKeyPair()
entry := &Entry{Type: "mock", EdgeKeys: SortPubKeys(pk1, pk2)}
entry := &transport.Entry{Type: "mock", EdgeKeys: transport.SortPubKeys(pk1, pk2)}

sEntry := &SignedEntry{Entry: entry}
sEntry := &transport.SignedEntry{Entry: entry}

if err := dc.RegisterTransports(context.TODO(), sEntry); err == nil {
fmt.Println("RegisterTransport success")
Expand All @@ -33,7 +34,7 @@ func ExampleNewDiscoveryMock() {
fmt.Printf("entriesWS[0].Entry.Edges()[0] == entry.Edges()[0] is %v\n", entriesWS[0].Entry.Edges()[0] == entry.Edges()[0])
}

if _, err := dc.UpdateStatuses(context.TODO(), &Status{}); err == nil {
if _, err := dc.UpdateStatuses(context.TODO(), &transport.Status{}); err == nil {
fmt.Println("UpdateStatuses success")
} else {
fmt.Println(err.Error())
Expand Down
21 changes: 11 additions & 10 deletions pkg/transport/entry_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package transport
package transport_test

import (
"fmt"

"github.com/google/uuid"

"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/transport"
)

// ExampleNewEntry shows that with different order of edges:
Expand All @@ -15,8 +16,8 @@ func ExampleNewEntry() {
pkA, _ := cipher.GenerateKeyPair()
pkB, _ := cipher.GenerateKeyPair()

entryAB := NewEntry(pkA, pkB, "", true)
entryBA := NewEntry(pkB, pkA, "", true)
entryAB := transport.NewEntry(pkA, pkB, "", true)
entryBA := transport.NewEntry(pkB, pkA, "", true)

if entryAB.ID == entryBA.ID {
fmt.Println("entryAB.ID == entryBA.ID")
Expand All @@ -32,14 +33,14 @@ func ExampleEntry_Edges() {
pkA, _ := cipher.GenerateKeyPair()
pkB, _ := cipher.GenerateKeyPair()

entryAB := Entry{
entryAB := transport.Entry{
ID: uuid.UUID{},
EdgeKeys: [2]cipher.PubKey{pkA, pkB},
Type: "",
Public: true,
}

entryBA := Entry{
entryBA := transport.Entry{
ID: uuid.UUID{},
EdgeKeys: [2]cipher.PubKey{pkB, pkA},
Type: "",
Expand All @@ -62,7 +63,7 @@ func ExampleEntry_SetEdges() {
pkA, _ := cipher.GenerateKeyPair()
pkB, _ := cipher.GenerateKeyPair()

entryAB, entryBA := Entry{}, Entry{}
entryAB, entryBA := transport.Entry{}, transport.Entry{}

entryAB.SetEdges([2]cipher.PubKey{pkA, pkB})
entryBA.SetEdges([2]cipher.PubKey{pkA, pkB})
Expand All @@ -85,8 +86,8 @@ func ExampleSignedEntry_Sign() {
pkA, skA := cipher.GenerateKeyPair()
pkB, skB := cipher.GenerateKeyPair()

entry := NewEntry(pkA, pkB, "mock", true)
sEntry := &SignedEntry{Entry: entry}
entry := transport.NewEntry(pkA, pkB, "mock", true)
sEntry := &transport.SignedEntry{Entry: entry}

if sEntry.Signatures[0].Null() && sEntry.Signatures[1].Null() {
fmt.Println("No signatures set")
Expand Down Expand Up @@ -119,8 +120,8 @@ func ExampleSignedEntry_Signature() {
pkA, skA := cipher.GenerateKeyPair()
pkB, skB := cipher.GenerateKeyPair()

entry := NewEntry(pkA, pkB, "mock", true)
sEntry := &SignedEntry{Entry: entry}
entry := transport.NewEntry(pkA, pkB, "mock", true)
sEntry := &transport.SignedEntry{Entry: entry}
if ok := sEntry.Sign(pkA, skA); !ok {
fmt.Println("Error signing sEntry with (pkA,skA)")
}
Expand Down
24 changes: 0 additions & 24 deletions pkg/transport/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,30 +199,6 @@ func TestSettlementHandshake(t *testing.T) {

}

/*
func TestSettlementHandshakeInvalidSig(t *testing.T) {
mockEnv := newHsMockEnv()
require.NoError(t, mockEnv.err1)
require.NoError(t, mockEnv.err2)
go settlementInitiatorHandshake(true)(mockEnv.m2, mockEnv.tr1) // nolint: errcheck
_, err := settlementResponderHandshake(mockEnv.m2, mockEnv.tr2)
require.Error(t, err)
assert.Equal(t, "Recovered pubkey does not match pubkey", err.Error())
in, out := net.Pipe()
tr1 := NewMockTransport(in, mockEnv.pk1, mockEnv.pk2)
tr2 := NewMockTransport(out, mockEnv.pk2, mockEnv.pk1)
go settlementResponderHandshake(mockEnv.m1, tr2) // nolint: errcheck
_, err = settlementInitiatorHandshake(true)(mockEnv.m1, tr1)
require.Error(t, err)
assert.Equal(t, "Recovered pubkey does not match pubkey", err.Error())
}
*/

func TestSettlementHandshakePrivate(t *testing.T) {
mockEnv := newHsMockEnv()

Expand Down
13 changes: 7 additions & 6 deletions pkg/transport/log_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package transport
package transport_test

import (
"io/ioutil"
Expand All @@ -7,17 +7,18 @@ import (
"testing"

"github.com/google/uuid"
"github.com/skycoin/skywire/pkg/transport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func testTransportLogStore(t *testing.T, logStore LogStore) {
func testTransportLogStore(t *testing.T, logStore transport.LogStore) {
t.Helper()

id1 := uuid.New()
entry1 := &LogEntry{big.NewInt(100), big.NewInt(200)}
entry1 := &transport.LogEntry{big.NewInt(100), big.NewInt(200)}
id2 := uuid.New()
entry2 := &LogEntry{big.NewInt(300), big.NewInt(400)}
entry2 := &transport.LogEntry{big.NewInt(300), big.NewInt(400)}

require.NoError(t, logStore.Record(id1, entry1))
require.NoError(t, logStore.Record(id2, entry2))
Expand All @@ -29,15 +30,15 @@ func testTransportLogStore(t *testing.T, logStore LogStore) {
}

func TestInMemoryTransportLogStore(t *testing.T) {
testTransportLogStore(t, InMemoryTransportLogStore())
testTransportLogStore(t, transport.InMemoryTransportLogStore())
}

func TestFileTransportLogStore(t *testing.T) {
dir, err := ioutil.TempDir("", "log_store")
require.NoError(t, err)
defer os.RemoveAll(dir)

ls, err := FileTransportLogStore(dir)
ls, err := transport.FileTransportLogStore(dir)
require.NoError(t, err)
testTransportLogStore(t, ls)
}
66 changes: 27 additions & 39 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package transport
import (
"math/big"
"sync"
"sync/atomic"

"github.com/google/uuid"
)
Expand All @@ -17,10 +16,10 @@ type ManagedTransport struct {
Accepted bool
LogEntry *LogEntry

doneChan chan struct{}
errChan chan error
isClosing int32
mu sync.RWMutex
doneChan chan struct{}
errChan chan error
mu sync.RWMutex
once sync.Once

readLogChan chan int
writeLogChan chan int
Expand All @@ -34,8 +33,8 @@ func newManagedTransport(id uuid.UUID, tr Transport, public bool, accepted bool)
Accepted: accepted,
doneChan: make(chan struct{}),
errChan: make(chan error),
readLogChan: make(chan int),
writeLogChan: make(chan int),
readLogChan: make(chan int, 16),
writeLogChan: make(chan int, 16),
LogEntry: &LogEntry{new(big.Int), new(big.Int)},
}
}
Expand All @@ -45,22 +44,12 @@ func (tr *ManagedTransport) Read(p []byte) (n int, err error) {
tr.mu.RLock()
n, err = tr.Transport.Read(p) // TODO: data race.
tr.mu.RUnlock()
if err == nil {
select {
case <-tr.doneChan:
return
case tr.readLogChan <- n:
}

return
}

select {
case <-tr.doneChan:
return
case tr.errChan <- err:
if err != nil {
tr.errChan <- err
}

tr.readLogChan <- n
return
}

Expand All @@ -69,42 +58,41 @@ func (tr *ManagedTransport) Write(p []byte) (n int, err error) {
tr.mu.RLock()
n, err = tr.Transport.Write(p)
tr.mu.RUnlock()
if err == nil {
select {
case <-tr.doneChan:
return
case tr.writeLogChan <- n:
}

if err != nil {
tr.errChan <- err
return
}

select {
case <-tr.doneChan:
return
case tr.errChan <- err:
}
tr.writeLogChan <- n

return
}

// killWorker sends signal to Manager.manageTransport goroutine to exit
// it's safe to call it multiple times
func (tr *ManagedTransport) killWorker() {
tr.once.Do(func() {
close(tr.doneChan)
})
}

// Close closes underlying
func (tr *ManagedTransport) Close() error {

atomic.StoreInt32(&tr.isClosing, 1)

tr.mu.RLock()
err := tr.Transport.Close()
tr.mu.RUnlock()

tr.killWorker()
return err
}

func (tr *ManagedTransport) isClosing() bool {
select {
case <-tr.doneChan:
return true
default:

close(tr.doneChan)
return false
}

return err
}

func (tr *ManagedTransport) updateTransport(newTr Transport) {
Expand Down
Loading

0 comments on commit 2975549

Please sign in to comment.