From f8882a401ca81bf44f86b7bb99aa8d6f5a064b33 Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Mon, 1 Apr 2019 18:52:00 +0300 Subject: [PATCH] Changes: 1. `transport.Transport` interface: - added `Edges() [2]cipher.PubKey` - removed `Local()` and `Remote()` 2. transport.Manager: - added `Local() cipher.PubKey` - added `Remote([2]cipher.PubKey) (cipher.PubKey, error)` 3. All calls of Transport.Local() and Transport.Remote() changed to calls `Manager.Local()` and `Manager.Remote(Transport.Edges())` WIP: More tests needed --- cmd/skywire-cli/commands/transport.go | 2 +- pkg/messaging/channel.go | 4 +++ pkg/messaging/client.go | 2 ++ pkg/node/rpc.go | 18 ++++++----- pkg/router/router.go | 3 +- pkg/transport-discovery/client/client_test.go | 4 +-- pkg/transport/discovery.go | 2 +- pkg/transport/entry.go | 12 ++++--- pkg/transport/handshake.go | 26 +++++++++------ pkg/transport/manager.go | 32 +++++++++++++++---- pkg/transport/mock.go | 27 +++++++++------- pkg/transport/tcp_transport.go | 28 +++++++++------- pkg/transport/transport.go | 11 ++++--- 13 files changed, 113 insertions(+), 58 deletions(-) diff --git a/cmd/skywire-cli/commands/transport.go b/cmd/skywire-cli/commands/transport.go index be10bc359d..d6e5c5d514 100644 --- a/cmd/skywire-cli/commands/transport.go +++ b/cmd/skywire-cli/commands/transport.go @@ -171,7 +171,7 @@ func printTransportEntries(entries ...*transport.EntryWithStatus) { catch(err) for _, e := range entries { _, err := fmt.Fprintf(w, "%s\t%s\t%t\t%d\t%t\t%s\t%s\t%t\t%t\n", - e.Entry.ID, e.Entry.Type, e.Entry.Public, e.Registered, e.IsUp, e.Entry.Edges[0], e.Entry.Edges[1], e.Statuses[0], e.Statuses[1]) + e.Entry.ID, e.Entry.Type, e.Entry.Public, e.Registered, e.IsUp, e.Entry.Edges()[0], e.Entry.Edges()[1], e.Statuses[0], e.Statuses[1]) catch(err) } catch(w.Flush()) diff --git a/pkg/messaging/channel.go b/pkg/messaging/channel.go index e8b97b1fec..a1f4ac867c 100644 --- a/pkg/messaging/channel.go +++ b/pkg/messaging/channel.go @@ -29,6 +29,10 @@ type channel struct { noise *noise.Noise } +func (ch *channel) Edges() [2]cipher.PubKey { + return [2]cipher.PubKey{ch.link.Local(), ch.remotePK} +} + func newChannel(initiator bool, secKey cipher.SecKey, remote cipher.PubKey, link *Link) (*channel, error) { noiseConf := noise.Config{ LocalSK: secKey, diff --git a/pkg/messaging/client.go b/pkg/messaging/client.go index 8ae8940d06..1abb138044 100644 --- a/pkg/messaging/client.go +++ b/pkg/messaging/client.go @@ -49,9 +49,11 @@ type Config struct { } // Client sends messages to remote client nodes via relay Server. +// Implements Transport type Client struct { Logger *logging.Logger + // edges [2]cipher.PubKey pubKey cipher.PubKey secKey cipher.SecKey dc client.APIClient diff --git a/pkg/node/rpc.go b/pkg/node/rpc.go index 5e0c0fa3d7..2a268840ab 100644 --- a/pkg/node/rpc.go +++ b/pkg/node/rpc.go @@ -69,11 +69,12 @@ type TransportSummary struct { Log *transport.LogEntry `json:"log,omitempty"` } -func newTransportSummary(tp *transport.ManagedTransport, includeLogs bool) *TransportSummary { +func newTransportSummary(tm *transport.Manager, tp *transport.ManagedTransport, includeLogs bool) *TransportSummary { + remote, _ := tm.Remote(tp.Edges()) summary := TransportSummary{ ID: tp.ID, - Local: tp.Local(), - Remote: tp.Remote(), + Local: tm.Local(), + Remote: remote, Type: tp.Type(), } if includeLogs { @@ -94,7 +95,7 @@ type Summary struct { func (r *RPC) Summary(_ *struct{}, out *Summary) error { var summaries []*TransportSummary r.node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { - summaries = append(summaries, newTransportSummary(tp, false)) + summaries = append(summaries, newTransportSummary(r.node.tm, tp, false)) return true }) *out = Summary{ @@ -179,8 +180,9 @@ func (r *RPC) Transports(in *TransportsIn, out *[]*TransportSummary) error { return true } r.node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool { - if typeIncluded(tp.Type()) && pkIncluded(tp.Local(), tp.Remote()) { - *out = append(*out, newTransportSummary(tp, in.ShowLogs)) + remote, _ := r.node.tm.Remote(tp.Edges()) + if typeIncluded(tp.Type()) && pkIncluded(r.node.tm.Local(), remote) { + *out = append(*out, newTransportSummary(r.node.tm, tp, in.ShowLogs)) } return true }) @@ -193,7 +195,7 @@ func (r *RPC) Transport(in *uuid.UUID, out *TransportSummary) error { if tp == nil { return ErrNotFound } - *out = *newTransportSummary(tp, true) + *out = *newTransportSummary(r.node.tm, tp, true) return nil } @@ -218,7 +220,7 @@ func (r *RPC) AddTransport(in *AddTransportIn, out *TransportSummary) error { if err != nil { return err } - *out = *newTransportSummary(tp, false) + *out = *newTransportSummary(r.node.tm, tp, false) return nil } diff --git a/pkg/router/router.go b/pkg/router/router.go index fb5531a259..3a6cedb64d 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -485,7 +485,8 @@ func (r *Router) advanceNoiseHandshake(addr *app.LoopAddr, noiseMsg []byte) (ni func (r *Router) isSetupTransport(tr transport.Transport) bool { for _, pk := range r.config.SetupNodes { - if tr.Remote() == pk { + remote, _ := r.tm.Remote(tr.Edges()) + if remote == pk { return true } } diff --git a/pkg/transport-discovery/client/client_test.go b/pkg/transport-discovery/client/client_test.go index 8b35938eb7..2fe08d3ca7 100644 --- a/pkg/transport-discovery/client/client_test.go +++ b/pkg/transport-discovery/client/client_test.go @@ -182,14 +182,14 @@ func TestGetTransportByID(t *testing.T) { func TestGetTransportsByEdge(t *testing.T) { entry := &transport.EntryWithStatus{Entry: newTestEntry(), IsUp: true} srv := httptest.NewServer(authHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, fmt.Sprintf("/transports/edge:%s", entry.Entry.Edges[0]), r.URL.String()) + assert.Equal(t, fmt.Sprintf("/transports/edge:%s", entry.Entry.Edges()[0]), r.URL.String()) json.NewEncoder(w).Encode([]*transport.EntryWithStatus{entry}) // nolint: errcheck }))) defer srv.Close() c, err := NewHTTP(srv.URL, testPubKey, testSecKey) require.NoError(t, err) - entries, err := c.GetTransportsByEdge(context.Background(), entry.Entry.Edges[0]) + entries, err := c.GetTransportsByEdge(context.Background(), entry.Entry.Edges()[0]) require.NoError(t, err) require.Len(t, entries, 1) diff --git a/pkg/transport/discovery.go b/pkg/transport/discovery.go index c458f5ce46..78f83ae097 100644 --- a/pkg/transport/discovery.go +++ b/pkg/transport/discovery.go @@ -64,7 +64,7 @@ func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk ciphe td.Lock() res := []*EntryWithStatus{} for _, entry := range td.entries { - if entry.Entry.Edges[0] == pk || entry.Entry.Edges[1] == pk { + if entry.Entry.Edges()[0] == pk || entry.Entry.Edges()[1] == pk { e := &EntryWithStatus{} *e = entry res = append(res, e) diff --git a/pkg/transport/entry.go b/pkg/transport/entry.go index 1df61e05ae..89e9d6c76f 100644 --- a/pkg/transport/entry.go +++ b/pkg/transport/entry.go @@ -16,7 +16,7 @@ type Entry struct { ID uuid.UUID `json:"t_id"` // Edges contains the public keys of the Transport's edge nodes (should only have 2 edges and the least-significant edge should come first). - Edges [2]cipher.PubKey `json:"edges"` + edges [2]cipher.PubKey `json:"edges"` // Type represents the transport type. Type string `json:"type"` @@ -26,6 +26,10 @@ type Entry struct { Public bool `json:"public"` } +func (e *Entry) Edges() [2]cipher.PubKey { + return e.edges +} + // String implements stringer func (e *Entry) String() string { res := "" @@ -37,8 +41,8 @@ func (e *Entry) String() string { res += fmt.Sprintf("\ttype: %s\n", e.Type) res += fmt.Sprintf("\tid: %s\n", e.ID) res += fmt.Sprintf("\tedges:\n") - res += fmt.Sprintf("\t\tedge 1: %s\n", e.Edges[0]) - res += fmt.Sprintf("\t\tedge 2: %s\n", e.Edges[1]) + res += fmt.Sprintf("\t\tedge 1: %s\n", e.Edges()[0]) + res += fmt.Sprintf("\t\tedge 2: %s\n", e.Edges()[1]) return res } @@ -46,7 +50,7 @@ func (e *Entry) String() string { // ToBinary returns binary representation of a Signature. func (e *Entry) ToBinary() []byte { bEntry := e.ID[:] - for _, edge := range e.Edges { + for _, edge := range e.Edges() { bEntry = append(bEntry, edge[:]...) } return append(bEntry, []byte(e.Type)...) diff --git a/pkg/transport/handshake.go b/pkg/transport/handshake.go index 4dcc09cd43..7522d2edee 100644 --- a/pkg/transport/handshake.go +++ b/pkg/transport/handshake.go @@ -34,14 +34,14 @@ func settlementInitiatorHandshake(id uuid.UUID, public bool) settlementHandshake return func(tm *Manager, tr Transport) (*Entry, error) { entry := &Entry{ ID: id, - Edges: SortPubKeys(tr.Local(), tr.Remote()), + edges: tr.Edges(), Type: tr.Type(), Public: public, } newEntry := id == uuid.UUID{} if newEntry { - entry.ID = GetTransportUUID(entry.Edges[0], entry.Edges[1], entry.Type) + entry.ID = GetTransportUUID(entry.Edges()[0], entry.Edges()[1], entry.Type) } sEntry := &SignedEntry{Entry: entry, Signatures: [2]cipher.Sig{entry.Signature(tm.config.SecKey)}} @@ -53,8 +53,12 @@ func settlementInitiatorHandshake(id uuid.UUID, public bool) settlementHandshake return nil, fmt.Errorf("read: %s", err) } - if err := verifySig(sEntry, 1, tr.Remote()); err != nil { - return nil, err + if remote, Ok := tm.Remote(tr.Edges()); Ok == nil { + if err := verifySig(sEntry, 1, remote); err != nil { + return nil, err + } + } else { + return nil, Ok } if newEntry { @@ -71,8 +75,12 @@ func settlementResponderHandshake(tm *Manager, tr Transport) (*Entry, error) { return nil, fmt.Errorf("read: %s", err) } - if err := validateEntry(sEntry, tr); err != nil { - return nil, err + if remote, Ok := tm.Remote(tr.Edges()); Ok == nil { + if err := validateEntry(sEntry, tr, remote); err != nil { + return nil, err + } + } else { + return nil, Ok } sEntry.Signatures[1] = sEntry.Entry.Signature(tm.config.SecKey) @@ -103,13 +111,13 @@ func settlementResponderHandshake(tm *Manager, tr Transport) (*Entry, error) { return sEntry.Entry, nil } -func validateEntry(sEntry *SignedEntry, tr Transport) error { +func validateEntry(sEntry *SignedEntry, tr Transport, rpk cipher.PubKey) error { entry := sEntry.Entry if entry.Type != tr.Type() { return errors.New("invalid entry type") } - if entry.Edges != [2]cipher.PubKey{tr.Remote(), tr.Local()} { + if entry.Edges() != tr.Edges() { return errors.New("invalid entry edges") } @@ -117,7 +125,7 @@ func validateEntry(sEntry *SignedEntry, tr Transport) error { return errors.New("invalid entry signature") } - return verifySig(sEntry, 0, tr.Remote()) + return verifySig(sEntry, 0, rpk) } func verifySig(sEntry *SignedEntry, idx int, pk cipher.PubKey) error { diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 19ba514630..a4cca68106 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -121,7 +121,7 @@ func (tm *Manager) ReconnectTransports(ctx context.Context) { entries := tm.entries tm.mu.RUnlock() for _, entry := range entries { - if entry.Edges[0] != tm.config.PubKey { + if entry.Edges()[0] != tm.config.PubKey { continue } @@ -129,7 +129,7 @@ func (tm *Manager) ReconnectTransports(ctx context.Context) { continue } - _, err := tm.createTransport(ctx, entry.Edges[1], entry.Type, entry.ID, entry.Public) + _, err := tm.createTransport(ctx, entry.Edges()[1], entry.Type, entry.ID, entry.Public) if err != nil { tm.Logger.Warnf("Failed to re-establish transport: %s", err) continue @@ -141,14 +141,30 @@ func (tm *Manager) ReconnectTransports(ctx context.Context) { } } +func (tm *Manager) Local() cipher.PubKey { + return tm.config.PubKey +} + +func (tm *Manager) Remote(edges [2]cipher.PubKey) (cipher.PubKey, error) { + if tm.config.PubKey == edges[0] { + return edges[1], nil + } + if tm.config.PubKey == edges[1] { + return edges[0], nil + } + return cipher.PubKey{}, errors.New("Edges does not belongs to this Transport") +} + // CreateDefaultTransports created transports to DefaultNodes if they don't exist. func (tm *Manager) CreateDefaultTransports(ctx context.Context) { for _, pk := range tm.config.DefaultNodes { exist := false tm.WalkTransports(func(tr *ManagedTransport) bool { - if tr.Remote() == pk { - exist = true - return false + if remote, Ok := tm.Remote(tr.Edges()); Ok == nil { + if remote == pk { + exist = true + return false + } } return true }) @@ -347,7 +363,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag return nil, err } - tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), tr.Remote(), entry.ID) + remote, err := tm.Remote(tr.Edges()) + if err != nil { + return nil, err + } + tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) managedTr := newManagedTransport(entry.ID, tr, entry.Public) tm.mu.Lock() diff --git a/pkg/transport/mock.go b/pkg/transport/mock.go index 6c5bfa2576..f58ec9c54b 100644 --- a/pkg/transport/mock.go +++ b/pkg/transport/mock.go @@ -77,16 +77,17 @@ func (f *MockFactory) Type() string { // MockTransport is a transport that accepts custom writers and readers to use them in Read and Write // operations type MockTransport struct { - rw io.ReadWriteCloser - local cipher.PubKey - remote cipher.PubKey + rw io.ReadWriteCloser + edges [2]cipher.PubKey + // local cipher.PubKey + // remote cipher.PubKey context context.Context } // NewMockTransport creates a transport with the given secret key and remote public key, taking a writer // and a reader that will be used in the Write and Read operation func NewMockTransport(rw io.ReadWriteCloser, local, remote cipher.PubKey) *MockTransport { - return &MockTransport{rw, local, remote, context.Background()} + return &MockTransport{rw, [2]cipher.PubKey{local, remote}, context.Background()} } // Read implements reader for mock transport @@ -114,15 +115,19 @@ func (m *MockTransport) Close() error { return m.rw.Close() } -// Local returns the local static public key -func (m *MockTransport) Local() cipher.PubKey { - return m.local +func (m *MockTransport) Edges() [2]cipher.PubKey { + return m.edges } -// Remote returns the remote public key fo the mock transport -func (m *MockTransport) Remote() cipher.PubKey { - return m.remote -} +// // Local returns the local static public key +// func (m *MockTransport) Local() cipher.PubKey { +// return m.local +// } + +// // Remote returns the remote public key fo the mock transport +// func (m *MockTransport) Remote() cipher.PubKey { +// return m.remote +// } // SetDeadline sets a deadline for the write/read operations of the mock transport func (m *MockTransport) SetDeadline(t time.Time) error { diff --git a/pkg/transport/tcp_transport.go b/pkg/transport/tcp_transport.go index 701b5e5185..538c79ae05 100644 --- a/pkg/transport/tcp_transport.go +++ b/pkg/transport/tcp_transport.go @@ -41,7 +41,7 @@ func (f *TCPFactory) Accept(ctx context.Context) (Transport, error) { return nil, ErrUnknownRemote } - return &TCPTransport{conn, f.lpk, rpk}, nil + return &TCPTransport{conn, [2]cipher.PubKey{f.lpk, rpk}}, nil } // Dial initiates a Transport with a remote node. @@ -56,7 +56,7 @@ func (f *TCPFactory) Dial(ctx context.Context, remote cipher.PubKey) (Transport, return nil, err } - return &TCPTransport{conn, f.lpk, remote}, nil + return &TCPTransport{conn, [2]cipher.PubKey{f.lpk, remote}}, nil } // Close implements io.Closer @@ -77,19 +77,25 @@ func (f *TCPFactory) Type() string { // TCPTransport implements Transport over TCP connection. type TCPTransport struct { *net.TCPConn - lpk cipher.PubKey - rpk cipher.PubKey + edges [2]cipher.PubKey + // lpk cipher.PubKey + // rpk cipher.PubKey } -// Local returns the local transport edge's public key. -func (tr *TCPTransport) Local() cipher.PubKey { - return tr.lpk +// Local returns the TCPTransport edges. +func (tr *TCPTransport) Edges() [2]cipher.PubKey { + return tr.edges } -// Remote returns the remote transport edge's public key. -func (tr *TCPTransport) Remote() cipher.PubKey { - return tr.rpk -} +// // Local returns the local transport edge's public key. +// func (tr *TCPTransport) Local() cipher.PubKey { +// return tr.lpk +// } + +// // Remote returns the remote transport edge's public key. +// func (tr *TCPTransport) Remote() cipher.PubKey { +// return tr.rpk +// } // Type returns the string representation of the transport type. func (tr *TCPTransport) Type() string { diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index e9e4981fdd..e7038955bc 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -21,11 +21,14 @@ type Transport interface { // Close implements io.Closer Close() error - // Local returns the local transport edge's public key. - Local() cipher.PubKey + // Edges returns sorted edges of transport + Edges() [2]cipher.PubKey + + // // Local returns the local transport edge's public key. + // Local() cipher.PubKey - // Remote returns the remote transport edge's public key. - Remote() cipher.PubKey + // // Remote returns the remote transport edge's public key. + // Remote() cipher.PubKey // SetDeadline functions the same as that from net.Conn // With a Transport, we don't have a distinction between write and read timeouts.