Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/remove redialing #842

Merged
merged 45 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1806be6
Remove redialing of transport
i-hate-nicknames Jul 8, 2021
b19b8e6
Fix panics when transport is closed
i-hate-nicknames Jul 8, 2021
8ba1738
Refactor managed transport Serve
i-hate-nicknames Jul 8, 2021
745ea4e
Force sorted edge order in transport entry
i-hate-nicknames Jul 8, 2021
68b3129
Remove up/down transport status
i-hate-nicknames Jul 9, 2021
dd734ef
Remove closed transports in transport manager
i-hate-nicknames Jul 9, 2021
764f34a
Move raw address to network.Conn
i-hate-nicknames Jul 9, 2021
738454c
Simplify dial logic
i-hate-nicknames Jul 9, 2021
8016a92
Delete entry instead of updating status
i-hate-nicknames Jul 9, 2021
f1b5ea0
Fix visor cli transport print
i-hate-nicknames Jul 9, 2021
afc7945
Add persistent transports todo
i-hate-nicknames Jul 9, 2021
edfb6b2
Add heartbeat loop
i-hate-nicknames Jul 9, 2021
43ac032
Merge branch 'develop' into feature/remove-redialing
i-hate-nicknames Jul 27, 2021
d94386e
Send heartbeat request in transport
i-hate-nicknames Jul 27, 2021
f9092b9
Remove transport status
i-hate-nicknames Jul 27, 2021
657734d
Fix linter errors
i-hate-nicknames Jul 28, 2021
6b35e2f
Remove transport entry with status
i-hate-nicknames Jul 28, 2021
9b76a8b
Fix visor cli transports
i-hate-nicknames Jul 28, 2021
300b9c8
Add persistent transports config
i-hate-nicknames Jul 29, 2021
8ed6bfb
Make lint and format
i-hate-nicknames Jul 29, 2021
876f9f3
Fix deadlocks in transport layer
i-hate-nicknames Jul 30, 2021
f84dc64
Relax locking when saving transport
i-hate-nicknames Aug 3, 2021
964e0a6
Add transport connection deadline
i-hate-nicknames Aug 3, 2021
88d397b
Fix transport manager shutdown logic
i-hate-nicknames Aug 3, 2021
c7e14a8
Simplify transport manager closing
i-hate-nicknames Aug 3, 2021
039d017
Merge branch 'develop' into feature/remove-redialing
i-hate-nicknames Aug 3, 2021
21db980
Remove isPublic transport entry field
i-hate-nicknames Aug 4, 2021
11fdf3f
Fix NPE for transport handshake
i-hate-nicknames Aug 5, 2021
1116d80
Code review fixes
i-hate-nicknames Aug 6, 2021
6b8741b
Update heartbeat interval
ersonp Aug 12, 2021
a565ce3
Update comment
ersonp Aug 12, 2021
d18e26c
Remove updateConnDeadline
ersonp Aug 12, 2021
ed0ee66
Rename Conn interface to Transport
ersonp Aug 12, 2021
816474b
Remove EntryWithStatus
ersonp Aug 16, 2021
21cadd8
Add Deprecated Public field to Entry struct
ersonp Aug 16, 2021
c04c889
Make public true
ersonp Aug 17, 2021
3514308
Update and add todo
ersonp Aug 17, 2021
651520a
Remove old todo
ersonp Aug 19, 2021
0a0d5c2
Remove public field from Entry struct
ersonp Aug 20, 2021
8a3c6d8
Rename PersistentRemote to PersistentTransports
ersonp Aug 20, 2021
1b580f4
Requested changes
ersonp Aug 20, 2021
7cd8f5c
Add api for persistent_transports
ersonp Aug 20, 2021
6431095
Add persistent_transports to summary
ersonp Aug 23, 2021
1b8e547
Remove GetPersistentTransports api
ersonp Aug 23, 2021
c8e2a62
Merge branch 'develop' into feature/remove-redialing
ersonp Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/skywire-cli/commands/visor/transport_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ var discTpCmd = &cobra.Command{
},
}

