Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
1. `transport.Transport` interface:
- added `Edges() [2]cipher.PubKey`
- removed `Local()` and `Remote()`
2. transport.Manager:
- added `Local() cipher.PubKey`
- added `Remote([2]cipher.PubKey) (cipher.PubKey, error)`
3. All calls of Transport.Local() and Transport.Remote() changed to
calls `Manager.Local()` and `Manager.Remote(Transport.Edges())`

WIP:

More tests needed
  • Loading branch information
ayuryshev committed Apr 1, 2019
1 parent 4751cd7 commit f8882a4
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/skywire-cli/commands/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func printTransportEntries(entries ...*transport.EntryWithStatus) {
catch(err)
for _, e := range entries {
_, err := fmt.Fprintf(w, "%s\t%s\t%t\t%d\t%t\t%s\t%s\t%t\t%t\n",
e.Entry.ID, e.Entry.Type, e.Entry.Public, e.Registered, e.IsUp, e.Entry.Edges[0], e.Entry.Edges[1], e.Statuses[0], e.Statuses[1])
e.Entry.ID, e.Entry.Type, e.Entry.Public, e.Registered, e.IsUp, e.Entry.Edges()[0], e.Entry.Edges()[1], e.Statuses[0], e.Statuses[1])
catch(err)
}
catch(w.Flush())
Expand Down
4 changes: 4 additions & 0 deletions pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type channel struct {
noise *noise.Noise
}

func (ch *channel) Edges() [2]cipher.PubKey {
return [2]cipher.PubKey{ch.link.Local(), ch.remotePK}
}

