From af6f71d9db2fdaa68445d8d0ffa7626d2394df33 Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Tue, 4 Jun 2019 10:14:40 +0300 Subject: [PATCH 1/8] Further improvements into ManagedTransport concurrency There are three changes in `Manager`-`ManagedTransport`: 1. Eliminated subtle bug with `manageTransport` and `manageTransportLogs` both trying to select from `ManagedTransport.doneChan`. `manageTransportLogs` was eliminated in the process 2. Eliminated subtle bug with only one redial on errors in `manageTransport` 3. Some tests in `transport` are now blackbox-tests: - put in separate package `transport_test` - test only public API --- pkg/transport/discovery_test.go | 11 ++-- pkg/transport/entry_test.go | 21 ++++--- pkg/transport/manager.go | 97 +++++++++++++---------------- pkg/transport/tcp_transport_test.go | 13 ++-- 4 files changed, 68 insertions(+), 74 deletions(-) diff --git a/pkg/transport/discovery_test.go b/pkg/transport/discovery_test.go index db01261433..190f86de42 100644 --- a/pkg/transport/discovery_test.go +++ b/pkg/transport/discovery_test.go @@ -1,19 +1,20 @@ -package transport +package transport_test import ( "context" "fmt" "github.com/skycoin/skywire/pkg/cipher" + "github.com/skycoin/skywire/pkg/transport" ) func ExampleNewDiscoveryMock() { - dc := NewDiscoveryMock() + dc := transport.NewDiscoveryMock() pk1, _ := cipher.GenerateKeyPair() pk2, _ := cipher.GenerateKeyPair() - entry := &Entry{Type: "mock", EdgeKeys: SortPubKeys(pk1, pk2)} + entry := &transport.Entry{Type: "mock", EdgeKeys: transport.SortPubKeys(pk1, pk2)} - sEntry := &SignedEntry{Entry: entry} + sEntry := &transport.SignedEntry{Entry: entry} if err := dc.RegisterTransports(context.TODO(), sEntry); err == nil { fmt.Println("RegisterTransport success") @@ -33,7 +34,7 @@ func ExampleNewDiscoveryMock() { fmt.Printf("entriesWS[0].Entry.Edges()[0] == entry.Edges()[0] is %v\n", entriesWS[0].Entry.Edges()[0] == entry.Edges()[0]) } - if _, err := dc.UpdateStatuses(context.TODO(), &Status{}); err == nil { + if _, err := dc.UpdateStatuses(context.TODO(), &transport.Status{}); err == nil { fmt.Println("UpdateStatuses success") } else { fmt.Println(err.Error()) diff --git a/pkg/transport/entry_test.go b/pkg/transport/entry_test.go index 974dee8153..8e882ffc49 100644 --- a/pkg/transport/entry_test.go +++ b/pkg/transport/entry_test.go @@ -1,4 +1,4 @@ -package transport +package transport_test import ( "fmt" @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" "github.com/skycoin/skywire/pkg/cipher" + "github.com/skycoin/skywire/pkg/transport" ) // ExampleNewEntry shows that with different order of edges: @@ -15,8 +16,8 @@ func ExampleNewEntry() { pkA, _ := cipher.GenerateKeyPair() pkB, _ := cipher.GenerateKeyPair() - entryAB := NewEntry(pkA, pkB, "", true) - entryBA := NewEntry(pkB, pkA, "", true) + entryAB := transport.NewEntry(pkA, pkB, "", true) + entryBA := transport.NewEntry(pkB, pkA, "", true) if entryAB.ID == entryBA.ID { fmt.Println("entryAB.ID == entryBA.ID") @@ -32,14 +33,14 @@ func ExampleEntry_Edges() { pkA, _ := cipher.GenerateKeyPair() pkB, _ := cipher.GenerateKeyPair() - entryAB := Entry{ + entryAB := transport.Entry{ ID: uuid.UUID{}, EdgeKeys: [2]cipher.PubKey{pkA, pkB}, Type: "", Public: true, } - entryBA := Entry{ + entryBA := transport.Entry{ ID: uuid.UUID{}, EdgeKeys: [2]cipher.PubKey{pkB, pkA}, Type: "", @@ -62,7 +63,7 @@ func ExampleEntry_SetEdges() { pkA, _ := cipher.GenerateKeyPair() pkB, _ := cipher.GenerateKeyPair() - entryAB, entryBA := Entry{}, Entry{} + entryAB, entryBA := transport.Entry{}, transport.Entry{} entryAB.SetEdges([2]cipher.PubKey{pkA, pkB}) entryBA.SetEdges([2]cipher.PubKey{pkA, pkB}) @@ -85,8 +86,8 @@ func ExampleSignedEntry_Sign() { pkA, skA := cipher.GenerateKeyPair() pkB, skB := cipher.GenerateKeyPair() - entry := NewEntry(pkA, pkB, "mock", true) - sEntry := &SignedEntry{Entry: entry} + entry := transport.NewEntry(pkA, pkB, "mock", true) + sEntry := &transport.SignedEntry{Entry: entry} if sEntry.Signatures[0].Null() && sEntry.Signatures[1].Null() { fmt.Println("No signatures set") @@ -119,8 +120,8 @@ func ExampleSignedEntry_Signature() { pkA, skA := cipher.GenerateKeyPair() pkB, skB := cipher.GenerateKeyPair() - entry := NewEntry(pkA, pkB, "mock", true) - sEntry := &SignedEntry{Entry: entry} + entry := transport.NewEntry(pkA, pkB, "mock", true) + sEntry := &transport.SignedEntry{Entry: entry} if ok := sEntry.Sign(pkA, skA); !ok { fmt.Println("Error signing sEntry with (pkA,skA)") } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 82c8e830ef..ad7215882b 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -309,21 +309,19 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp } tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID) - managedTr := newManagedTransport(entry.ID, tr, entry.Public, false) + mTr := newManagedTransport(entry.ID, tr, entry.Public, false) tm.mu.Lock() - tm.transports[entry.ID] = managedTr + tm.transports[entry.ID] = mTr select { case <-tm.doneChan: - case tm.TrChan <- managedTr: + case tm.TrChan <- mTr: default: } tm.mu.Unlock() - go tm.manageTransport(ctx, managedTr, factory, remote, public, false) + go tm.manageTransport(ctx, mTr, factory, remote, public, false) - go tm.manageTransportLogs(managedTr) - - return managedTr, nil + return mTr, nil } func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*ManagedTransport, error) { @@ -345,22 +343,20 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag } tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) - managedTr := newManagedTransport(entry.ID, tr, entry.Public, true) + mTr := newManagedTransport(entry.ID, tr, entry.Public, true) tm.mu.Lock() - tm.transports[entry.ID] = managedTr + tm.transports[entry.ID] = mTr select { case <-tm.doneChan: - case tm.TrChan <- managedTr: + case tm.TrChan <- mTr: default: } tm.mu.Unlock() - go tm.manageTransport(ctx, managedTr, factory, remote, true, true) - - go tm.manageTransportLogs(managedTr) + go tm.manageTransport(ctx, mTr, factory, remote, true, true) - return managedTr, nil + return mTr, nil } func (tm *Manager) walkEntries(walkFunc func(*Entry) bool) *Entry { @@ -382,49 +378,44 @@ func (tm *Manager) addEntry(entry *Entry) { tm.mu.Unlock() } -func (tm *Manager) manageTransport(ctx context.Context, managedTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) { - select { - case <-managedTr.doneChan: - tm.Logger.Infof("Transport %s closed", managedTr.ID) - return - case err := <-managedTr.errChan: - if atomic.LoadInt32(&managedTr.isClosing) == 0 { - tm.Logger.Infof("Transport %s failed with error: %s. Re-dialing...", managedTr.ID, err) - if accepted { - if err := tm.DeleteTransport(managedTr.ID); err != nil { - tm.Logger.Warnf("Failed to delete accepted transport: %s", err) - } - } else { - tr, _, err := tm.dialTransport(ctx, factory, remote, public) - if err != nil { - tm.Logger.Infof("Failed to re-dial Transport %s: %s", managedTr.ID, err) - if err := tm.DeleteTransport(managedTr.ID); err != nil { - tm.Logger.Warnf("Failed to delete re-dialled transport: %s", err) +func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) { + for { + select { + case <-mTr.doneChan: + tm.Logger.Infof("Transport %s closed", mTr.ID) + return + case err := <-mTr.errChan: + if atomic.LoadInt32(&mTr.isClosing) == 0 { + tm.Logger.Infof("Transport %s failed with error: %s. Re-dialing...", mTr.ID, err) + if accepted { + if err := tm.DeleteTransport(mTr.ID); err != nil { + tm.Logger.Warnf("Failed to delete accepted transport: %s", err) } } else { - managedTr.updateTransport(tr) + tr, _, err := tm.dialTransport(ctx, factory, remote, public) + if err != nil { + tm.Logger.Infof("Failed to re-dial Transport %s: %s", mTr.ID, err) + if err := tm.DeleteTransport(mTr.ID); err != nil { + tm.Logger.Warnf("Failed to delete re-dialled transport: %s", err) + } + } else { + tm.Logger.Infof("Updating transport %s", mTr.ID) + mTr.updateTransport(tr) + } } + } else { + tm.Logger.Infof("Transport %s is already closing. Skipped error: %s", mTr.ID, err) + } + case n := <-mTr.readLogChan: + mTr.LogEntry.ReceivedBytes.Add(mTr.LogEntry.ReceivedBytes, big.NewInt(int64(n))) + if err := tm.config.LogStore.Record(mTr.ID, mTr.LogEntry); err != nil { + tm.Logger.Warnf("Failed to record log entry: %s", err) + } + case n := <-mTr.writeLogChan: + mTr.LogEntry.SentBytes.Add(mTr.LogEntry.SentBytes, big.NewInt(int64(n))) + if err := tm.config.LogStore.Record(mTr.ID, mTr.LogEntry); err != nil { + tm.Logger.Warnf("Failed to record log entry: %s", err) } - } else { - tm.Logger.Infof("Transport %s is already closing. Skipped error: %s", managedTr.ID, err) - } - - } -} - -func (tm *Manager) manageTransportLogs(tr *ManagedTransport) { - for { - select { - case <-tr.doneChan: - return - case n := <-tr.readLogChan: - tr.LogEntry.ReceivedBytes.Add(tr.LogEntry.ReceivedBytes, big.NewInt(int64(n))) - case n := <-tr.writeLogChan: - tr.LogEntry.SentBytes.Add(tr.LogEntry.SentBytes, big.NewInt(int64(n))) - } - - if err := tm.config.LogStore.Record(tr.ID, tr.LogEntry); err != nil { - tm.Logger.Warnf("Failed to record log entry: %s", err) } } } diff --git a/pkg/transport/tcp_transport_test.go b/pkg/transport/tcp_transport_test.go index 0715a9c381..cd60504f87 100644 --- a/pkg/transport/tcp_transport_test.go +++ b/pkg/transport/tcp_transport_test.go @@ -1,4 +1,4 @@ -package transport +package transport_test import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/skycoin/skywire/pkg/cipher" + "github.com/skycoin/skywire/pkg/transport" ) func TestTCPFactory(t *testing.T) { @@ -28,10 +29,10 @@ func TestTCPFactory(t *testing.T) { l2, err := net.ListenTCP("tcp", addr2) require.NoError(t, err) - pkt1 := InMemoryPubKeyTable(map[cipher.PubKey]*net.TCPAddr{pk2: addr2}) - pkt2 := InMemoryPubKeyTable(map[cipher.PubKey]*net.TCPAddr{pk1: addr1}) + pkt1 := transport.InMemoryPubKeyTable(map[cipher.PubKey]*net.TCPAddr{pk2: addr2}) + pkt2 := transport.InMemoryPubKeyTable(map[cipher.PubKey]*net.TCPAddr{pk1: addr1}) - f1 := NewTCPFactory(pk1, pkt1, l1) + f1 := transport.NewTCPFactory(pk1, pkt1, l1) errCh := make(chan error) go func() { tr, err := f1.Accept(context.TODO()) @@ -48,7 +49,7 @@ func TestTCPFactory(t *testing.T) { errCh <- nil }() - f2 := NewTCPFactory(pk2, pkt2, l2) + f2 := transport.NewTCPFactory(pk2, pkt2, l2) assert.Equal(t, "tcp", f2.Type()) assert.Equal(t, pk2, f2.Local()) @@ -79,7 +80,7 @@ func TestFilePKTable(t *testing.T) { _, err = tmpfile.Write([]byte(fmt.Sprintf("%s\t%s\n", pk, addr))) require.NoError(t, err) - pkt, err := FilePubKeyTable(tmpfile.Name()) + pkt, err := transport.FilePubKeyTable(tmpfile.Name()) require.NoError(t, err) raddr := pkt.RemoteAddr(pk) From c035c01868d4170d23417cd872e7d2e2ea5b8c8d Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Tue, 4 Jun 2019 12:40:39 +0300 Subject: [PATCH 2/8] Changes: log_test.go - whiteboxed, handshake_test.go - removed commented test --- pkg/transport/handshake_test.go | 24 ------------------------ pkg/transport/log_test.go | 13 +++++++------ 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/pkg/transport/handshake_test.go b/pkg/transport/handshake_test.go index 8f7093aabc..2aa5b448c1 100644 --- a/pkg/transport/handshake_test.go +++ b/pkg/transport/handshake_test.go @@ -199,30 +199,6 @@ func TestSettlementHandshake(t *testing.T) { } -/* -func TestSettlementHandshakeInvalidSig(t *testing.T) { - mockEnv := newHsMockEnv() - - require.NoError(t, mockEnv.err1) - require.NoError(t, mockEnv.err2) - - go settlementInitiatorHandshake(true)(mockEnv.m2, mockEnv.tr1) // nolint: errcheck - _, err := settlementResponderHandshake(mockEnv.m2, mockEnv.tr2) - require.Error(t, err) - assert.Equal(t, "Recovered pubkey does not match pubkey", err.Error()) - - in, out := net.Pipe() - tr1 := NewMockTransport(in, mockEnv.pk1, mockEnv.pk2) - tr2 := NewMockTransport(out, mockEnv.pk2, mockEnv.pk1) - - go settlementResponderHandshake(mockEnv.m1, tr2) // nolint: errcheck - _, err = settlementInitiatorHandshake(true)(mockEnv.m1, tr1) - require.Error(t, err) - assert.Equal(t, "Recovered pubkey does not match pubkey", err.Error()) - -} -*/ - func TestSettlementHandshakePrivate(t *testing.T) { mockEnv := newHsMockEnv() diff --git a/pkg/transport/log_test.go b/pkg/transport/log_test.go index 092f75bf87..984214d6bb 100644 --- a/pkg/transport/log_test.go +++ b/pkg/transport/log_test.go @@ -1,4 +1,4 @@ -package transport +package transport_test import ( "io/ioutil" @@ -7,17 +7,18 @@ import ( "testing" "github.com/google/uuid" + "github.com/skycoin/skywire/pkg/transport" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func testTransportLogStore(t *testing.T, logStore LogStore) { +func testTransportLogStore(t *testing.T, logStore transport.LogStore) { t.Helper() id1 := uuid.New() - entry1 := &LogEntry{big.NewInt(100), big.NewInt(200)} + entry1 := &transport.LogEntry{big.NewInt(100), big.NewInt(200)} id2 := uuid.New() - entry2 := &LogEntry{big.NewInt(300), big.NewInt(400)} + entry2 := &transport.LogEntry{big.NewInt(300), big.NewInt(400)} require.NoError(t, logStore.Record(id1, entry1)) require.NoError(t, logStore.Record(id2, entry2)) @@ -29,7 +30,7 @@ func testTransportLogStore(t *testing.T, logStore LogStore) { } func TestInMemoryTransportLogStore(t *testing.T) { - testTransportLogStore(t, InMemoryTransportLogStore()) + testTransportLogStore(t, transport.InMemoryTransportLogStore()) } func TestFileTransportLogStore(t *testing.T) { @@ -37,7 +38,7 @@ func TestFileTransportLogStore(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - ls, err := FileTransportLogStore(dir) + ls, err := transport.FileTransportLogStore(dir) require.NoError(t, err) testTransportLogStore(t, ls) } From 3be2313fcdb6af28cdc85e0c9b46269dcfa48d6b Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Tue, 4 Jun 2019 17:16:25 +0300 Subject: [PATCH 3/8] Changes: 1. Eliminated subtle bug with multiple select from Manager.doneChan 2. Eliminated subtle bug with accepting new transport during close tested in skywire & skywire-services Integration test still displays data race on nodeB exit after multiple startup.sh invocations --- pkg/transport/manager.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index ad7215882b..0763632e91 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -33,9 +33,10 @@ type Manager struct { transports map[uuid.UUID]*ManagedTransport entries map[Entry]struct{} - doneChan chan struct{} - TrChan chan *ManagedTransport - mu sync.RWMutex + doneChan chan struct{} + isClosing int32 + TrChan chan *ManagedTransport + mu sync.RWMutex } // NewManager creates a Manager with the provided configuration and transport factories. @@ -175,8 +176,10 @@ func (tm *Manager) Serve(ctx context.Context) error { for { select { case <-ctx.Done(): + tm.Logger.Info("Received ctx.Done()") return case <-tm.doneChan: + tm.Logger.Info("Received tm.doneCh") return default: if _, err := tm.acceptTransport(ctx, f); err != nil { @@ -187,6 +190,7 @@ func (tm *Manager) Serve(ctx context.Context) error { tm.Logger.Warnf("Failed to accept connection: %s", err) } } + } }(factory) } @@ -255,6 +259,7 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error { // Close closes opened transports and registered factories. func (tm *Manager) Close() error { + atomic.StoreInt32(&tm.isClosing, 1) close(tm.doneChan) tm.Logger.Info("Closing transport manager") @@ -310,15 +315,13 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID) mTr := newManagedTransport(entry.ID, tr, entry.Public, false) + tm.mu.Lock() tm.transports[entry.ID] = mTr - select { - case <-tm.doneChan: - case tm.TrChan <- mTr: - default: - } tm.mu.Unlock() + tm.TrChan <- mTr + go tm.manageTransport(ctx, mTr, factory, remote, public, false) return mTr, nil @@ -330,6 +333,10 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag return nil, err } + if atomic.LoadInt32(&tm.isClosing) != 0 { + return nil, errors.New("transport.Manager is closing. Skipping incoming transport") + } + var handshake settlementHandshake = settlementResponderHandshake entry, err := handshake.Do(tm, tr, 30*time.Second) if err != nil { @@ -344,16 +351,13 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) mTr := newManagedTransport(entry.ID, tr, entry.Public, true) - tm.mu.Lock() + tm.mu.Lock() tm.transports[entry.ID] = mTr - select { - case <-tm.doneChan: - case tm.TrChan <- mTr: - default: - } tm.mu.Unlock() + tm.TrChan <- mTr + go tm.manageTransport(ctx, mTr, factory, remote, true, true) return mTr, nil From 7c7f64498ec1c306eaa116fa52b8c1d91ffd427b Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Tue, 4 Jun 2019 21:14:54 +0300 Subject: [PATCH 4/8] Implemenented ManagedTransport.killWorker. No multiple transports --- pkg/transport/managed_transport.go | 41 ++++++++++-------------------- pkg/transport/manager.go | 21 ++++++++++++++- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 17181b3ac9..d025e5f18e 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -45,22 +45,12 @@ func (tr *ManagedTransport) Read(p []byte) (n int, err error) { tr.mu.RLock() n, err = tr.Transport.Read(p) // TODO: data race. tr.mu.RUnlock() - if err == nil { - select { - case <-tr.doneChan: - return - case tr.readLogChan <- n: - } - return - } - - select { - case <-tr.doneChan: - return - case tr.errChan <- err: + if err != nil { + tr.errChan <- err } + tr.readLogChan <- n return } @@ -69,23 +59,23 @@ func (tr *ManagedTransport) Write(p []byte) (n int, err error) { tr.mu.RLock() n, err = tr.Transport.Write(p) tr.mu.RUnlock() - if err == nil { - select { - case <-tr.doneChan: - return - case tr.writeLogChan <- n: - } + if err != nil { + tr.errChan <- err return } + tr.writeLogChan <- n + return +} + +func (tr *ManagedTransport) killWorker() { select { case <-tr.doneChan: return - case tr.errChan <- err: + default: + close(tr.doneChan) } - - return } // Close closes underlying @@ -97,12 +87,7 @@ func (tr *ManagedTransport) Close() error { err := tr.Transport.Close() tr.mu.RUnlock() - select { - case <-tr.doneChan: - default: - - close(tr.doneChan) - } + tr.killWorker() return err } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 0763632e91..e00127e466 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -37,6 +37,8 @@ type Manager struct { isClosing int32 TrChan chan *ManagedTransport mu sync.RWMutex + + mgrQty int32 // Count of spawned manageTransport goroutines } // NewManager creates a Manager with the provided configuration and transport factories. @@ -243,6 +245,10 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error { tr := tm.transports[id] delete(tm.transports, id) tm.mu.Unlock() + mgrQty := atomic.AddInt32(&tm.mgrQty, -1) + tm.Logger.Infof("Manager.DeleteTransport id: %v, mgrQty = %v", id, mgrQty) + + tr.Close() if _, err := tm.config.DiscoveryClient.UpdateStatuses(context.Background(), &Status{ID: id, IsUp: false}); err != nil { tm.Logger.Warnf("Failed to change transport status: %s", err) @@ -313,6 +319,11 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp return nil, err } + oldTr := tm.Transport(entry.ID) + if oldTr != nil { + oldTr.killWorker() + } + tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID) mTr := newManagedTransport(entry.ID, tr, entry.Public, false) @@ -350,6 +361,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag } tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) + + oldTr := tm.Transport(entry.ID) + if oldTr != nil { + oldTr.killWorker() + } mTr := newManagedTransport(entry.ID, tr, entry.Public, true) tm.mu.Lock() @@ -383,10 +399,13 @@ func (tm *Manager) addEntry(entry *Entry) { } func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) { + mgrQty := atomic.AddInt32(&tm.mgrQty, 1) + tm.Logger.Infof("Spawned manageTransport for mTr.ID: %v. mgrQty: %v", mTr.ID, mgrQty) for { select { case <-mTr.doneChan: - tm.Logger.Infof("Transport %s closed", mTr.ID) + mgrQty := atomic.AddInt32(&tm.mgrQty, -1) + tm.Logger.Infof("manageTransport exit for %v. mgrQty: %v", mTr.ID, mgrQty) return case err := <-mTr.errChan: if atomic.LoadInt32(&mTr.isClosing) == 0 { From 7dabb5ffdf4f5b63dbb925b80b93ded556cbbbbc Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Wed, 5 Jun 2019 11:24:06 +0300 Subject: [PATCH 5/8] Comments edited --- pkg/transport/managed_transport.go | 2 ++ pkg/transport/manager.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index d025e5f18e..58e09e5ff6 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -69,6 +69,8 @@ func (tr *ManagedTransport) Write(p []byte) (n int, err error) { return } +// killWorker sends signal to Manager.manageTransport goroutine to exit +// it's safe to call it multiple times func (tr *ManagedTransport) killWorker() { select { case <-tr.doneChan: diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index e00127e466..1ad99a5617 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -205,7 +205,7 @@ func (tm *Manager) Serve(ctx context.Context) error { // MakeTransportID generates uuid.UUID from pair of keys + type + public // Generated uuid is: // - always the same for a given pair -// - GenTransportUUID(keyA,keyB) == GenTransportUUID(keyB, keyA) +// - MakeTransportUUID(keyA,keyB) == MakeTransportUUID(keyB, keyA) func MakeTransportID(keyA, keyB cipher.PubKey, tpType string, public bool) uuid.UUID { keys := SortPubKeys(keyA, keyB) if public { From 2ba8af057e2292d34aca0d222a7a87f728193f70 Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Wed, 5 Jun 2019 18:33:52 +0300 Subject: [PATCH 6/8] Changed killWorker - now it's using sync.Once --- pkg/router/router.go | 2 ++ pkg/transport/managed_transport.go | 12 +++++------- pkg/transport/manager.go | 2 -- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index d5b62302ef..9cd06f564e 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -85,6 +85,8 @@ func (r *Router) Serve(ctx context.Context) error { isAccepted, isSetup := tp.Accepted, r.IsSetupTransport(tp) r.mu.Unlock() + r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", isAccepted, isSetup) + var serve func(io.ReadWriter) error switch { case isAccepted && isSetup: diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 58e09e5ff6..912bab151a 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -21,6 +21,7 @@ type ManagedTransport struct { errChan chan error isClosing int32 mu sync.RWMutex + once sync.Once readLogChan chan int writeLogChan chan int @@ -34,8 +35,8 @@ func newManagedTransport(id uuid.UUID, tr Transport, public bool, accepted bool) Accepted: accepted, doneChan: make(chan struct{}), errChan: make(chan error), - readLogChan: make(chan int), - writeLogChan: make(chan int), + readLogChan: make(chan int, 16), + writeLogChan: make(chan int, 16), LogEntry: &LogEntry{new(big.Int), new(big.Int)}, } } @@ -72,12 +73,9 @@ func (tr *ManagedTransport) Write(p []byte) (n int, err error) { // killWorker sends signal to Manager.manageTransport goroutine to exit // it's safe to call it multiple times func (tr *ManagedTransport) killWorker() { - select { - case <-tr.doneChan: - return - default: + tr.once.Do(func() { close(tr.doneChan) - } + }) } // Close closes underlying diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 1ad99a5617..80fcbc2158 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -245,8 +245,6 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error { tr := tm.transports[id] delete(tm.transports, id) tm.mu.Unlock() - mgrQty := atomic.AddInt32(&tm.mgrQty, -1) - tm.Logger.Infof("Manager.DeleteTransport id: %v, mgrQty = %v", id, mgrQty) tr.Close() From 76ed23653ae4f46c47be513b3e3a9fd91ff699dd Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Wed, 5 Jun 2019 19:16:38 +0300 Subject: [PATCH 7/8] Atomic fields `int32 isClosing` changed into `isClosing()` method in Manager and ManagedTransport --- pkg/transport/managed_transport.go | 23 +++++++++++++---------- pkg/transport/manager.go | 22 ++++++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 912bab151a..0873f9b2d4 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -3,7 +3,6 @@ package transport import ( "math/big" "sync" - "sync/atomic" "github.com/google/uuid" ) @@ -17,11 +16,10 @@ type ManagedTransport struct { Accepted bool LogEntry *LogEntry - doneChan chan struct{} - errChan chan error - isClosing int32 - mu sync.RWMutex - once sync.Once + doneChan chan struct{} + errChan chan error + mu sync.RWMutex + once sync.Once readLogChan chan int writeLogChan chan int @@ -80,18 +78,23 @@ func (tr *ManagedTransport) killWorker() { // Close closes underlying func (tr *ManagedTransport) Close() error { - - atomic.StoreInt32(&tr.isClosing, 1) - tr.mu.RLock() err := tr.Transport.Close() tr.mu.RUnlock() tr.killWorker() - return err } +func (tr *ManagedTransport) isClosing() bool { + select { + case <-tr.doneChan: + return true + default: + return false + } +} + func (tr *ManagedTransport) updateTransport(newTr Transport) { tr.mu.Lock() tr.Transport = newTr diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 80fcbc2158..3115fadc97 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -33,10 +33,9 @@ type Manager struct { transports map[uuid.UUID]*ManagedTransport entries map[Entry]struct{} - doneChan chan struct{} - isClosing int32 - TrChan chan *ManagedTransport - mu sync.RWMutex + doneChan chan struct{} + TrChan chan *ManagedTransport + mu sync.RWMutex mgrQty int32 // Count of spawned manageTransport goroutines } @@ -262,8 +261,6 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) error { // Close closes opened transports and registered factories. func (tm *Manager) Close() error { - - atomic.StoreInt32(&tm.isClosing, 1) close(tm.doneChan) tm.Logger.Info("Closing transport manager") @@ -342,7 +339,7 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag return nil, err } - if atomic.LoadInt32(&tm.isClosing) != 0 { + if tm.isClosing() { return nil, errors.New("transport.Manager is closing. Skipping incoming transport") } @@ -396,6 +393,15 @@ func (tm *Manager) addEntry(entry *Entry) { tm.mu.Unlock() } +func (tm *Manager) isClosing() bool { + select { + case <-tm.doneChan: + return true + default: + return false + } +} + func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, factory Factory, remote cipher.PubKey, public bool, accepted bool) { mgrQty := atomic.AddInt32(&tm.mgrQty, 1) tm.Logger.Infof("Spawned manageTransport for mTr.ID: %v. mgrQty: %v", mTr.ID, mgrQty) @@ -406,7 +412,7 @@ func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, f tm.Logger.Infof("manageTransport exit for %v. mgrQty: %v", mTr.ID, mgrQty) return case err := <-mTr.errChan: - if atomic.LoadInt32(&mTr.isClosing) == 0 { + if !mTr.isClosing() { tm.Logger.Infof("Transport %s failed with error: %s. Re-dialing...", mTr.ID, err) if accepted { if err := tm.DeleteTransport(mTr.ID); err != nil { From dd820d6306e00197ac1cd82aae6c8f908e7eb80a Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Fri, 7 Jun 2019 08:17:15 +0300 Subject: [PATCH 8/8] Added check Manager.isClosing() in Manager.dialTransport ```golang func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote cipher.PubKey, public bool) (Transport, *Entry, error) { if tm.isClosing() { return nil, nil, errors.New("transport.Manager is closing. Skipping dialling transport") } ``` It seems that last possibility for redial during shutdown eliminated. Integration tests working No observed problems in ssh test too. --- pkg/transport/manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 3115fadc97..b432f6c642 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -289,6 +289,10 @@ func (tm *Manager) Close() error { func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote cipher.PubKey, public bool) (Transport, *Entry, error) { + if tm.isClosing() { + return nil, nil, errors.New("transport.Manager is closing. Skipping dialling transport") + } + tr, err := factory.Dial(ctx, remote) if err != nil { return nil, nil, err