func printTransportEntries(entries ...*transport.EntryWithStatus) {
func printTransportEntries(entries ...*transport.Entry) {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent)
_, err := fmt.Fprintln(w, "id\ttype\tpublic\tregistered\tup\tedge1\tedge2\topinion1\topinion2")
_, err := fmt.Fprintln(w, "id\ttype\tedge1\tedge2")
internal.Catch(err)
for _, e := range entries {
_, err := fmt.Fprintf(w, "%s\t%s\t%t\t%d\t%t\t%s\t%s\t%t\t%t\n",
e.Entry.ID, e.Entry.Type, e.Entry.Public, e.Registered, e.IsUp, e.Entry.Edges[0], e.Entry.Edges[1], e.Statuses[0], e.Statuses[1])
_, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\n",
e.ID, e.Type, e.Edges[0], e.Edges[1])
internal.Catch(err)
}
internal.Catch(w.Flush())
Expand Down
8 changes: 2 additions & 6 deletions cmd/skywire-cli/commands/visor/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ var addTpCmd = &cobra.Command{
logger.WithError(err).Fatalf("Failed to establish %v transport", transportType)
}

if !tp.IsUp {
logger.Fatalf("Established %v transport to %v with ID %v, but it isn't up", transportType, pk, tp.ID)
}