func newChannel(initiator bool, secKey cipher.SecKey, remote cipher.PubKey, link *Link) (*channel, error) {
noiseConf := noise.Config{
LocalSK: secKey,
Expand Down
2 changes: 2 additions & 0 deletions pkg/messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ type Config struct {
}

// Client sends messages to remote client nodes via relay Server.
// Implements Transport
type Client struct {
Logger *logging.Logger

// edges [2]cipher.PubKey
pubKey cipher.PubKey
secKey cipher.SecKey
dc client.APIClient
Expand Down
18 changes: 10 additions & 8 deletions pkg/node/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ type TransportSummary struct {
Log *transport.LogEntry `json:"log,omitempty"`
}

func newTransportSummary(tp *transport.ManagedTransport, includeLogs bool) *TransportSummary {
func newTransportSummary(tm *transport.Manager, tp *transport.ManagedTransport, includeLogs bool) *TransportSummary {
remote, _ := tm.Remote(tp.Edges())
summary := TransportSummary{
ID: tp.ID,
Local: tp.Local(),
Remote: tp.Remote(),
Local: tm.Local(),
Remote: remote,
Type: tp.Type(),
}
if includeLogs {
Expand All @@ -94,7 +95,7 @@ type Summary struct {
func (r *RPC) Summary(_ *struct{}, out *Summary) error {
var summaries []*TransportSummary
r.node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool {
summaries = append(summaries, newTransportSummary(tp, false))
summaries = append(summaries, newTransportSummary(r.node.tm, tp, false))
return true
})
*out = Summary{
Expand Down Expand Up @@ -179,8 +180,9 @@ func (r *RPC) Transports(in *TransportsIn, out *[]*TransportSummary) error {
return true
}
r.node.tm.WalkTransports(func(tp *transport.ManagedTransport) bool {
if typeIncluded(tp.Type()) && pkIncluded(tp.Local(), tp.Remote()) {
*out = append(*out, newTransportSummary(tp, in.ShowLogs))
remote, _ := r.node.tm.Remote(tp.Edges())
if typeIncluded(tp.Type()) && pkIncluded(r.node.tm.Local(), remote) {
*out = append(*out, newTransportSummary(r.node.tm, tp, in.ShowLogs))
}
return true
})
Expand All @@ -193,7 +195,7 @@ func (r *RPC) Transport(in *uuid.UUID, out *TransportSummary) error {
if tp == nil {
return ErrNotFound
}
*out = *newTransportSummary(tp, true)
*out = *newTransportSummary(r.node.tm, tp, true)
return nil
}

Expand All @@ -218,7 +220,7 @@ func (r *RPC) AddTransport(in *AddTransportIn, out *TransportSummary) error {
if err != nil {
return err
}
*out = *newTransportSummary(tp, false)
*out = *newTransportSummary(r.node.tm, tp, false)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ func (r *Router) advanceNoiseHandshake(addr *app.LoopAddr, noiseMsg []byte) (ni

func (r *Router) isSetupTransport(tr transport.Transport) bool {
for _, pk := range r.config.SetupNodes {
if tr.Remote() == pk {
remote, _ := r.tm.Remote(tr.Edges())
if remote == pk {
return true
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/transport-discovery/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ func TestGetTransportByID(t *testing.T) {
func TestGetTransportsByEdge(t *testing.T) {
entry := &transport.EntryWithStatus{Entry: newTestEntry(), IsUp: true}
srv := httptest.NewServer(authHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, fmt.Sprintf("/transports/edge:%s", entry.Entry.Edges[0]), r.URL.String())
assert.Equal(t, fmt.Sprintf("/transports/edge:%s", entry.Entry.Edges()[0]), r.URL.String())
json.NewEncoder(w).Encode([]*transport.EntryWithStatus{entry}) // nolint: errcheck
})))
defer srv.Close()

c, err := NewHTTP(srv.URL, testPubKey, testSecKey)
require.NoError(t, err)
entries, err := c.GetTransportsByEdge(context.Background(), entry.Entry.Edges[0])
entries, err := c.GetTransportsByEdge(context.Background(), entry.Entry.Edges()[0])
require.NoError(t, err)

require.Len(t, entries, 1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk ciphe
td.Lock()
res := []*EntryWithStatus{}
for _, entry := range td.entries {
if entry.Entry.Edges[0] == pk || entry.Entry.Edges[1] == pk {
if entry.Entry.Edges()[0] == pk || entry.Entry.Edges()[1] == pk {
e := &EntryWithStatus{}
*e = entry
res = append(res, e)
Expand Down
12 changes: 8 additions & 4 deletions pkg/transport/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Entry struct {
ID uuid.UUID `json:"t_id"`

// Edges contains the public keys of the Transport's edge nodes (should only have 2 edges and the least-significant edge should come first).
Edges [2]cipher.PubKey `json:"edges"`
edges [2]cipher.PubKey `json:"edges"`

// Type represents the transport type.
Type string `json:"type"`
Expand All @@ -26,6 +26,10 @@ type Entry struct {
Public bool `json:"public"`
}

func (e *Entry) Edges() [2]cipher.PubKey {
return e.edges
}

// String implements stringer
func (e *Entry) String() string {
res := ""
Expand All @@ -37,16 +41,16 @@ func (e *Entry) String() string {
res += fmt.Sprintf("\ttype: %s\n", e.Type)
res += fmt.Sprintf("\tid: %s\n", e.ID)
res += fmt.Sprintf("\tedges:\n")
res += fmt.Sprintf("\t\tedge 1: %s\n", e.Edges[0])
res += fmt.Sprintf("\t\tedge 2: %s\n", e.Edges[1])
res += fmt.Sprintf("\t\tedge 1: %s\n", e.Edges()[0])
res += fmt.Sprintf("\t\tedge 2: %s\n", e.Edges()[1])

return res
}

// ToBinary returns binary representation of a Signature.
func (e *Entry) ToBinary() []byte {
bEntry := e.ID[:]
for _, edge := range e.Edges {
for _, edge := range e.Edges() {
bEntry = append(bEntry, edge[:]...)
}
return append(bEntry, []byte(e.Type)...)
Expand Down
26 changes: 17 additions & 9 deletions pkg/transport/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ func settlementInitiatorHandshake(id uuid.UUID, public bool) settlementHandshake
return func(tm *Manager, tr Transport) (*Entry, error) {
entry := &Entry{
ID: id,
Edges: SortPubKeys(tr.Local(), tr.Remote()),
edges: tr.Edges(),
Type: tr.Type(),
Public: public,
}

newEntry := id == uuid.UUID{}
if newEntry {
entry.ID = GetTransportUUID(entry.Edges[0], entry.Edges[1], entry.Type)
entry.ID = GetTransportUUID(entry.Edges()[0], entry.Edges()[1], entry.Type)
}

sEntry := &SignedEntry{Entry: entry, Signatures: [2]cipher.Sig{entry.Signature(tm.config.SecKey)}}
Expand All @@ -53,8 +53,12 @@ func settlementInitiatorHandshake(id uuid.UUID, public bool) settlementHandshake
return nil, fmt.Errorf("read: %s", err)
}

if err := verifySig(sEntry, 1, tr.Remote()); err != nil {
return nil, err
if remote, Ok := tm.Remote(tr.Edges()); Ok == nil {
if err := verifySig(sEntry, 1, remote); err != nil {
return nil, err
}
} else {
return nil, Ok
}

if newEntry {
Expand All @@ -71,8 +75,12 @@ func settlementResponderHandshake(tm *Manager, tr Transport) (*Entry, error) {
return nil, fmt.Errorf("read: %s", err)
}

if err := validateEntry(sEntry, tr); err != nil {
return nil, err
if remote, Ok := tm.Remote(tr.Edges()); Ok == nil {
if err := validateEntry(sEntry, tr, remote); err != nil {
return nil, err
}
} else {
return nil, Ok
}

sEntry.Signatures[1] = sEntry.Entry.Signature(tm.config.SecKey)
Expand Down Expand Up @@ -103,21 +111,21 @@ func settlementResponderHandshake(tm *Manager, tr Transport) (*Entry, error) {
return sEntry.Entry, nil
}

func validateEntry(sEntry *SignedEntry, tr Transport) error {
func validateEntry(sEntry *SignedEntry, tr Transport, rpk cipher.PubKey) error {
entry := sEntry.Entry
if entry.Type != tr.Type() {
return errors.New("invalid entry type")
}

if entry.Edges != [2]cipher.PubKey{tr.Remote(), tr.Local()} {
if entry.Edges() != tr.Edges() {
return errors.New("invalid entry edges")
}

if sEntry.Signatures[0].Null() {
return errors.New("invalid entry signature")
}

return verifySig(sEntry, 0, tr.Remote())
return verifySig(sEntry, 0, rpk)
}

func verifySig(sEntry *SignedEntry, idx int, pk cipher.PubKey) error {
Expand Down
32 changes: 26 additions & 6 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func (tm *Manager) ReconnectTransports(ctx context.Context) {
entries := tm.entries
tm.mu.RUnlock()
for _, entry := range entries {
if entry.Edges[0] != tm.config.PubKey {
if entry.Edges()[0] != tm.config.PubKey {
continue
}

if tm.Transport(entry.ID) != nil {
continue
}

_, err := tm.createTransport(ctx, entry.Edges[1], entry.Type, entry.ID, entry.Public)
_, err := tm.createTransport(ctx, entry.Edges()[1], entry.Type, entry.ID, entry.Public)
if err != nil {
tm.Logger.Warnf("Failed to re-establish transport: %s", err)
continue
Expand All @@ -141,14 +141,30 @@ func (tm *Manager) ReconnectTransports(ctx context.Context) {
}
}

func (tm *Manager) Local() cipher.PubKey {
return tm.config.PubKey
}

func (tm *Manager) Remote(edges [2]cipher.PubKey) (cipher.PubKey, error) {
if tm.config.PubKey == edges[0] {
return edges[1], nil
}
if tm.config.PubKey == edges[1] {
return edges[0], nil
}
return cipher.PubKey{}, errors.New("Edges does not belongs to this Transport")
}

// CreateDefaultTransports created transports to DefaultNodes if they don't exist.
func (tm *Manager) CreateDefaultTransports(ctx context.Context) {
for _, pk := range tm.config.DefaultNodes {
exist := false
tm.WalkTransports(func(tr *ManagedTransport) bool {
if tr.Remote() == pk {
exist = true
return false
if remote, Ok := tm.Remote(tr.Edges()); Ok == nil {
if remote == pk {
exist = true
return false
}
}
return true
})
Expand Down Expand Up @@ -347,7 +363,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
return nil, err
}

tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), tr.Remote(), entry.ID)
remote, err := tm.Remote(tr.Edges())
if err != nil {
return nil, err
}
tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID)
managedTr := newManagedTransport(entry.ID, tr, entry.Public)
tm.mu.Lock()

Expand Down
27 changes: 16 additions & 11 deletions pkg/transport/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,17 @@ func (f *MockFactory) Type() string {
// MockTransport is a transport that accepts custom writers and readers to use them in Read and Write
// operations
type MockTransport struct {
rw io.ReadWriteCloser
local cipher.PubKey
remote cipher.PubKey
rw io.ReadWriteCloser
edges [2]cipher.PubKey
// local cipher.PubKey
// remote cipher.PubKey
context context.Context
}

// NewMockTransport creates a transport with the given secret key and remote public key, taking a writer
// and a reader that will be used in the Write and Read operation
func NewMockTransport(rw io.ReadWriteCloser, local, remote cipher.PubKey) *MockTransport {
return &MockTransport{rw, local, remote, context.Background()}
return &MockTransport{rw, [2]cipher.PubKey{local, remote}, context.Background()}
}

// Read implements reader for mock transport
Expand Down Expand Up @@ -114,15 +115,19 @@ func (m *MockTransport) Close() error {
return m.rw.Close()
}

// Local returns the local static public key
func (m *MockTransport) Local() cipher.PubKey {
return m.local
func (m *MockTransport) Edges() [2]cipher.PubKey {
return m.edges
}

// Remote returns the remote public key fo the mock transport
func (m *MockTransport) Remote() cipher.PubKey {
return m.remote
}
// // Local returns the local static public key
// func (m *MockTransport) Local() cipher.PubKey {
// return m.local
// }

// // Remote returns the remote public key fo the mock transport
// func (m *MockTransport) Remote() cipher.PubKey {
// return m.remote
// }

// SetDeadline sets a deadline for the write/read operations of the mock transport
func (m *MockTransport) SetDeadline(t time.Time) error {
Expand Down
Loading

0 comments on commit f8882a4

Please sign in to comment.