logger.Infof("Established %v transport to %v", transportType, pk)
} else {
transportTypes := []network.Type{
Expand Down Expand Up @@ -148,14 +144,14 @@ var rmTpCmd = &cobra.Command{
func printTransports(tps ...*visor.TransportSummary) {
sortTransports(tps...)
w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent)
_, err := fmt.Fprintln(w, "type\tid\tremote\tmode\tlabel\tis_up")
_, err := fmt.Fprintln(w, "type\tid\tremote\tmode\tlabel")
internal.Catch(err)
for _, tp := range tps {
tpMode := "regular"
if tp.IsSetup {
tpMode = "setup"
}
_, err = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%v\n", tp.Type, tp.ID, tp.Remote, tpMode, tp.Label, tp.IsUp)
_, err = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", tp.Type, tp.ID, tp.Remote, tpMode, tp.Label)
internal.Catch(err)
}
internal.Catch(w.Flush())
Expand Down
7 changes: 2 additions & 5 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,6 @@ func (r *router) Serve(ctx context.Context) error {

go r.serveSetup()

r.tm.Serve(ctx)

return nil
}

Expand Down Expand Up @@ -728,17 +726,16 @@ func (r *router) Close() error {

r.once.Do(func() {
close(r.done)

r.mx.Lock()
close(r.accept)
r.mx.Unlock()
})

if err := r.sl.Close(); err != nil {
r.logger.WithError(err).Warnf("closing route_manager returned error")
return err
}

return r.tm.Close()
return nil
}

func (r *router) forwardPacket(ctx context.Context, packet routing.Packet, rule routing.Rule) error {
Expand Down
17 changes: 1 addition & 16 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (a *autoconnector) Run(ctx context.Context) error {
a.log.Errorf("Cannot fetch public services: %s", err)
}

tps := a.updateTransports()
tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic)
absent := a.filterDuplicates(addresses, tps)
for _, pk := range absent {
a.log.WithField("pk", pk).Infoln("Adding transport to public visor")
Expand All @@ -69,21 +69,6 @@ func (a *autoconnector) Run(ctx context.Context) error {
}
}

// Remove all inactive automatic transports and return all active
// automatic transports
func (a *autoconnector) updateTransports() []*transport.ManagedTransport {
tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic)
var tpsActive []*transport.ManagedTransport
for _, tr := range tps {
if !tr.IsUp() {
a.tm.DeleteTransport(tr.Entry.ID)
} else {
tpsActive = append(tpsActive, tr)
}
}
return tpsActive
}

func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) {
retrier := netutil.NewRetrier(fetchServicesDelay, 0, 2)
var services []Service
Expand Down
17 changes: 17 additions & 0 deletions pkg/transport/deprecated/deprecated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package deprecated

import (
"github.com/google/uuid"
)

// Status represents the current state of a Transport from a Transport's single edge.
// Each Transport will have two perspectives; one from each of it's edges.
type Status struct {

// ID is the Transport ID that identifies the Transport that this status is regarding.
ID uuid.UUID `json:"t_id"`

// IsUp represents whether the Transport is up.
// A Transport that is down will fail to forward Packets.
IsUp bool `json:"is_up"`
}
62 changes: 20 additions & 42 deletions pkg/transport/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"sync"
"time"

"github.com/google/uuid"
"github.com/skycoin/dmsg/cipher"
Expand All @@ -15,41 +14,34 @@ import (
// DiscoveryClient performs Transport discovery operations.
type DiscoveryClient interface {
RegisterTransports(ctx context.Context, entries ...*SignedEntry) error
GetTransportByID(ctx context.Context, id uuid.UUID) (*EntryWithStatus, error)
GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*EntryWithStatus, error)
GetTransportByID(ctx context.Context, id uuid.UUID) (*Entry, error)
GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*Entry, error)
DeleteTransport(ctx context.Context, id uuid.UUID) error
UpdateStatuses(ctx context.Context, statuses ...*Status) ([]*EntryWithStatus, error)
HeartBeat(ctx context.Context, id uuid.UUID) error
Health(ctx context.Context) (int, error)
}

type mockDiscoveryClient struct {
sync.Mutex
entries map[uuid.UUID]EntryWithStatus
entries map[uuid.UUID]Entry
}

// NewDiscoveryMock construct a new mock transport discovery client.
func NewDiscoveryMock() DiscoveryClient {
return &mockDiscoveryClient{entries: map[uuid.UUID]EntryWithStatus{}}
return &mockDiscoveryClient{entries: map[uuid.UUID]Entry{}}
}

func (td *mockDiscoveryClient) RegisterTransports(ctx context.Context, entries ...*SignedEntry) error {
td.Lock()
for _, entry := range entries {
entryWithStatus := &EntryWithStatus{
Entry: entry.Entry,
IsUp: true,
Registered: time.Now().Unix(),
Statuses: [2]bool{true, true},
}
td.entries[entry.Entry.ID] = *entryWithStatus
entry.Registered = entryWithStatus.Registered
td.entries[entry.Entry.ID] = *entry.Entry
}
td.Unlock()

return nil
}

func (td *mockDiscoveryClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*EntryWithStatus, error) {
func (td *mockDiscoveryClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*Entry, error) {
td.Lock()
entry, ok := td.entries[id]
td.Unlock()
Expand All @@ -58,20 +50,20 @@ func (td *mockDiscoveryClient) GetTransportByID(ctx context.Context, id uuid.UUI
return nil, errors.New("transport not found")
}

return &EntryWithStatus{
Entry: entry.Entry,
IsUp: entry.IsUp,
Registered: entry.Registered,
Statuses: entry.Statuses,
return &Entry{
ID: entry.ID,
Edges: entry.Edges,
Label: entry.Label,
Type: entry.Type,
}, nil
}

func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*EntryWithStatus, error) {
func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*Entry, error) {
td.Lock()
res := make([]*EntryWithStatus, 0)
res := make([]*Entry, 0)
for _, entry := range td.entries {
if entry.Entry.HasEdge(pk) {
e := &EntryWithStatus{}
if entry.HasEdge(pk) {
e := &Entry{}
*e = entry
res = append(res, e)
}
Expand Down Expand Up @@ -100,24 +92,10 @@ func (td *mockDiscoveryClient) DeleteTransport(ctx context.Context, id uuid.UUID
return nil
}

func (td *mockDiscoveryClient) UpdateStatuses(ctx context.Context, statuses ...*Status) ([]*EntryWithStatus, error) {
res := make([]*EntryWithStatus, 0)

for _, status := range statuses {
entry, err := td.GetTransportByID(ctx, status.ID)
if err != nil {
return nil, err
}

td.Lock()
entry.IsUp = status.IsUp
td.entries[status.ID] = *entry
td.Unlock()
}

return res, nil
}

func (td *mockDiscoveryClient) Health(_ context.Context) (int, error) {
return http.StatusOK, nil
}

func (td *mockDiscoveryClient) HeartBeat(_ context.Context, _ uuid.UUID) error {
return nil
}
7 changes: 2 additions & 5 deletions pkg/transport/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@ func TestNewDiscoveryMock(t *testing.T) {

entryWS, err := dc.GetTransportByID(context.TODO(), sEntry.Entry.ID)
require.NoError(t, err)
require.True(t, entryWS.Entry.ID == sEntry.Entry.ID)
require.True(t, entryWS.ID == sEntry.Entry.ID)

entriesWS, err := dc.GetTransportsByEdge(context.TODO(), pk1)
require.NoError(t, err)
require.Equal(t, entry.Edges, entriesWS[0].Entry.Edges)

_, err = dc.UpdateStatuses(context.TODO(), &transport.Status{})
require.NoError(t, err)
require.Equal(t, entry.Edges, entriesWS[0].Edges)
}
72 changes: 16 additions & 56 deletions pkg/transport/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package transport
import (
"errors"
"fmt"
"strings"

"github.com/google/uuid"
"github.com/skycoin/dmsg/cipher"
Expand Down Expand Up @@ -36,29 +35,27 @@ type Entry struct {
ID uuid.UUID `json:"t_id"`

// Edges contains the public keys of the Transport's edge nodes
// (should only have 2 edges and the first edge is transport original initiator).
// Edges should always be sorted in ascending order
Edges [2]cipher.PubKey `json:"edges"`

// Type represents the transport type.
Type network.Type `json:"type"`

// Public determines whether the transport is to be exposed to other nodes or not.
// Public transports are to be registered in the Transport Discovery.
Public bool `json:"public"` // TODO(evanlinjin): remove this.

Label Label `json:"label"`

// Deprecated
Public bool `json:"public"` //TODO(ersonp): Remove after a month of release
}

// MakeEntry creates a new transport entry
func MakeEntry(initiator, target cipher.PubKey, netType network.Type, public bool, label Label) Entry {
func MakeEntry(aPK, bPK cipher.PubKey, netType network.Type, label Label) Entry {
entry := Entry{
ID: MakeTransportID(initiator, target, netType),
ID: MakeTransportID(aPK, bPK, netType),
Type: netType,
Public: public,
Label: label,
Edges: SortEdges(aPK, bPK),
Public: true,
}
entry.Edges[0] = initiator
entry.Edges[1] = target
return entry
}

Expand All @@ -84,6 +81,12 @@ func (e *Entry) EdgeIndex(pk cipher.PubKey) int {
return -1
}

// IsLeastSignificantEdge returns true if given pk is least significant edge
// of this entry
func (e *Entry) IsLeastSignificantEdge(pk cipher.PubKey) bool {
return e.EdgeIndex(pk) == 0
}

// HasEdge returns true if the provided edge is present in 'e.Edges' field.
func (e *Entry) HasEdge(edge cipher.PubKey) bool {
for _, pk := range e.Edges {
Expand All @@ -97,16 +100,11 @@ func (e *Entry) HasEdge(edge cipher.PubKey) bool {
// String implements stringer
func (e *Entry) String() string {
res := ""
if e.Public {
res += "visibility: public\n"
} else {
res += "visibility: private\n"
}
res += fmt.Sprintf("\ttype: %s\n", e.Type)
res += fmt.Sprintf("\tid: %s\n", e.ID)
res += "\tedges:\n"
res += fmt.Sprintf("\t\tedge 1 (initiator): %s\n", e.Edges[0])
res += fmt.Sprintf("\t\tedge 2 (target): %s\n", e.Edges[1])
res += fmt.Sprintf("\t\tedge 1: %s\n", e.Edges[0])
res += fmt.Sprintf("\t\tedge 2: %s\n", e.Edges[1])
return res
}

Expand Down Expand Up @@ -170,41 +168,3 @@ func NewSignedEntry(entry *Entry, pk cipher.PubKey, secKey cipher.SecKey) (*Sign
se := &SignedEntry{Entry: entry}
return se, se.Sign(pk, secKey)
}

// Status represents the current state of a Transport from a Transport's single edge.
// Each Transport will have two perspectives; one from each of it's edges.
type Status struct {

// ID is the Transport ID that identifies the Transport that this status is regarding.
ID uuid.UUID `json:"t_id"`

// IsUp represents whether the Transport is up.
// A Transport that is down will fail to forward Packets.
IsUp bool `json:"is_up"`
}

// EntryWithStatus stores Entry and Statuses returned by both Edges.
type EntryWithStatus struct {
Entry *Entry `json:"entry"`
IsUp bool `json:"is_up"`
Registered int64 `json:"registered"`
Updated int64 `json:"updated"`
Statuses [2]bool `json:"statuses"`
}

// String implements stringer
func (e *EntryWithStatus) String() string {
res := "entry:\n"
res += fmt.Sprintf("\tregistered at: %d\n", e.Registered)
res += fmt.Sprintf("\tstatus returned by edge 1: %t\n", e.Statuses[0])
res += fmt.Sprintf("\tstatus returned by edge 2: %t\n", e.Statuses[1])
if e.IsUp {
res += "\ttransport: up\n"
} else {
res += "\ttransport: down\n"
}
indentedStr := strings.Replace(e.Entry.String(), "\n\t", "\n\t\t", -1)
res += fmt.Sprintf("\ttransport info: \n\t\t%s", indentedStr)

return res
}
Loading