diff --git a/cmd/skywire-cli/commands/visor/transport_discovery.go b/cmd/skywire-cli/commands/visor/transport_discovery.go index e16c359ae6..ec3a6706bf 100644 --- a/cmd/skywire-cli/commands/visor/transport_discovery.go +++ b/cmd/skywire-cli/commands/visor/transport_discovery.go @@ -58,13 +58,13 @@ var discTpCmd = &cobra.Command{ }, } -func printTransportEntries(entries ...*transport.EntryWithStatus) { +func printTransportEntries(entries ...*transport.Entry) { w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent) - _, err := fmt.Fprintln(w, "id\ttype\tpublic\tregistered\tup\tedge1\tedge2\topinion1\topinion2") + _, err := fmt.Fprintln(w, "id\ttype\tedge1\tedge2") internal.Catch(err) for _, e := range entries { - _, err := fmt.Fprintf(w, "%s\t%s\t%t\t%d\t%t\t%s\t%s\t%t\t%t\n", - e.Entry.ID, e.Entry.Type, e.Entry.Public, e.Registered, e.IsUp, e.Entry.Edges[0], e.Entry.Edges[1], e.Statuses[0], e.Statuses[1]) + _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", + e.ID, e.Type, e.Edges[0], e.Edges[1]) internal.Catch(err) } internal.Catch(w.Flush()) diff --git a/cmd/skywire-cli/commands/visor/transports.go b/cmd/skywire-cli/commands/visor/transports.go index 99cf89cd17..47bc21caa4 100644 --- a/cmd/skywire-cli/commands/visor/transports.go +++ b/cmd/skywire-cli/commands/visor/transports.go @@ -106,10 +106,6 @@ var addTpCmd = &cobra.Command{ logger.WithError(err).Fatalf("Failed to establish %v transport", transportType) } - if !tp.IsUp { - logger.Fatalf("Established %v transport to %v with ID %v, but it isn't up", transportType, pk, tp.ID) - } - logger.Infof("Established %v transport to %v", transportType, pk) } else { transportTypes := []network.Type{ @@ -148,14 +144,14 @@ var rmTpCmd = &cobra.Command{ func printTransports(tps ...*visor.TransportSummary) { sortTransports(tps...) 
w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent) - _, err := fmt.Fprintln(w, "type\tid\tremote\tmode\tlabel\tis_up") + _, err := fmt.Fprintln(w, "type\tid\tremote\tmode\tlabel") internal.Catch(err) for _, tp := range tps { tpMode := "regular" if tp.IsSetup { tpMode = "setup" } - _, err = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%v\n", tp.Type, tp.ID, tp.Remote, tpMode, tp.Label, tp.IsUp) + _, err = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", tp.Type, tp.ID, tp.Remote, tpMode, tp.Label) internal.Catch(err) } internal.Catch(w.Flush()) diff --git a/pkg/router/router.go b/pkg/router/router.go index 4836d77d08..9792414b3c 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -317,8 +317,6 @@ func (r *router) Serve(ctx context.Context) error { go r.serveSetup() - r.tm.Serve(ctx) - return nil } @@ -728,17 +726,17 @@ func (r *router) Close() error { r.once.Do(func() { close(r.done) r.mx.Lock() close(r.accept) r.mx.Unlock() }) - if err := r.sl.Close(); err != nil { r.logger.WithError(err).Warnf("closing route_manager returned error") + return err } - return r.tm.Close() + return nil } func (r *router) forwardPacket(ctx context.Context, packet routing.Packet, rule routing.Rule) error { diff --git a/pkg/servicedisc/autoconnect.go b/pkg/servicedisc/autoconnect.go index 07aa5abc74..847f4303f5 100644 --- a/pkg/servicedisc/autoconnect.go +++ b/pkg/servicedisc/autoconnect.go @@ -55,7 +55,7 @@ func (a *autoconnector) Run(ctx context.Context) error { a.log.Errorf("Cannot fetch public services: %s", err) } - tps := a.updateTransports() + tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic) absent := a.filterDuplicates(addresses, tps) for _, pk := range absent { a.log.WithField("pk", pk).Infoln("Adding transport to public visor") @@ -69,21 +69,6 @@ func (a *autoconnector) Run(ctx context.Context) error { } } -// Remove all inactive automatic transports and return all active -// automatic transports -func (a *autoconnector) updateTransports() 
[]*transport.ManagedTransport { - tps := a.tm.GetTransportsByLabel(transport.LabelAutomatic) - var tpsActive []*transport.ManagedTransport - for _, tr := range tps { - if !tr.IsUp() { - a.tm.DeleteTransport(tr.Entry.ID) - } else { - tpsActive = append(tpsActive, tr) - } - } - return tpsActive -} - func (a *autoconnector) fetchPubAddresses(ctx context.Context) ([]cipher.PubKey, error) { retrier := netutil.NewRetrier(fetchServicesDelay, 0, 2) var services []Service diff --git a/pkg/transport/deprecated/deprecated.go b/pkg/transport/deprecated/deprecated.go new file mode 100644 index 0000000000..e394012269 --- /dev/null +++ b/pkg/transport/deprecated/deprecated.go @@ -0,0 +1,17 @@ +package deprecated + +import ( + "github.com/google/uuid" +) + +// Status represents the current state of a Transport from a Transport's single edge. +// Each Transport will have two perspectives; one from each of it's edges. +type Status struct { + + // ID is the Transport ID that identifies the Transport that this status is regarding. + ID uuid.UUID `json:"t_id"` + + // IsUp represents whether the Transport is up. + // A Transport that is down will fail to forward Packets. + IsUp bool `json:"is_up"` +} diff --git a/pkg/transport/discovery.go b/pkg/transport/discovery.go index 1b503d56ba..3ff8803ec8 100644 --- a/pkg/transport/discovery.go +++ b/pkg/transport/discovery.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "sync" - "time" "github.com/google/uuid" "github.com/skycoin/dmsg/cipher" @@ -15,41 +14,34 @@ import ( // DiscoveryClient performs Transport discovery operations. 
type DiscoveryClient interface { RegisterTransports(ctx context.Context, entries ...*SignedEntry) error - GetTransportByID(ctx context.Context, id uuid.UUID) (*EntryWithStatus, error) - GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*EntryWithStatus, error) + GetTransportByID(ctx context.Context, id uuid.UUID) (*Entry, error) + GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*Entry, error) DeleteTransport(ctx context.Context, id uuid.UUID) error - UpdateStatuses(ctx context.Context, statuses ...*Status) ([]*EntryWithStatus, error) + HeartBeat(ctx context.Context, id uuid.UUID) error Health(ctx context.Context) (int, error) } type mockDiscoveryClient struct { sync.Mutex - entries map[uuid.UUID]EntryWithStatus + entries map[uuid.UUID]Entry } // NewDiscoveryMock construct a new mock transport discovery client. func NewDiscoveryMock() DiscoveryClient { - return &mockDiscoveryClient{entries: map[uuid.UUID]EntryWithStatus{}} + return &mockDiscoveryClient{entries: map[uuid.UUID]Entry{}} } func (td *mockDiscoveryClient) RegisterTransports(ctx context.Context, entries ...*SignedEntry) error { td.Lock() for _, entry := range entries { - entryWithStatus := &EntryWithStatus{ - Entry: entry.Entry, - IsUp: true, - Registered: time.Now().Unix(), - Statuses: [2]bool{true, true}, - } - td.entries[entry.Entry.ID] = *entryWithStatus - entry.Registered = entryWithStatus.Registered + td.entries[entry.Entry.ID] = *entry.Entry } td.Unlock() return nil } -func (td *mockDiscoveryClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*EntryWithStatus, error) { +func (td *mockDiscoveryClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*Entry, error) { td.Lock() entry, ok := td.entries[id] td.Unlock() @@ -58,20 +50,20 @@ func (td *mockDiscoveryClient) GetTransportByID(ctx context.Context, id uuid.UUI return nil, errors.New("transport not found") } - return &EntryWithStatus{ - Entry: entry.Entry, - IsUp: entry.IsUp, - Registered: entry.Registered, - 
Statuses: entry.Statuses, + return &Entry{ + ID: entry.ID, + Edges: entry.Edges, + Label: entry.Label, + Type: entry.Type, }, nil } -func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*EntryWithStatus, error) { +func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*Entry, error) { td.Lock() - res := make([]*EntryWithStatus, 0) + res := make([]*Entry, 0) for _, entry := range td.entries { - if entry.Entry.HasEdge(pk) { - e := &EntryWithStatus{} + if entry.HasEdge(pk) { + e := &Entry{} *e = entry res = append(res, e) } @@ -100,24 +92,10 @@ func (td *mockDiscoveryClient) DeleteTransport(ctx context.Context, id uuid.UUID return nil } -func (td *mockDiscoveryClient) UpdateStatuses(ctx context.Context, statuses ...*Status) ([]*EntryWithStatus, error) { - res := make([]*EntryWithStatus, 0) - - for _, status := range statuses { - entry, err := td.GetTransportByID(ctx, status.ID) - if err != nil { - return nil, err - } - - td.Lock() - entry.IsUp = status.IsUp - td.entries[status.ID] = *entry - td.Unlock() - } - - return res, nil -} - func (td *mockDiscoveryClient) Health(_ context.Context) (int, error) { return http.StatusOK, nil } + +func (td *mockDiscoveryClient) HeartBeat(_ context.Context, _ uuid.UUID) error { + return nil +} diff --git a/pkg/transport/discovery_test.go b/pkg/transport/discovery_test.go index d8e449b8f7..72e5c9d062 100644 --- a/pkg/transport/discovery_test.go +++ b/pkg/transport/discovery_test.go @@ -22,12 +22,9 @@ func TestNewDiscoveryMock(t *testing.T) { entryWS, err := dc.GetTransportByID(context.TODO(), sEntry.Entry.ID) require.NoError(t, err) - require.True(t, entryWS.Entry.ID == sEntry.Entry.ID) + require.True(t, entryWS.ID == sEntry.Entry.ID) entriesWS, err := dc.GetTransportsByEdge(context.TODO(), pk1) require.NoError(t, err) - require.Equal(t, entry.Edges, entriesWS[0].Entry.Edges) - - _, err = dc.UpdateStatuses(context.TODO(), &transport.Status{}) - 
require.NoError(t, err) + require.Equal(t, entry.Edges, entriesWS[0].Edges) } diff --git a/pkg/transport/entry.go b/pkg/transport/entry.go index 24f0a9ad28..9aef69a4c2 100644 --- a/pkg/transport/entry.go +++ b/pkg/transport/entry.go @@ -3,7 +3,6 @@ package transport import ( "errors" "fmt" - "strings" "github.com/google/uuid" "github.com/skycoin/dmsg/cipher" @@ -36,29 +35,23 @@ type Entry struct { ID uuid.UUID `json:"t_id"` // Edges contains the public keys of the Transport's edge nodes - // (should only have 2 edges and the first edge is transport original initiator). + // Edges should always be sorted in ascending order Edges [2]cipher.PubKey `json:"edges"` // Type represents the transport type. Type network.Type `json:"type"` - // Public determines whether the transport is to be exposed to other nodes or not. - // Public transports are to be registered in the Transport Discovery. - Public bool `json:"public"` // TODO(evanlinjin): remove this. - Label Label `json:"label"` } // MakeEntry creates a new transport entry -func MakeEntry(initiator, target cipher.PubKey, netType network.Type, public bool, label Label) Entry { +func MakeEntry(aPK, bPK cipher.PubKey, netType network.Type, label Label) Entry { entry := Entry{ - ID: MakeTransportID(initiator, target, netType), - Type: netType, - Public: public, - Label: label, + ID: MakeTransportID(aPK, bPK, netType), + Type: netType, + Label: label, + Edges: SortEdges(aPK, bPK), } - entry.Edges[0] = initiator - entry.Edges[1] = target return entry } @@ -84,6 +77,12 @@ func (e *Entry) EdgeIndex(pk cipher.PubKey) int { return -1 } +// IsLeastSignificantEdge returns true if given pk is least significant edge +// of this entry +func (e *Entry) IsLeastSignificantEdge(pk cipher.PubKey) bool { + return e.EdgeIndex(pk) == 0 +} + // HasEdge returns true if the provided edge is present in 'e.Edges' field. 
func (e *Entry) HasEdge(edge cipher.PubKey) bool { for _, pk := range e.Edges { @@ -97,16 +96,11 @@ func (e *Entry) HasEdge(edge cipher.PubKey) bool { // String implements stringer func (e *Entry) String() string { res := "" - if e.Public { - res += "visibility: public\n" - } else { - res += "visibility: private\n" - } res += fmt.Sprintf("\ttype: %s\n", e.Type) res += fmt.Sprintf("\tid: %s\n", e.ID) res += "\tedges:\n" - res += fmt.Sprintf("\t\tedge 1 (initiator): %s\n", e.Edges[0]) - res += fmt.Sprintf("\t\tedge 2 (target): %s\n", e.Edges[1]) + res += fmt.Sprintf("\t\tedge 1: %s\n", e.Edges[0]) + res += fmt.Sprintf("\t\tedge 2: %s\n", e.Edges[1]) return res } @@ -170,41 +164,3 @@ func NewSignedEntry(entry *Entry, pk cipher.PubKey, secKey cipher.SecKey) (*Sign se := &SignedEntry{Entry: entry} return se, se.Sign(pk, secKey) } - -// Status represents the current state of a Transport from a Transport's single edge. -// Each Transport will have two perspectives; one from each of it's edges. -type Status struct { - - // ID is the Transport ID that identifies the Transport that this status is regarding. - ID uuid.UUID `json:"t_id"` - - // IsUp represents whether the Transport is up. - // A Transport that is down will fail to forward Packets. - IsUp bool `json:"is_up"` -} - -// EntryWithStatus stores Entry and Statuses returned by both Edges. 
-type EntryWithStatus struct { - Entry *Entry `json:"entry"` - IsUp bool `json:"is_up"` - Registered int64 `json:"registered"` - Updated int64 `json:"updated"` - Statuses [2]bool `json:"statuses"` -} - -// String implements stringer -func (e *EntryWithStatus) String() string { - res := "entry:\n" - res += fmt.Sprintf("\tregistered at: %d\n", e.Registered) - res += fmt.Sprintf("\tstatus returned by edge 1: %t\n", e.Statuses[0]) - res += fmt.Sprintf("\tstatus returned by edge 2: %t\n", e.Statuses[1]) - if e.IsUp { - res += "\ttransport: up\n" - } else { - res += "\ttransport: down\n" - } - indentedStr := strings.Replace(e.Entry.String(), "\n\t", "\n\t\t", -1) - res += fmt.Sprintf("\ttransport info: \n\t\t%s", indentedStr) - - return res -} diff --git a/pkg/transport/entry_test.go b/pkg/transport/entry_test.go index 862cbad6ba..ddcd31302c 100644 --- a/pkg/transport/entry_test.go +++ b/pkg/transport/entry_test.go @@ -14,8 +14,8 @@ func TestNewEntry(t *testing.T) { pkA, _ := cipher.GenerateKeyPair() pkB, _ := cipher.GenerateKeyPair() - entryAB := transport.MakeEntry(pkA, pkB, "", true, transport.LabelUser) - entryBA := transport.MakeEntry(pkA, pkB, "", true, transport.LabelUser) + entryAB := transport.MakeEntry(pkA, pkB, "", transport.LabelUser) + entryBA := transport.MakeEntry(pkA, pkB, "", transport.LabelUser) assert.True(t, entryAB.Edges == entryBA.Edges) assert.True(t, entryAB.ID == entryBA.ID) @@ -27,7 +27,7 @@ func ExampleSignedEntry_Sign() { pkA, skA := cipher.GenerateKeyPair() pkB, skB := cipher.GenerateKeyPair() - entry := transport.MakeEntry(pkA, pkB, "mock", true, transport.LabelUser) + entry := transport.MakeEntry(pkA, pkB, "mock", transport.LabelUser) sEntry := &transport.SignedEntry{Entry: &entry} if sEntry.Signatures[0].Null() && sEntry.Signatures[1].Null() { @@ -62,7 +62,7 @@ func ExampleSignedEntry_Signature() { pkA, skA := cipher.GenerateKeyPair() pkB, skB := cipher.GenerateKeyPair() - entry := transport.MakeEntry(pkA, pkB, "mock", true, 
transport.LabelUser) + entry := transport.MakeEntry(pkA, pkB, "mock", transport.LabelUser) sEntry := &transport.SignedEntry{Entry: &entry} if err := sEntry.Sign(pkA, skA); err != nil { diff --git a/pkg/transport/handshake.go b/pkg/transport/handshake.go index 16e3fdf8e1..40d84d76ff 100644 --- a/pkg/transport/handshake.go +++ b/pkg/transport/handshake.go @@ -23,12 +23,9 @@ const ( responseInvalidEntry ) -func makeEntryFromTpConn(conn network.Conn, isInitiator bool) Entry { - initiator, target := conn.LocalPK(), conn.RemotePK() - if !isInitiator { - initiator, target = target, initiator - } - return MakeEntry(initiator, target, conn.Network(), true, LabelUser) +func makeEntryFromTransport(transport network.Transport) Entry { + aPK, bPK := transport.LocalPK(), transport.RemotePK() + return MakeEntry(aPK, bPK, transport.Network(), LabelUser) } func compareEntries(expected, received *Entry) error { @@ -44,10 +41,6 @@ func compareEntries(expected, received *Entry) error { return errors.New("received entry's 'type' is not of expected") } - if expected.Public != received.Public { - return errors.New("received entry's 'public' is not of expected") - } - return nil } @@ -58,6 +51,10 @@ func receiveAndVerifyEntry(r io.Reader, expected *Entry, remotePK cipher.PubKey) return nil, fmt.Errorf("failed to read entry: %w", err) } + if recvSE.Entry == nil { + return nil, fmt.Errorf("failed to read entry: entry part of signed entry is empty") + } + if err := compareEntries(expected, recvSE.Entry); err != nil { return nil, err } @@ -76,13 +73,13 @@ func receiveAndVerifyEntry(r io.Reader, expected *Entry, remotePK cipher.PubKey) // SettlementHS represents a settlement handshake. // This is the handshake responsible for registering a transport to transport discovery. 
-type SettlementHS func(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) error +type SettlementHS func(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) error // Do performs the settlement handshake. -func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) (err error) { +func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) (err error) { done := make(chan struct{}) go func() { - err = hs(ctx, dc, conn, sk) + err = hs(ctx, dc, transport, sk) close(done) }() select { @@ -98,29 +95,21 @@ func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn network. // The handshake logic only REGISTERS the transport, and does not update the status of the transport. func MakeSettlementHS(init bool) SettlementHS { // initiating logic. - initHS := func(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) (err error) { - entry := makeEntryFromTpConn(conn, true) - - // TODO(evanlinjin): Probably not needed as this is called in mTp already. Need to double check. - //defer func() { - // // @evanlinjin: I used background context to ensure status is always updated. - // if _, err := dc.UpdateStatuses(context.Background(), &Status{ID: entry.ID, IsUp: err == nil}); err != nil { - // log.WithError(err).Error("Failed to update statuses") - // } - //}() + initHS := func(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) (err error) { + entry := makeEntryFromTransport(transport) // create signed entry and send it to responding visor. 
- se, err := NewSignedEntry(&entry, conn.LocalPK(), sk) + se, err := NewSignedEntry(&entry, transport.LocalPK(), sk) if err != nil { return fmt.Errorf("failed to sign entry: %w", err) } - if err := json.NewEncoder(conn).Encode(se); err != nil { + if err := json.NewEncoder(transport).Encode(se); err != nil { return fmt.Errorf("failed to write entry: %w", err) } // await okay signal. accepted := make([]byte, 1) - if _, err := io.ReadFull(conn, accepted); err != nil { + if _, err := io.ReadFull(transport, accepted); err != nil { return fmt.Errorf("failed to read response: %w", err) } switch hsResponse(accepted[0]) { @@ -138,18 +127,18 @@ func MakeSettlementHS(init bool) SettlementHS { } // responding logic. - respHS := func(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) error { - entry := makeEntryFromTpConn(conn, false) + respHS := func(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) error { + entry := makeEntryFromTransport(transport) // receive, verify and sign entry. 
- recvSE, err := receiveAndVerifyEntry(conn, &entry, conn.RemotePK()) + recvSE, err := receiveAndVerifyEntry(transport, &entry, transport.RemotePK()) if err != nil { - writeHsResponse(conn, responseInvalidEntry) //nolint:errcheck, gosec + writeHsResponse(transport, responseInvalidEntry) //nolint:errcheck, gosec return err } - if err := recvSE.Sign(conn.LocalPK(), sk); err != nil { - writeHsResponse(conn, responseSignatureErr) //nolint:errcheck, gosec + if err := recvSE.Sign(transport.LocalPK(), sk); err != nil { + writeHsResponse(transport, responseSignatureErr) //nolint:errcheck, gosec return fmt.Errorf("failed to sign received entry: %w", err) } @@ -164,7 +153,7 @@ func MakeSettlementHS(init bool) SettlementHS { log.WithError(err).Error("Failed to register transport.") } } - return writeHsResponse(conn, responseOK) + return writeHsResponse(transport, responseOK) } if init { diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index c1ed2e22f6..c5469f3ab4 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -13,16 +13,19 @@ import ( "github.com/skycoin/dmsg/cipher" "github.com/skycoin/dmsg/httputil" - "github.com/skycoin/dmsg/netutil" "github.com/skycoin/skycoin/src/util/logging" + "github.com/skycoin/skywire/internal/netutil" "github.com/skycoin/skywire/pkg/app/appevent" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/skyenv" "github.com/skycoin/skywire/pkg/transport/network" ) -const logWriteInterval = time.Second * 3 +const ( + logWriteInterval = time.Second * 3 + heartbeatInterval = time.Minute * 10 +) // Records number of managedTransports. var mTpCount int32 @@ -31,33 +34,24 @@ var ( // ErrNotServing is the error returned when a transport is no longer served. ErrNotServing = errors.New("transport is no longer being served") - // ErrConnAlreadyExists occurs when an underlying transport connection already exists. 
- ErrConnAlreadyExists = errors.New("underlying transport connection already exists") -) - -// Constants associated with transport redial loop. -const ( - tpInitBO = time.Millisecond * 500 - tpMaxBO = time.Minute - tpTries = 0 - tpFactor = 2 - tpTimeout = time.Second * 3 // timeout for a single try + // ErrTransportAlreadyExists occurs when an underlying transport already exists. + ErrTransportAlreadyExists = errors.New("underlying transport already exists") ) // ManagedTransportConfig is a configuration for managed transport. type ManagedTransportConfig struct { - client network.Client - ebc *appevent.Broadcaster - DC DiscoveryClient - LS LogStore - RemotePK cipher.PubKey - TransportLabel Label + client network.Client + ebc *appevent.Broadcaster + DC DiscoveryClient + LS LogStore + RemotePK cipher.PubKey + TransportLabel Label + InactiveTimeout time.Duration } // ManagedTransport manages a direct line of communication between two visor nodes. -// There is a single underlying connection between two edges. -// Initial dialing can be requested by either edge of the connection. -// However, only the edge with the least-significant public key can redial. +// There is a single underlying transport between two edges. +// Initial dialing can be requested by either edge of the transport. 
type ManagedTransport struct { log *logging.Logger @@ -66,69 +60,41 @@ type ManagedTransport struct { LogEntry *LogEntry logUpdates uint32 - dc DiscoveryClient - ls LogStore - ebc *appevent.Broadcaster + dc DiscoveryClient + ls LogStore - isUp bool // records last successful status update to discovery - isUpErr error // records whether the last status update was successful or not - isUpMux sync.Mutex - - redialCancel context.CancelFunc // for canceling redialling logic - redialMx sync.Mutex - - client network.Client - conn network.Conn - connCh chan struct{} - connMx sync.Mutex + client network.Client + transport network.Transport + transportCh chan struct{} + transportMx sync.Mutex done chan struct{} - once sync.Once wg sync.WaitGroup - remoteAddr string + timeout time.Duration } // NewManagedTransport creates a new ManagedTransport. -func NewManagedTransport(conf ManagedTransportConfig, isInitiator bool) *ManagedTransport { - initiator, target := conf.client.PK(), conf.RemotePK - if !isInitiator { - initiator, target = target, initiator - } +func NewManagedTransport(conf ManagedTransportConfig) *ManagedTransport { + aPK, bPK := conf.client.PK(), conf.RemotePK mt := &ManagedTransport{ - log: logging.MustGetLogger(fmt.Sprintf("tp:%s", conf.RemotePK.String()[:6])), - rPK: conf.RemotePK, - dc: conf.DC, - ls: conf.LS, - client: conf.client, - Entry: MakeEntry(initiator, target, conf.client.Type(), true, conf.TransportLabel), - LogEntry: new(LogEntry), - connCh: make(chan struct{}, 1), - done: make(chan struct{}), - ebc: conf.ebc, + log: logging.MustGetLogger(fmt.Sprintf("tp:%s", conf.RemotePK.String()[:6])), + rPK: conf.RemotePK, + dc: conf.DC, + ls: conf.LS, + client: conf.client, + Entry: MakeEntry(aPK, bPK, conf.client.Type(), conf.TransportLabel), + LogEntry: new(LogEntry), + transportCh: make(chan struct{}, 1), + done: make(chan struct{}), + timeout: conf.InactiveTimeout, } - mt.wg.Add(2) return mt } -// IsUp returns true if transport status is up. 
-func (mt *ManagedTransport) IsUp() bool { - mt.isUpMux.Lock() - isUp := mt.isUp && mt.isUpErr == nil - mt.isUpMux.Unlock() - return isUp -} - // Serve serves and manages the transport. func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) { - defer mt.wg.Done() - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-mt.done - cancel() - }() - + mt.wg.Add(3) log := mt.log. WithField("tp_id", mt.Entry.ID). WithField("remote_pk", mt.rPK). @@ -137,83 +103,77 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) { log.Info("Serving.") defer func() { - // Ensure logs tp logs are up to date before closing. - if mt.logMod() { - if err := mt.ls.Record(mt.Entry.ID, mt.LogEntry); err != nil { - log.WithError(err).Warn("Failed to record log entry.") - } - } - - // End connection. - mt.connMx.Lock() - close(mt.connCh) - if mt.conn != nil { - if err := mt.conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close underlying connection.") - } - mt.conn = nil - } - mt.connMx.Unlock() - + mt.close() log.WithField("remaining_tps", atomic.AddInt32(&mTpCount, -1)). Info("Stopped serving.") }() - // Read loop. - go func() { - log := mt.log.WithField("src", "read_loop") - defer func() { - cancel() - mt.wg.Done() - log.Debug("Closed read loop.") - }() - for { - p, err := mt.readPacket() - if err != nil { - if err == ErrNotServing { - mt.log.WithError(err).Debug("Failed to read packet. 
Returning...") - return - } - mt.connMx.Lock() - mt.clearConn() - mt.connMx.Unlock() - log.WithError(err).Warn("Failed to read packet.") - continue - } - select { - case <-mt.done: - return + go mt.readLoop(readCh) + if mt.Entry.IsLeastSignificantEdge(mt.client.PK()) { + go mt.heartbeatLoop() + } + mt.logLoop() +} - case readCh <- p: - } +// readLoop continuously reads packets from the underlying transport +// and sends them to readCh +// This is a blocking call +func (mt *ManagedTransport) readLoop(readCh chan<- routing.Packet) { + log := mt.log.WithField("src", "read_loop") + defer mt.wg.Done() + for { + p, err := mt.readPacket() + if err != nil { + log.WithError(err).Warn("Failed to read packet, closing transport") + mt.close() + return } + select { + case <-mt.done: + return + case readCh <- p: + } + } +} + +func (mt *ManagedTransport) heartbeatLoop() { + defer func() { + mt.wg.Done() + log.Debug("Stopped heartbeat loop") }() + ticker := time.NewTicker(heartbeatInterval) + for { + select { + case <-mt.done: + ticker.Stop() + return + case <-ticker.C: + err := mt.dc.HeartBeat(context.Background(), mt.Entry.ID) + if err != nil { + log.Warn("Failed to send heartbeat") + } + } + } +} - // Logging & redialing loop. +// logLoop continuously stores transport data in the log entry, +// in case there is data to store +// This is a blocking call +func (mt *ManagedTransport) logLoop() { + defer func() { + mt.recordLog() + mt.wg.Done() + mt.log.Debug("Stopped log loop") + }() + // Ensure logs tp logs are up to date before closing. logTicker := time.NewTicker(logWriteInterval) for { select { case <-mt.done: logTicker.Stop() return - case <-logTicker.C: - if mt.logMod() { - if err := mt.ls.Record(mt.Entry.ID, mt.LogEntry); err != nil { - mt.log.WithError(err).Warn("Failed to record log entry.") - } - continue - } - - // Only initiator is responsible for redialing. 
- if !mt.isInitiator() { - continue - } - - // If there has not been any activity, ensure underlying 'write' tp is still up. - if err := mt.redialLoop(ctx); err != nil { - mt.log.WithError(err).Debug("Stopped reconnecting underlying connection.") - } + mt.recordLog() } } } @@ -232,43 +192,63 @@ func (mt *ManagedTransport) isServing() bool { // It only returns an error if transport status update fails. func (mt *ManagedTransport) Close() (err error) { mt.close() + mt.log.Debug("Waiting for the waitgroup") mt.wg.Wait() - - mt.isUpMux.Lock() - err = mt.isUpErr - mt.isUpMux.Unlock() - - return err + return nil } -func (mt *ManagedTransport) close() { - mt.disconnect() - if mt.Type() == network.STCPR && mt.remoteAddr != "" { - mt.ebc.SendTPClose(context.Background(), string(network.STCPR), mt.remoteAddr) +// IsClosed returns true when the transport is closed +// This instance cannot be used anymore and should be discarded +func (mt *ManagedTransport) IsClosed() bool { + select { + case <-mt.done: + return true + default: + return false } } -// disconnect stops serving the transport and ensures that transport status is updated to DOWN. -// It also waits until mt.Serve returns if specified. -func (mt *ManagedTransport) disconnect() { - mt.once.Do(func() { close(mt.done) }) - _ = mt.updateStatus(false, 1) //nolint:errcheck +// close underlying transport and remove the entry from transport discovery +// todo: this currently performs http request to discovery service +// it only makes sense to wait for the completion if we are closing the visor itself, +// regular transport close operations should probably call it concurrently +// need to find a way to handle this properly (done channel in return?) 
+func (mt *ManagedTransport) close() { + mt.log.Debug("Closing...") + select { + case <-mt.done: + return + default: + close(mt.done) + } + mt.log.Debug("Locking transportMx") + mt.transportMx.Lock() + close(mt.transportCh) + if mt.transport != nil { + if err := mt.transport.Close(); err != nil { + log.WithError(err).Warn("Failed to close underlying transport.") + } + mt.transport = nil + } + mt.transportMx.Unlock() + mt.log.Debug("Unlocking transportMx") + _ = mt.deleteFromDiscovery() //nolint:errcheck } -// Accept accepts a new underlying connection. -func (mt *ManagedTransport) Accept(ctx context.Context, conn network.Conn) error { - mt.connMx.Lock() - defer mt.connMx.Unlock() +// Accept accepts a new underlying transport. +func (mt *ManagedTransport) Accept(ctx context.Context, transport network.Transport) error { + mt.transportMx.Lock() + defer mt.transportMx.Unlock() - if conn.Network() != mt.Type() { + if transport.Network() != mt.Type() { return ErrWrongNetwork } if !mt.isServing() { mt.log.WithError(ErrNotServing).Debug() - if err := conn.Close(); err != nil { + if err := transport.Close(); err != nil { mt.log.WithError(err). - Warn("Failed to close newly accepted connection.") + Warn("Failed to close newly accepted transport.") } return ErrNotServing } @@ -277,31 +257,37 @@ func (mt *ManagedTransport) Accept(ctx context.Context, conn network.Conn) error defer cancel() mt.log.Debug("Performing settlement handshake...") - if err := MakeSettlementHS(false).Do(ctx, mt.dc, conn, mt.client.SK()); err != nil { + if err := MakeSettlementHS(false).Do(ctx, mt.dc, transport, mt.client.SK()); err != nil { return fmt.Errorf("settlement handshake failed: %w", err) } - mt.log.Debug("Setting underlying connection...") - return mt.setConn(conn) + mt.log.Debug("Setting underlying transport...") + return mt.setTransport(transport) } -// Dial dials a new underlying connection. +// Dial dials a new underlying transport. 
func (mt *ManagedTransport) Dial(ctx context.Context) error { - mt.connMx.Lock() - defer mt.connMx.Unlock() + mt.transportMx.Lock() + defer mt.transportMx.Unlock() if !mt.isServing() { return ErrNotServing } - if mt.conn != nil { + if mt.transport != nil { return nil } return mt.dial(ctx) } +// DialAsync is asynchronous version of dial that allows dialing in a different +// goroutine +func (mt *ManagedTransport) DialAsync(ctx context.Context, errCh chan error) { + errCh <- mt.Dial(ctx) +} + func (mt *ManagedTransport) dial(ctx context.Context) error { - conn, err := mt.client.Dial(ctx, mt.rPK, skyenv.DmsgTransportPort) + transport, err := mt.client.Dial(ctx, mt.rPK, skyenv.DmsgTransportPort) if err != nil { return fmt.Errorf("snet.Dial: %w", err) } @@ -309,217 +295,82 @@ func (mt *ManagedTransport) dial(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, time.Second*20) defer cancel() - if err := MakeSettlementHS(true).Do(ctx, mt.dc, conn, mt.client.SK()); err != nil { + if err := MakeSettlementHS(true).Do(ctx, mt.dc, transport, mt.client.SK()); err != nil { return fmt.Errorf("settlement handshake failed: %w", err) } - if err := mt.setConn(conn); err != nil { - return fmt.Errorf("setConn: %w", err) + if err := mt.setTransport(transport); err != nil { + return fmt.Errorf("setTransport: %w", err) } return nil } -// redial only actually dials if transport is still registered in transport discovery. -// The 'retry' output specifies whether we can retry dial on failure. -func (mt *ManagedTransport) redial(ctx context.Context) error { - if !mt.isServing() { - return ErrNotServing - } - - if _, err := mt.dc.GetTransportByID(ctx, mt.Entry.ID); err != nil { - // If the error is a temporary network error, we should retry at a later stage. - if netErr, ok := err.(net.Error); ok && netErr.Temporary() { - - return err - } - - // If the error is not temporary, it most likely means that the transport is no longer registered. 
- // Hence, we should close the managed transport. - mt.disconnect() - mt.log. - WithError(err). - Warn("Transport closed due to redial failure. Transport is likely no longer in discovery.") - - return ErrNotServing - } - - return mt.dial(ctx) -} - -// redialLoop calls redial in a loop with exponential back-off until success or transport closure. -func (mt *ManagedTransport) redialLoop(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - mt.redialMx.Lock() - mt.redialCancel = cancel - mt.redialMx.Unlock() - - retry := netutil.NewRetrier(mt.log, tpInitBO, tpMaxBO, tpTries, tpFactor). - WithErrWhitelist(ErrNotServing, context.Canceled) - - // Only redial when there is no underlying conn. - return retry.Do(ctx, func() (err error) { - tryCtx, cancel := context.WithTimeout(ctx, tpTimeout) - defer cancel() - mt.connMx.Lock() - if mt.conn == nil { - err = mt.redial(tryCtx) - } - mt.connMx.Unlock() - return err - }) -} - func (mt *ManagedTransport) isLeastSignificantEdge() bool { sorted := SortEdges(mt.Entry.Edges[0], mt.Entry.Edges[1]) return sorted[0] == mt.client.PK() } -func (mt *ManagedTransport) isInitiator() bool { - return mt.Entry.EdgeIndex(mt.client.PK()) == 0 -} - /* - <<< UNDERLYING CONNECTION >>> + <<< UNDERLYING TRANSPORT>>> */ -func (mt *ManagedTransport) getConn() network.Conn { +func (mt *ManagedTransport) getTransport() network.Transport { if !mt.isServing() { return nil } - mt.connMx.Lock() - conn := mt.conn - mt.connMx.Unlock() - return conn + mt.transportMx.Lock() + transport := mt.transport + mt.transportMx.Unlock() + return transport } -// setConn sets 'mt.conn' (the underlying connection). -// If 'mt.conn' is already occupied, close the newly introduced connection. -func (mt *ManagedTransport) setConn(newConn network.Conn) error { - - if mt.conn != nil { +// set sets 'mt.transport' (the underlying transport). +// If 'mt.transport' is already occupied, close the newly introduced transport. 
+func (mt *ManagedTransport) setTransport(newTransport network.Transport) error { + if mt.transport != nil { if mt.isLeastSignificantEdge() { - mt.log.Debug("Underlying conn already exists, closing new conn.") - if err := newConn.Close(); err != nil { - log.WithError(err).Warn("Failed to close new conn.") + mt.log.Debug("Underlying transport already exists, closing new transport.") + if err := newTransport.Close(); err != nil { + log.WithError(err).Warn("Failed to close new transport.") } - return ErrConnAlreadyExists + return ErrTransportAlreadyExists } - mt.log.Debug("Underlying conn already exists, closing old conn.") - if err := mt.conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close old conn.") + mt.log.Debug("Underlying transport already exists, closing old transport.") + if err := mt.transport.Close(); err != nil { + log.WithError(err).Warn("Failed to close old transport.") } - mt.conn = nil - } - - if err := mt.updateStatus(true, 1); err != nil { - return fmt.Errorf("failed to update transport status: %w", err) + mt.transport = nil } - // Set new underlying connection. - mt.conn = newConn + // Set new underlying transport. + mt.transport = newTransport select { - case mt.connCh <- struct{}{}: - mt.log.Debug("Sent signal to 'mt.connCh'.") + case mt.transportCh <- struct{}{}: + mt.log.Debug("Sent signal to 'mt.transportCh'.") default: } - - // Cancel reconnection logic. 
- mt.redialMx.Lock() - if mt.redialCancel != nil { - mt.redialCancel() - } - mt.redialMx.Unlock() - return nil } -func (mt *ManagedTransport) clearConn() { - if !mt.isServing() { - return - } - - if mt.conn != nil { - if err := mt.conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") - } - mt.conn = nil - } - _ = mt.updateStatus(false, 1) //nolint:errcheck -} - -func (mt *ManagedTransport) updateStatus(isUp bool, tries int) (err error) { - if tries < 1 { - panic(fmt.Errorf("mt.updateStatus: invalid input: got tries=%d (want tries > 0)", tries)) - } - - // If not serving, we should update status to 'DOWN' and ensure 'updateStatus' returns error. - if !mt.isServing() { - isUp = false - } - defer func() { - if err == nil && !mt.isServing() { - err = ErrNotServing +func (mt *ManagedTransport) deleteFromDiscovery() error { + retrier := netutil.NewRetrier(1*time.Second, 5, 2) + return retrier.Do(func() error { + err := mt.dc.DeleteTransport(context.Background(), mt.Entry.ID) + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + mt.log. + WithError(err). + WithField("temporary", true). + Warn("Failed to update transport status.") + return err } - }() - - mt.isUpMux.Lock() - defer mt.isUpMux.Unlock() - - // If last update is the same as current, nothing needs to be done. - if mt.isUp == isUp { - return nil - } - - for i := 0; i < tries; i++ { - // @evanlinjin: We don't pass context as we always want transport status to be updated. - if _, err = mt.dc.UpdateStatuses(context.Background(), &Status{ID: mt.Entry.ID, IsUp: isUp}); err != nil { - - // Only retry if error is temporary. - if netErr, ok := err.(net.Error); ok && netErr.Temporary() { - mt.log. - WithError(err). - WithField("temporary", true). - WithField("retry", i+1 < tries). - Warn("Failed to update transport status.") - continue - } - - // Close managed transport if associated entry is not in discovery. 
- if httpErr, ok := err.(*httputil.HTTPError); ok && httpErr.Status == http.StatusNotFound { - mt.log. - WithError(err). - WithField("temporary", false). - WithField("retry", false). - Warn("Failed to update transport status. Closing transport...") - mt.isUp = false - mt.isUpErr = httpErr - mt.once.Do(func() { close(mt.done) }) // Only time when mt.done is closed outside of mt.close() - return - } - - break + if httpErr, ok := err.(*httputil.HTTPError); ok && httpErr.Status == http.StatusNotFound { + return nil } - mt.log. - WithField("status", statusString(isUp)). - Info("Transport status updated.") - break - } - - mt.isUp = isUp - mt.isUpErr = err - return err -} - -func statusString(isUp bool) string { - if isUp { - return "UP" - } - return "DOWN" + return err + }) } /* @@ -528,24 +379,16 @@ func statusString(isUp bool) string { // WritePacket writes a packet to the remote. func (mt *ManagedTransport) WritePacket(ctx context.Context, packet routing.Packet) error { - mt.connMx.Lock() - defer mt.connMx.Unlock() - - if mt.conn == nil { - if err := mt.redial(ctx); err != nil { + mt.transportMx.Lock() + defer mt.transportMx.Unlock() - // TODO(evanlinjin): Determine whether we need to call 'mt.wg.Wait()' here. 
- if err == ErrNotServing { - mt.wg.Wait() - } - - return fmt.Errorf("failed to redial underlying connection: %w", err) - } + if mt.transport == nil { + return fmt.Errorf("write packet: cannot write to transport, transport is not set up") } - n, err := mt.conn.Write(packet) + n, err := mt.transport.Write(packet) if err != nil { - mt.clearConn() + mt.close() return err } if n > routing.PacketHeaderSize { @@ -558,29 +401,28 @@ func (mt *ManagedTransport) WritePacket(ctx context.Context, packet routing.Pack func (mt *ManagedTransport) readPacket() (packet routing.Packet, err error) { log := mt.log.WithField("func", "readPacket") - var conn network.Conn + var transport network.Transport for { - if conn = mt.getConn(); conn != nil { + if transport = mt.getTransport(); transport != nil { break } select { case <-mt.done: return nil, ErrNotServing - case <-mt.connCh: + case <-mt.transportCh: } } log.Debug("Awaiting packet...") h := make(routing.Packet, routing.PacketHeaderSize) - if _, err = io.ReadFull(conn, h); err != nil { + if _, err = io.ReadFull(transport, h); err != nil { log.WithError(err).Debugf("Failed to read packet header.") return nil, err } log.WithField("header_len", len(h)).WithField("header_raw", h).Debug("Read packet header.") - p := make([]byte, h.Size()) - if _, err = io.ReadFull(conn, p); err != nil { + if _, err = io.ReadFull(transport, p); err != nil { log.WithError(err).Debugf("Failed to read packet payload.") return nil, err } @@ -612,6 +454,8 @@ func (mt *ManagedTransport) logRecv(b uint64) { atomic.AddUint32(&mt.logUpdates, 1) } +// logMod flushes the number of operations performed in this transport +// and returns true if it was bigger than 0 func (mt *ManagedTransport) logMod() bool { if ops := atomic.SwapUint32(&mt.logUpdates, 0); ops > 0 { mt.log.Infof("entry log: recording %d operations", ops) @@ -620,6 +464,16 @@ func (mt *ManagedTransport) logMod() bool { return false } +// records this transport's log, in case there is data to be logged 
+func (mt *ManagedTransport) recordLog() { + if !mt.logMod() { + return + } + if err := mt.ls.Record(mt.Entry.ID, mt.LogEntry); err != nil { + log.WithError(err).Warn("Failed to record log entry.") + } +} + // Remote returns the remote public key. func (mt *ManagedTransport) Remote() cipher.PubKey { return mt.rPK } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index e5150a8dfc..7925339637 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -19,12 +19,22 @@ import ( "github.com/skycoin/skywire/pkg/transport/network/addrresolver" ) +const reconnectPhaseDelay = 10 * time.Second +const reconnectRemoteTimeout = 3 * time.Second + +// PersistentTransports is a persistent transports description +type PersistentTransports struct { + PK cipher.PubKey `json:"pk"` + NetType network.Type `json:"type"` +} + // ManagerConfig configures a Manager. type ManagerConfig struct { - PubKey cipher.PubKey - SecKey cipher.SecKey - DiscoveryClient DiscoveryClient - LogStore LogStore + PubKey cipher.PubKey + SecKey cipher.SecKey + DiscoveryClient DiscoveryClient + LogStore LogStore + PersistentTransports []PersistentTransports } // Manager manages Transports. @@ -35,13 +45,10 @@ type Manager struct { arClient addrresolver.APIClient ebc *appevent.Broadcaster - readCh chan routing.Packet - mx sync.RWMutex - wgMu sync.Mutex - wg sync.WaitGroup - serveOnce sync.Once // ensure we only serve once. - closeOnce sync.Once // ensure we only close once. - done chan struct{} + readCh chan routing.Packet + mx sync.RWMutex + wg sync.WaitGroup + done chan struct{} factory network.ClientFactory netClients map[network.Type]network.Client @@ -67,20 +74,51 @@ func NewManager(log *logging.Logger, arClient addrresolver.APIClient, ebc *appev return tm, nil } -// Serve runs listening loop across all registered factories. 
+// Serve starts all network clients and starts accepting connections +// from all those clients +// Additionally, it runs cleanup and persistent reconnection routines func (tm *Manager) Serve(ctx context.Context) { - tm.serveOnce.Do(func() { - tm.serve(ctx) - }) -} - -func (tm *Manager) serve(ctx context.Context) { tm.initClients() tm.runClients(ctx) - tm.initTransports(ctx) + // for cleanup and reconnect goroutines + tm.wg.Add(2) + go tm.cleanupTransports(ctx) + go tm.runReconnectPersistent(ctx) tm.Logger.Info("transport manager is serving.") } +func (tm *Manager) runReconnectPersistent(ctx context.Context) { + defer tm.wg.Done() + ticker := time.NewTicker(reconnectPhaseDelay) + tm.reconnectPersistent(ctx) + for { + select { + case <-ticker.C: + tm.reconnectPersistent(ctx) + // wait full timeout no matter how long the last phase took + ticker = time.NewTicker(reconnectPhaseDelay) + case <-tm.done: + case <-ctx.Done(): + return + } + } +} + +func (tm *Manager) reconnectPersistent(ctx context.Context) { + for _, remote := range tm.Conf.PersistentTransports { + tm.Logger.Debugf("Reconnecting to persistent transport to %s, type %s", remote.PK, remote.NetType) + deadlined, cancel := context.WithTimeout(ctx, reconnectRemoteTimeout) + _, err := tm.saveTransport(deadlined, remote.PK, remote.NetType, LabelUser) + if err != nil { + tm.Logger.WithError(err). + WithField("remote_pk", remote.PK). + WithField("network_type", remote.NetType). 
+ Warnf("Cannot connect to persistent remote") + } + cancel() + } +} + func (tm *Manager) initClients() { acceptedNetworks := []network.Type{network.STCP, network.STCPR, network.SUDPH, network.DMSG} for _, netType := range acceptedNetworks { @@ -111,15 +149,47 @@ func (tm *Manager) runClients(ctx context.Context) { return } tm.Logger.Infof("listening on network: %s", client.Type()) - tm.wgMu.Lock() - tm.wg.Add(1) - tm.wgMu.Unlock() - go tm.acceptTransports(ctx, lis) + if client.Type() != network.DMSG { + tm.wg.Add(1) + } + go tm.acceptTransports(ctx, lis, client.Type()) } } -func (tm *Manager) acceptTransports(ctx context.Context, lis network.Listener) { +func (tm *Manager) cleanupTransports(ctx context.Context) { defer tm.wg.Done() + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + tm.mx.Lock() + tm.Logger.Debugf("Locked in cleanup") + var toDelete []*ManagedTransport + for _, tp := range tm.tps { + if tp.IsClosed() { + toDelete = append(toDelete, tp) + } + } + for _, tp := range toDelete { + delete(tm.tps, tp.Entry.ID) + } + tm.mx.Unlock() + tm.Logger.Debugf("Unlocked in cleanup") + if len(toDelete) > 0 { + tm.Logger.Infof("Deleted %d unused transport entries", len(toDelete)) + } + case <-ctx.Done(): + case <-tm.done: + return + } + } +} + +func (tm *Manager) acceptTransports(ctx context.Context, lis network.Listener, t network.Type) { + // we do not close dmsg client explicitly, so we don't have to wait for it to finish + if t != network.DMSG { + defer tm.wg.Done() + } for { select { case <-ctx.Done(): @@ -127,7 +197,7 @@ func (tm *Manager) acceptTransports(ctx context.Context, lis network.Listener) { return default: if err := tm.acceptTransport(ctx, lis); err != nil { - tm.Logger.Warnf("Failed to accept connection: %v", err) + tm.Logger.Warnf("Failed to accept transport: %v", err) if errors.Is(err, io.ErrClosedPipe) { return } @@ -145,29 +215,6 @@ func (tm *Manager) Networks() []string { return nets } -func (tm *Manager) 
initTransports(ctx context.Context) { - - entries, err := tm.Conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.Conf.PubKey) - if err != nil { - log.Warnf("No transports found for local visor: %v", err) - } - tm.Logger.Debugf("Initializing %d transports", len(entries)) - for _, entry := range entries { - tm.Logger.Debugf("Initializing TP %v", *entry.Entry) - var ( - tpType = entry.Entry.Type - remote = entry.Entry.RemoteEdge(tm.Conf.PubKey) - tpID = entry.Entry.ID - ) - isInitiator := tm.Conf.PubKey == entry.Entry.Edges[0] - if _, err := tm.saveTransport(ctx, remote, isInitiator, tpType, entry.Entry.Label); err != nil { - tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID) - } else { - tm.Logger.Debugf("Successfully initialized TP %v", *entry.Entry) - } - } -} - // Stcpr returns stcpr client func (tm *Manager) Stcpr() (network.Client, bool) { c, ok := tm.netClients[network.STCP] @@ -175,15 +222,17 @@ func (tm *Manager) Stcpr() (network.Client, bool) { } func (tm *Manager) acceptTransport(ctx context.Context, lis network.Listener) error { - conn, err := lis.AcceptConn() // TODO: tcp panic. + transport, err := lis.AcceptTransport() // TODO: tcp panic. if err != nil { return err } - tm.Logger.Infof("recv transport connection request: type(%s) remote(%s)", lis.Network(), conn.RemotePK()) + tm.Logger.Infof("recv transport request: type(%s) remote(%s)", lis.Network(), transport.RemotePK()) tm.mx.Lock() + tm.Logger.Debugf("Locked in accept") defer tm.mx.Unlock() + defer tm.Logger.Debugf("Unlocked in accept") if tm.isClosing() { return errors.New("transport.Manager is closing. Skipping incoming transport") @@ -191,11 +240,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, lis network.Listener) er // For transports for purpose(data). 
- tpID := tm.tpIDFromPK(conn.RemotePK(), conn.Network()) + tpID := tm.tpIDFromPK(transport.RemotePK(), transport.Network()) - client, ok := tm.netClients[network.Type(conn.Network())] + client, ok := tm.netClients[network.Type(transport.Network())] if !ok { - return fmt.Errorf("client not found for the type %s", conn.Network()) + return fmt.Errorf("client not found for the type %s", transport.Network()) } mTp, ok := tm.tps[tpID] @@ -206,17 +255,19 @@ func (tm *Manager) acceptTransport(ctx context.Context, lis network.Listener) er client: client, DC: tm.Conf.DiscoveryClient, LS: tm.Conf.LogStore, - RemotePK: conn.RemotePK(), + RemotePK: transport.RemotePK(), TransportLabel: LabelUser, ebc: tm.ebc, - }, false) + }) go func() { mTp.Serve(tm.readCh) tm.mx.Lock() + tm.Logger.Debugf("Locked in deleting after serve in accept") delete(tm.tps, mTp.Entry.ID) tm.mx.Unlock() + tm.Logger.Debugf("Locked in deleting after serve in accept") }() tm.tps[tpID] = mTp @@ -224,12 +275,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, lis network.Listener) er tm.Logger.Debugln("TP found, accepting...") } - if err := mTp.Accept(ctx, conn); err != nil { + if err := mTp.Accept(ctx, transport); err != nil { return err } - tm.Logger.Infof("accepted tp: type(%s) remote(%s) tpID(%s) new(%v)", lis.Network(), conn.RemotePK(), tpID, !ok) - + tm.Logger.Infof("accepted tp: type(%s) remote(%s) tpID(%s) new(%v)", lis.Network(), transport.RemotePK(), tpID, !ok) return nil } @@ -242,6 +292,8 @@ var ErrUnknownNetwork = errors.New("unknown network type") // IsKnownNetwork returns true when netName is a known // network type that we are able to operate in func (tm *Manager) IsKnownNetwork(netName network.Type) bool { + tm.mx.RLock() + defer tm.mx.RUnlock() _, ok := tm.netClients[netName] return ok } @@ -264,6 +316,8 @@ func (tm *Manager) GetTransport(remote cipher.PubKey, netType network.Type) (*Ma // GetTransportByID retrieves transport by its ID, if it exists func (tm *Manager) 
GetTransportByID(tpID uuid.UUID) (*ManagedTransport, error) { + tm.mx.RLock() + defer tm.mx.RUnlock() tp, ok := tm.tps[tpID] if !ok { return nil, ErrNotFound @@ -290,7 +344,7 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, netT return nil, io.ErrClosedPipe } for { - mTp, err := tm.saveTransport(ctx, remote, true, netType, label) + mTp, err := tm.saveTransport(ctx, remote, netType, label) if err != nil { if err == ErrNotServing { @@ -302,9 +356,7 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, netT } } -func (tm *Manager) saveTransport(ctx context.Context, remote cipher.PubKey, initiator bool, netType network.Type, label Label) (*ManagedTransport, error) { - tm.mx.Lock() - defer tm.mx.Unlock() +func (tm *Manager) saveTransport(ctx context.Context, remote cipher.PubKey, netType network.Type, label Label) (*ManagedTransport, error) { if !tm.IsKnownNetwork(netType) { return nil, ErrUnknownNetwork } @@ -312,13 +364,15 @@ func (tm *Manager) saveTransport(ctx context.Context, remote cipher.PubKey, init tpID := tm.tpIDFromPK(remote, netType) tm.Logger.Debugf("Initializing TP with ID %s", tpID) - oldMTp, ok := tm.tps[tpID] - if ok { + oldMTp, err := tm.GetTransportByID(tpID) + if err == nil { tm.Logger.Debug("Found an old mTp from internal map.") return oldMTp, nil } + tm.mx.RLock() client, ok := tm.netClients[network.Type(netType)] + tm.mx.RUnlock() if !ok { return nil, fmt.Errorf("client not found for the type %s", netType) } @@ -330,43 +384,25 @@ func (tm *Manager) saveTransport(ctx context.Context, remote cipher.PubKey, init LS: tm.Conf.LogStore, RemotePK: remote, TransportLabel: label, - }, initiator) - - // todo: do we need this here? 
Client dial will run resolve anyway - if mTp.Type() == network.STCPR && tm.arClient != nil { - visorData, err := tm.arClient.Resolve(context.Background(), string(mTp.Type()), remote) - if err == nil { - mTp.remoteAddr = visorData.RemoteAddr - } else { - if err != addrresolver.ErrNoEntry { - return nil, fmt.Errorf("failed to resolve %s: %w", remote, err) - } - } - } + }) tm.Logger.Debugf("Dialing transport to %v via %v", mTp.Remote(), mTp.client.Type()) - if err := mTp.Dial(ctx); err != nil { + errCh := make(chan error) + go mTp.DialAsync(ctx, errCh) + err = <-errCh + if err != nil { tm.Logger.Debugf("Error dialing transport to %v via %v: %v", mTp.Remote(), mTp.client.Type(), err) - // The first occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was - // just deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again. - // The second occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in - // the STCP table. There is no point in retrying as a connection would be impossible, so we just return an - // error. 
- if err == ErrNotServing || errors.Is(err, network.ErrStcpEntryNotFound) { - if closeErr := mTp.Close(); closeErr != nil { - tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") - } - tm.deleteTransport(mTp.Entry.ID) + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Error closing transport") } - tm.Logger.WithError(err).Warn("Underlying transport connection is not established.") return nil, err } - - go func() { - mTp.Serve(tm.readCh) - tm.deleteTransport(mTp.Entry.ID) - }() + go mTp.Serve(tm.readCh) + tm.mx.Lock() + tm.Logger.Debug("Locked in saveTransport") tm.tps[tpID] = mTp + tm.mx.Unlock() + tm.Logger.Debug("Unlocked in saveTransport") tm.Logger.Infof("saved transport: remote(%s) type(%s) tpID(%s)", remote, netType, tpID) return mTp, nil } @@ -379,8 +415,9 @@ func (tm *Manager) STCPRRemoteAddrs() []string { defer tm.mx.RUnlock() for _, tp := range tm.tps { - if tp.Entry.Type == network.STCPR && tp.remoteAddr != "" { - addrs = append(addrs, tp.remoteAddr) + remoteRaw := tp.transport.RemoteRawAddr().String() + if tp.Entry.Type == network.STCPR && remoteRaw != "" { + addrs = append(addrs, remoteRaw) } } @@ -390,36 +427,33 @@ func (tm *Manager) STCPRRemoteAddrs() []string { // DeleteTransport deregisters the Transport of Transport ID in transport discovery and deletes it locally. func (tm *Manager) DeleteTransport(id uuid.UUID) { tm.mx.Lock() + tm.Logger.Debugf("Locked in DeleteTransport") defer tm.mx.Unlock() + defer tm.Logger.Debug("Unlocked in DeleteTransport") if tm.isClosing() { return } - // Deregister transport before closing the underlying connection. + // Deregister transport before closing the underlying transport. if tp, ok := tm.tps[id]; ok { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - // Deregister transport. 
+ // todo: this should probably be moved to tp.close because we want to deregister + // a transport completely and not deal with transport statuses at all if err := tm.Conf.DiscoveryClient.DeleteTransport(ctx, id); err != nil { tm.Logger.WithError(err).Warnf("Failed to deregister transport of ID %s from discovery.", id) } else { tm.Logger.Infof("De-registered transport of ID %s from discovery.", id) } - // Close underlying connection. + // Close underlying transport. tp.close() delete(tm.tps, id) } } -func (tm *Manager) deleteTransport(id uuid.UUID) { - tm.mx.Lock() - defer tm.mx.Unlock() - delete(tm.tps, id) -} - // ReadPacket reads data packets from routes. func (tm *Manager) ReadPacket() (routing.Packet, error) { p, ok := <-tm.readCh @@ -457,33 +491,30 @@ func (tm *Manager) Local() cipher.PubKey { return tm.Conf.PubKey } -// Close closes opened transports and registered factories. -func (tm *Manager) Close() error { - tm.closeOnce.Do(tm.close) - return nil -} - -func (tm *Manager) close() { +// Close closes opened transports, network clients +// and all service tasks of transport manager +func (tm *Manager) Close() { + select { + case <-tm.done: + return + default: + } + close(tm.done) tm.Logger.Info("transport manager is closing.") defer tm.Logger.Info("transport manager closed.") - tm.mx.Lock() defer tm.mx.Unlock() - close(tm.done) - - statuses := make([]*Status, 0, len(tm.tps)) for _, tr := range tm.tps { tr.close() } - if _, err := tm.Conf.DiscoveryClient.UpdateStatuses(context.Background(), statuses...); err != nil { - tm.Logger.Warnf("failed to update transport statuses: %v", err) + for _, client := range tm.netClients { + err := client.Close() + if err != nil { + tm.Logger.WithError(err).Warnf("Failed to close %s client", client.Type()) + } } - - tm.wgMu.Lock() tm.wg.Wait() - tm.wgMu.Unlock() - close(tm.readCh) } diff --git a/pkg/transport/network/client.go b/pkg/transport/network/client.go index b1e3d05328..53f1810269 100644 --- 
a/pkg/transport/network/client.go +++ b/pkg/transport/network/client.go @@ -21,25 +21,25 @@ import ( // Client provides access to skywire network // It allows dialing remote visors using their public keys, as -// well as listening to incoming connections from other visors +// well as listening to incoming transports from other visors type Client interface { // Dial remote visor, that is listening on the given skywire port - Dial(ctx context.Context, remote cipher.PubKey, port uint16) (Conn, error) + Dial(ctx context.Context, remote cipher.PubKey, port uint16) (Transport, error) // Start initializes the client and prepares it for listening. It is required - // to be called to start accepting connections + // to be called to start accepting transports Start() error // Listen on the given skywire port. This can be called multiple times // for different ports for the same client. It requires Start to be called - // to start accepting connections + // to start accepting transports Listen(port uint16) (Listener, error) // LocalAddr returns the actual network address under which this client listens to - // new connections + // new transports LocalAddr() (net.Addr, error) // PK returns public key of the visor running this client PK() cipher.PubKey // SK returns secret key of the visor running this client SK() cipher.SecKey - // Close the client, stop accepting connections. Connections returned by the + // Close the client, stop accepting transports. Connections returned by the // client should be closed manually Close() error // Type returns skywire network type in which this client operates @@ -93,9 +93,9 @@ func (f *ClientFactory) MakeClient(netType Type) (Client, error) { // The main responsibility is handshaking over incoming // and outgoing raw network connections, obtaining remote information // from the handshake and wrapping raw connections with skywire -// connection type. 
-// Incoming connections also directed to appropriate listener using -// skywire port, obtained from incoming connection handshake +// transport type. +// Incoming transports also directed to appropriate listener using +// skywire port, obtained from incoming transport handshake type genericClient struct { lPK cipher.PubKey lSK cipher.SecKey @@ -114,10 +114,10 @@ type genericClient struct { closeOnce sync.Once } -// initConnection will initialize skywire connection over opened raw connection to +// initTransport will initialize skywire transport over opened raw connection to // the remote client // The process will perform handshake over raw connection -func (c *genericClient) initConnection(ctx context.Context, conn net.Conn, rPK cipher.PubKey, rPort uint16) (*conn, error) { +func (c *genericClient) initTransport(ctx context.Context, conn net.Conn, rPK cipher.PubKey, rPort uint16) (*transport, error) { lPort, freePort, err := c.porter.ReserveEphemeral(ctx) if err != nil { return nil, err @@ -126,20 +126,20 @@ func (c *genericClient) initConnection(ctx context.Context, conn net.Conn, rPK c remoteAddr := conn.RemoteAddr() c.log.Infof("Performing handshake with %v", remoteAddr) hs := handshake.InitiatorHandshake(c.lSK, lAddr, rAddr) - return c.wrapConn(conn, hs, true, freePort) + return c.wrapTransport(conn, hs, true, freePort) } -// acceptConnections continuously accepts incoming connections that come from given listener +// acceptTransports continuously accepts incoming transports that come from given listener // these connections will be properly handshaked and passed to an appropriate skywire listener // using skywire port -func (c *genericClient) acceptConnections(lis net.Listener) { +func (c *genericClient) acceptTransports(lis net.Listener) { c.mu.Lock() c.connListener = lis close(c.listenStarted) c.mu.Unlock() c.log.Infof("listening on addr: %v", c.connListener.Addr()) for { - if err := c.acceptConn(); err != nil { + if err := c.acceptTransport(); err != nil 
{ if errors.Is(err, io.EOF) { continue // likely it's a dummy connection from service discovery } @@ -152,27 +152,27 @@ func (c *genericClient) acceptConnections(lis net.Listener) { } } -// wrapConn performs handshake over provided raw connection and wraps it in -// network.Conn type using the data obtained from handshake process -func (c *genericClient) wrapConn(rawConn net.Conn, hs handshake.Handshake, initiator bool, onClose func()) (*conn, error) { - conn, err := doHandshake(rawConn, hs, c.netType, c.log) +// wrapTransport performs handshake over provided raw connection and wraps it in +// network.Transport type using the data obtained from handshake process +func (c *genericClient) wrapTransport(rawConn net.Conn, hs handshake.Handshake, initiator bool, onClose func()) (*transport, error) { + transport, err := doHandshake(rawConn, hs, c.netType, c.log) if err != nil { onClose() return nil, err } - conn.freePort = onClose - c.log.Infof("Sent handshake to %v, local addr %v, remote addr %v", rawConn.RemoteAddr(), conn.lAddr, conn.rAddr) - if err := conn.encrypt(c.lPK, c.lSK, initiator); err != nil { + transport.freePort = onClose + c.log.Infof("Sent handshake to %v, local addr %v, remote addr %v", rawConn.RemoteAddr(), transport.lAddr, transport.rAddr) + if err := transport.encrypt(c.lPK, c.lSK, initiator); err != nil { return nil, err } - return conn, nil + return transport, nil } -// acceptConn accepts new connection in underlying raw network listener, +// acceptConn accepts new transport in underlying raw network listener, // performs handshake, and using the data from the handshake wraps // connection and delivers it to the appropriate listener. 
-// The listener is chosen using skywire port from the incoming visor connection -func (c *genericClient) acceptConn() error { +// The listener is chosen using skywire port from the incoming visor transport +func (c *genericClient) acceptTransport() error { if c.isClosed() { return io.ErrClosedPipe } @@ -185,15 +185,15 @@ func (c *genericClient) acceptConn() error { onClose := func() {} hs := handshake.ResponderHandshake(handshake.MakeF2PortChecker(c.checkListener)) - wrappedConn, err := c.wrapConn(conn, hs, false, onClose) + wrappedTransport, err := c.wrapTransport(conn, hs, false, onClose) if err != nil { return err } - lis, err := c.getListener(wrappedConn.lAddr.Port) + lis, err := c.getListener(wrappedTransport.lAddr.Port) if err != nil { return err } - return lis.introduce(wrappedConn) + return lis.introduce(wrappedTransport) } // LocalAddr returns local address. This is network address the client diff --git a/pkg/transport/network/connection.go b/pkg/transport/network/connection.go index 2bd5b5d482..b1256d57ce 100644 --- a/pkg/transport/network/connection.go +++ b/pkg/transport/network/connection.go @@ -15,45 +15,51 @@ import ( const encryptHSTimout = 5 * time.Second -// Conn represents a network connection between two visors in skywire network -// This connection wraps raw network connection and is ready to use for sending data. +// Transport represents a network connection between two visors in skywire network +// This transport wraps raw network connection and is ready to use for sending data. 
// It also provides skywire-specific methods on top of net.Conn -type Conn interface { +type Transport interface { net.Conn - // LocalPK returns local public key of connection + // LocalPK returns local public key of transport LocalPK() cipher.PubKey - // RemotePK returns remote public key of connection + // RemotePK returns remote public key of transport RemotePK() cipher.PubKey - // LocalPort returns local skywire port of connection + // LocalPort returns local skywire port of transport // This is not underlying OS port, but port within skywire network LocalPort() uint16 - // RemotePort returns remote skywire port of connection + // RemotePort returns remote skywire port of transport // This is not underlying OS port, but port within skywire network RemotePort() uint16 - // Network returns network of connection + // LocalRawAddr returns local raw network address (not skywire address) + LocalRawAddr() net.Addr + + // RemoteRawAddr returns remote raw network address (not skywire address) + RemoteRawAddr() net.Addr + + // Network returns network of transport Network() Type } -type conn struct { +type transport struct { net.Conn - lAddr, rAddr dmsg.Addr - freePort func() - connType Type + lAddr, rAddr dmsg.Addr + freePort func() + transportType Type } // DoHandshake performs given handshake over given raw connection and wraps -// connection in network.Conn -func DoHandshake(rawConn net.Conn, hs handshake.Handshake, netType Type, log *logging.Logger) (Conn, error) { +// connection in network.Transport +func DoHandshake(rawConn net.Conn, hs handshake.Handshake, netType Type, log *logging.Logger) (Transport, error) { return doHandshake(rawConn, hs, netType, log) } // handshake performs given handshake over given raw connection and wraps -// connection in network.conn -func doHandshake(rawConn net.Conn, hs handshake.Handshake, netType Type, log *logging.Logger) (*conn, error) { +// connection in network.transport +func doHandshake(rawConn net.Conn, hs 
handshake.Handshake, netType Type, log *logging.Logger) (*transport, error) { lAddr, rAddr, err := hs(rawConn, time.Now().Add(handshake.Timeout)) if err != nil { if err := rawConn.Close(); err != nil { @@ -61,11 +67,11 @@ func doHandshake(rawConn net.Conn, hs handshake.Handshake, netType Type, log *lo } return nil, err } - handshakedConn := &conn{Conn: rawConn, lAddr: lAddr, rAddr: rAddr, connType: netType} + handshakedConn := &transport{Conn: rawConn, lAddr: lAddr, rAddr: rAddr, transportType: netType} return handshakedConn, nil } -func (c *conn) encrypt(lPK cipher.PubKey, lSK cipher.SecKey, initator bool) error { +func (c *transport) encrypt(lPK cipher.PubKey, lSK cipher.SecKey, initator bool) error { config := noise.Config{ LocalPK: lPK, LocalSK: lSK, @@ -98,17 +104,27 @@ func EncryptConn(config noise.Config, conn net.Conn) (net.Conn, error) { } // LocalAddr implements net.Conn -func (c *conn) LocalAddr() net.Addr { +func (c *transport) LocalAddr() net.Addr { return c.lAddr } // RemoteAddr implements net.Conn -func (c *conn) RemoteAddr() net.Addr { +func (c *transport) RemoteAddr() net.Addr { return c.rAddr } +// LocalRawAddr returns local raw network address (not skywire address) +func (c *transport) LocalRawAddr() net.Addr { + return c.Conn.LocalAddr() +} + +// RemoteRawAddr returns remote raw network address (not skywire address) +func (c *transport) RemoteRawAddr() net.Addr { + return c.Conn.RemoteAddr() +} + // Close implements net.Conn -func (c *conn) Close() error { if c.freePort != nil { c.freePort() } @@ -116,19 +132,19 @@ func (c *conn) Close() error { return c.Conn.Close() } -// LocalPK returns local public key of connection -func (c *conn) LocalPK() cipher.PubKey { return c.lAddr.PK } +// LocalPK returns local public key of transport +func (c *transport) LocalPK() cipher.PubKey { return c.lAddr.PK } -// RemotePK returns remote public key of connection -func (c *conn) RemotePK() cipher.PubKey { return c.rAddr.PK } +// RemotePK returns remote public key of transport +func (c *transport) 
RemotePK() cipher.PubKey { return c.rAddr.PK } -// LocalPort returns local skywire port of connection +// LocalPort returns local skywire port of transport // This is not underlying OS port, but port within skywire network -func (c *conn) LocalPort() uint16 { return c.lAddr.Port } +func (c *transport) LocalPort() uint16 { return c.lAddr.Port } -// RemotePort returns remote skywire port of connection +// RemotePort returns remote skywire port of transport // This is not underlying OS port, but port within skywire network -func (c *conn) RemotePort() uint16 { return c.rAddr.Port } +func (c *transport) RemotePort() uint16 { return c.rAddr.Port } -// Network returns network of connection -func (c *conn) Network() Type { return c.connType } +// Network returns network of transport +func (c *transport) Network() Type { return c.transportType } diff --git a/pkg/transport/network/dmsg.go b/pkg/transport/network/dmsg.go index f9f97121ec..8b5a074aa2 100644 --- a/pkg/transport/network/dmsg.go +++ b/pkg/transport/network/dmsg.go @@ -28,12 +28,12 @@ func (c *dmsgClientAdapter) LocalAddr() (net.Addr, error) { } // Dial implements Client interface -func (c *dmsgClientAdapter) Dial(ctx context.Context, remote cipher.PubKey, port uint16) (Conn, error) { - conn, err := c.dmsgC.DialStream(ctx, dmsg.Addr{PK: remote, Port: port}) +func (c *dmsgClientAdapter) Dial(ctx context.Context, remote cipher.PubKey, port uint16) (Transport, error) { + transport, err := c.dmsgC.DialStream(ctx, dmsg.Addr{PK: remote, Port: port}) if err != nil { return nil, err } - return &dmsgConnAdapter{conn}, nil + return &dmsgTransportAdapter{transport}, nil } // Start implements Client interface @@ -79,13 +79,13 @@ type dmsgListenerAdapter struct { *dmsg.Listener } -// AcceptConn implements Listener interface -func (lis *dmsgListenerAdapter) AcceptConn() (Conn, error) { +// AcceptTransport implements Listener interface +func (lis *dmsgListenerAdapter) AcceptTransport() (Transport, error) { stream, err := 
lis.Listener.AcceptStream() if err != nil { return nil, err } - return &dmsgConnAdapter{stream}, nil + return &dmsgTransportAdapter{stream}, nil } // Network implements Listener interface @@ -104,32 +104,42 @@ func (lis *dmsgListenerAdapter) Port() uint16 { } // wrapper around connection returned by dmsg.Client -// that conforms to Conn interface -type dmsgConnAdapter struct { +// that conforms to Transport interface +type dmsgTransportAdapter struct { *dmsg.Stream } -// LocalPK implements Conn interface -func (c *dmsgConnAdapter) LocalPK() cipher.PubKey { +// LocalPK implements Transport interface +func (c *dmsgTransportAdapter) LocalPK() cipher.PubKey { return c.RawLocalAddr().PK } -// RemotePK implements Conn interface -func (c *dmsgConnAdapter) RemotePK() cipher.PubKey { +// RemotePK implements Transport interface +func (c *dmsgTransportAdapter) RemotePK() cipher.PubKey { return c.RawRemoteAddr().PK } -// LocalPort implements Conn interface -func (c *dmsgConnAdapter) LocalPort() uint16 { +// LocalPort implements Transport interface +func (c *dmsgTransportAdapter) LocalPort() uint16 { return c.RawLocalAddr().Port } -// RemotePort implements Conn interface -func (c *dmsgConnAdapter) RemotePort() uint16 { +// RemotePort implements Transport interface +func (c *dmsgTransportAdapter) RemotePort() uint16 { return c.RawRemoteAddr().Port } -// Network implements Conn interface -func (c *dmsgConnAdapter) Network() Type { +// LocalRawAddr implements Transport interface +func (c *dmsgTransportAdapter) LocalRawAddr() net.Addr { + return c.RawLocalAddr() +} + +// RemoteRawAddr implements Transport interface +func (c *dmsgTransportAdapter) RemoteRawAddr() net.Addr { + return c.RawRemoteAddr() +} + +// Network implements Transport interface +func (c *dmsgTransportAdapter) Network() Type { return DMSG } diff --git a/pkg/transport/network/listener.go b/pkg/transport/network/listener.go index 96ebb33bde..a761a8a5c9 100644 --- a/pkg/transport/network/listener.go +++ 
b/pkg/transport/network/listener.go @@ -17,7 +17,7 @@ type Listener interface { PK() cipher.PubKey Port() uint16 Network() Type - AcceptConn() (Conn, error) + AcceptTransport() (Transport, error) } type listener struct { @@ -25,7 +25,7 @@ type listener struct { mx sync.Mutex once sync.Once freePort func() - accept chan *conn + accept chan *transport done chan struct{} network Type } @@ -39,7 +39,7 @@ func newListener(lAddr dmsg.Addr, freePort func(), network Type) *listener { return &listener{ lAddr: lAddr, freePort: freePort, - accept: make(chan *conn), + accept: make(chan *transport), done: make(chan struct{}), network: network, } @@ -47,11 +47,11 @@ func newListener(lAddr dmsg.Addr, freePort func(), network Type) *listener { // Accept implements net.Listener, returns generic net.Conn func (l *listener) Accept() (net.Conn, error) { - return l.AcceptConn() + return l.AcceptTransport() } -// AcceptConn accepts a skywire connection and returns network.Conn -func (l *listener) AcceptConn() (Conn, error) { +// AcceptTransport accepts a skywire transport and returns network.Transport +func (l *listener) AcceptTransport() (Transport, error) { c, ok := <-l.accept if !ok { return nil, io.ErrClosedPipe @@ -67,8 +67,8 @@ func (l *listener) Close() error { l.mx.Lock() close(l.accept) l.mx.Unlock() - for conn := range l.accept { - conn.Close() //nolint: errcheck, gosec + for transport := range l.accept { + transport.Close() //nolint: errcheck, gosec } l.freePort() }) @@ -96,8 +96,8 @@ func (l *listener) Network() Type { return l.network } -// Introduce is used by Client to introduce a new connection to this Listener -func (l *listener) introduce(conn *conn) error { +// Introduce is used by Client to introduce a new transport to this Listener +func (l *listener) introduce(transport *transport) error { select { case <-l.done: return io.ErrClosedPipe @@ -105,7 +105,7 @@ func (l *listener) introduce(conn *conn) error { l.mx.Lock() defer l.mx.Unlock() select { - case l.accept <- 
conn: + case l.accept <- transport: return nil case <-l.done: return io.ErrClosedPipe diff --git a/pkg/transport/network/stcp.go b/pkg/transport/network/stcp.go index 0972e04c72..cfc9672628 100644 --- a/pkg/transport/network/stcp.go +++ b/pkg/transport/network/stcp.go @@ -33,7 +33,7 @@ func newStcp(generic *genericClient, table stcp.PKTable) Client { var ErrStcpEntryNotFound = errors.New("entry not found in PK table") // Dial implements Client interface -func (c *stcpClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (Conn, error) { +func (c *stcpClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (Transport, error) { if c.isClosed() { return nil, io.ErrClosedPipe } @@ -46,13 +46,14 @@ func (c *stcpClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) return nil, ErrStcpEntryNotFound } c.eb.SendTCPDial(context.Background(), string(STCP), addr) - conn, err := net.Dial("tcp", addr) + dialer := net.Dialer{} + conn, err := dialer.DialContext(ctx, "tcp", addr) if err != nil { return nil, err } c.log.Infof("Dialed %v:%v@%v", rPK, rPort, conn.RemoteAddr()) - return c.initConnection(ctx, conn, rPK, rPort) + return c.initTransport(ctx, conn, rPK, rPort) } // Start implements Client interface @@ -70,5 +71,5 @@ func (c *stcpClient) serve() { c.log.Errorf("Failed to listen on %q: %v", c.listenAddr, err) return } - c.acceptConnections(lis) + c.acceptTransports(lis) } diff --git a/pkg/transport/network/stcpr.go b/pkg/transport/network/stcpr.go index 5444da12f1..8313c31b14 100644 --- a/pkg/transport/network/stcpr.go +++ b/pkg/transport/network/stcpr.go @@ -22,7 +22,7 @@ func newStcpr(resolved *resolvedClient) Client { } // Dial implements interface -func (c *stcprClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (Conn, error) { +func (c *stcprClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (Transport, error) { if c.isClosed() { return nil, io.ErrClosedPipe } @@ -37,7 +37,7 @@ func (c *stcprClient) 
Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) return nil, err } - return c.initConnection(ctx, conn, rPK, rPort) + return c.initTransport(ctx, conn, rPK, rPort) } func (c *stcprClient) dial(ctx context.Context, addr string) (net.Conn, error) { @@ -82,5 +82,5 @@ func (c *stcprClient) serve() { return } c.log.Infof("Successfully bound stcpr to port %s", port) - c.acceptConnections(lis) + c.acceptTransports(lis) } diff --git a/pkg/transport/network/sudph.go b/pkg/transport/network/sudph.go index fc4bef3e33..ccd2eebfae 100644 --- a/pkg/transport/network/sudph.go +++ b/pkg/transport/network/sudph.go @@ -52,7 +52,7 @@ func (c *sudphClient) serve() { c.log.Errorf("Failed to listen on random port: %v", err) return } - c.acceptConnections(lis) + c.acceptTransports(lis) } // listen @@ -106,7 +106,7 @@ func (c *sudphClient) acceptAddresses(conn net.PacketConn, addrCh <-chan addrres } // Dial implements interface -func (c *sudphClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (Conn, error) { +func (c *sudphClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) (Transport, error) { if c.isClosed() { return nil, io.ErrClosedPipe } @@ -116,7 +116,7 @@ func (c *sudphClient) Dial(ctx context.Context, rPK cipher.PubKey, rPort uint16) return nil, err } - return c.initConnection(ctx, conn, rPK, rPort) + return c.initTransport(ctx, conn, rPK, rPort) } func (c *sudphClient) dialWithTimeout(ctx context.Context, addr string) (net.Conn, error) { diff --git a/pkg/transport/setup/rpc.go b/pkg/transport/setup/rpc.go index 6252261578..38fb831cbd 100644 --- a/pkg/transport/setup/rpc.go +++ b/pkg/transport/setup/rpc.go @@ -35,7 +35,6 @@ type TransportResponse struct { Local cipher.PubKey Remote cipher.PubKey Type network.Type - IsUp bool } // BoolResponse is a simple boolean wrapped in structure for RPC responses @@ -56,7 +55,6 @@ func (gw *TransportGateway) AddTransport(req TransportRequest, res *TransportRes res.Local = gw.tm.Local() res.Remote = 
tp.Remote() res.Type = tp.Type() - res.IsUp = tp.IsUp() return nil } @@ -87,7 +85,6 @@ func (gw *TransportGateway) GetTransports(_ struct{}, res *[]TransportResponse) Local: gw.tm.Local(), Remote: tp.Remote(), Type: tp.Type(), - IsUp: tp.IsUp(), } *res = append(*res, tResp) } diff --git a/pkg/transport/tpdclient/client.go b/pkg/transport/tpdclient/client.go index 9b168b14a4..28b6d4fa67 100644 --- a/pkg/transport/tpdclient/client.go +++ b/pkg/transport/tpdclient/client.go @@ -102,7 +102,7 @@ func (c *apiClient) RegisterTransports(ctx context.Context, entries ...*transpor } // GetTransportByID returns Transport for corresponding ID. -func (c *apiClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*transport.EntryWithStatus, error) { +func (c *apiClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*transport.Entry, error) { resp, err := c.Get(ctx, fmt.Sprintf("/transports/id:%s", id.String())) if err != nil { return nil, err @@ -118,7 +118,7 @@ func (c *apiClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*transp return nil, err } - entry := &transport.EntryWithStatus{} + entry := &transport.Entry{} if err := json.NewDecoder(resp.Body).Decode(entry); err != nil { return nil, fmt.Errorf("json: %w", err) } @@ -127,7 +127,7 @@ func (c *apiClient) GetTransportByID(ctx context.Context, id uuid.UUID) (*transp } // GetTransportsByEdge returns all Transports registered for the edge. 
-func (c *apiClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.EntryWithStatus, error) { +func (c *apiClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { resp, err := c.Get(ctx, fmt.Sprintf("/transports/edge:%s", pk)) if err != nil { return nil, err @@ -143,7 +143,7 @@ func (c *apiClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ( return nil, err } - var entries []*transport.EntryWithStatus + var entries []*transport.Entry if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { return nil, fmt.Errorf("json: %w", err) } @@ -167,15 +167,10 @@ func (c *apiClient) DeleteTransport(ctx context.Context, id uuid.UUID) error { return httputil.ErrorFromResp(resp) } -// UpdateStatuses updates statuses of transports in discovery. -func (c *apiClient) UpdateStatuses(ctx context.Context, statuses ...*transport.Status) ([]*transport.EntryWithStatus, error) { - if len(statuses) == 0 { - return nil, nil - } - - resp, err := c.Post(ctx, "/statuses", statuses) +func (c *apiClient) Health(ctx context.Context) (int, error) { + resp, err := c.Get(ctx, "/health") if err != nil { - return nil, err + return 0, err } defer func() { @@ -184,22 +179,13 @@ func (c *apiClient) UpdateStatuses(ctx context.Context, statuses ...*transport.S } }() - if err := httputil.ErrorFromResp(resp); err != nil { - return nil, err - } - - var entries []*transport.EntryWithStatus - if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { - return nil, fmt.Errorf("json: %w", err) - } - - return entries, nil + return resp.StatusCode, nil } -func (c *apiClient) Health(ctx context.Context) (int, error) { - resp, err := c.Get(ctx, "/health") +func (c *apiClient) HeartBeat(ctx context.Context, id uuid.UUID) error { + resp, err := c.Post(ctx, fmt.Sprintf("/heartbeat/id:%s", id.String()), nil) if err != nil { - return 0, err + return err } defer func() { @@ -208,5 +194,5 @@ func (c *apiClient) Health(ctx 
context.Context) (int, error) { } }() - return resp.StatusCode, nil + return nil } diff --git a/pkg/transport/tpdclient/client_test.go b/pkg/transport/tpdclient/client_test.go index d081df4512..988915fed4 100644 --- a/pkg/transport/tpdclient/client_test.go +++ b/pkg/transport/tpdclient/client_test.go @@ -41,9 +41,8 @@ var testPubKey, testSecKey = cipher.GenerateKeyPair() func newTestEntry() *transport.Entry { pk1, _ := cipher.GenerateKeyPair() entry := &transport.Entry{ - ID: transport.MakeTransportID(pk1, testPubKey, "dmsg"), - Type: "dmsg", - Public: true, + ID: transport.MakeTransportID(pk1, testPubKey, "dmsg"), + Type: "dmsg", } entry.Edges[0] = pk1 entry.Edges[1] = testPubKey @@ -187,59 +186,36 @@ func TestRegisterTransports(t *testing.T) { } func TestGetTransportByID(t *testing.T) { - entry := &transport.EntryWithStatus{Entry: newTestEntry(), IsUp: true} + entry := newTestEntry() srv := httptest.NewServer(authHandler(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, fmt.Sprintf("/transports/id:%s", entry.Entry.ID), r.URL.String()) + assert.Equal(t, fmt.Sprintf("/transports/id:%s", entry.ID), r.URL.String()) require.NoError(t, json.NewEncoder(w).Encode(entry)) }))) defer srv.Close() c, err := NewHTTP(srv.URL, testPubKey, testSecKey) require.NoError(t, err) - resEntry, err := c.GetTransportByID(context.Background(), entry.Entry.ID) + resEntry, err := c.GetTransportByID(context.Background(), entry.ID) require.NoError(t, err) - assert.Equal(t, entry.Entry, resEntry.Entry) - assert.True(t, entry.IsUp) + assert.Equal(t, entry, resEntry) } func TestGetTransportsByEdge(t *testing.T) { - entry := &transport.EntryWithStatus{Entry: newTestEntry(), IsUp: true} + entry := newTestEntry() srv := httptest.NewServer(authHandler(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, fmt.Sprintf("/transports/edge:%s", entry.Entry.Edges[0]), r.URL.String()) - require.NoError(t, 
json.NewEncoder(w).Encode([]*transport.EntryWithStatus{entry})) + assert.Equal(t, fmt.Sprintf("/transports/edge:%s", entry.Edges[0]), r.URL.String()) + require.NoError(t, json.NewEncoder(w).Encode([]*transport.Entry{entry})) }))) defer srv.Close() c, err := NewHTTP(srv.URL, testPubKey, testSecKey) require.NoError(t, err) - entries, err := c.GetTransportsByEdge(context.Background(), entry.Entry.Edges[0]) + entries, err := c.GetTransportsByEdge(context.Background(), entry.Edges[0]) require.NoError(t, err) require.Len(t, entries, 1) - assert.Equal(t, entry.Entry, entries[0].Entry) - assert.True(t, entries[0].IsUp) -} - -func TestUpdateStatuses(t *testing.T) { - entry := &transport.EntryWithStatus{Entry: newTestEntry(), IsUp: true} - srv := httptest.NewServer(authHandler(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "/statuses", r.URL.String()) - statuses := make([]*transport.Status, 0) - require.NoError(t, json.NewDecoder(r.Body).Decode(&statuses)) - require.Len(t, statuses, 1) - assert.Equal(t, entry.Entry.ID, statuses[0].ID) - require.NoError(t, json.NewEncoder(w).Encode([]*transport.EntryWithStatus{entry})) - }))) - defer srv.Close() - - c, err := NewHTTP(srv.URL, testPubKey, testSecKey) - require.NoError(t, err) - entries, err := c.UpdateStatuses(context.Background(), &transport.Status{ID: entry.Entry.ID, IsUp: false}) - require.NoError(t, err) - - require.Len(t, entries, 1) - assert.Equal(t, entry.Entry, entries[0].Entry) + assert.Equal(t, entry, entries[0]) } func authHandler(t *testing.T, next http.Handler) http.Handler { diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 7ac4dad7b5..6741ff31df 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -55,8 +55,8 @@ type API interface { AddTransport(remote cipher.PubKey, tpType string, public bool, timeout time.Duration) (*TransportSummary, error) RemoveTransport(tid uuid.UUID) error - DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.EntryWithStatus, error) - 
DiscoverTransportByID(id uuid.UUID) (*transport.EntryWithStatus, error) + DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.Entry, error) + DiscoverTransportByID(id uuid.UUID) (*transport.Entry, error) RoutingRules() ([]routing.Rule, error) RoutingRule(key routing.RouteID) (routing.Rule, error) @@ -74,6 +74,8 @@ type API interface { RuntimeLogs() (string, error) SetMinHops(uint16) error + + SetPersistentTransports([]transport.PersistentTransports) error } // HealthCheckable resource returns its health status as an integer @@ -134,15 +136,16 @@ func (v *Visor) Overview() (*Overview, error) { // Summary provides detailed info including overview and health of the visor. type Summary struct { - Overview *Overview `json:"overview"` - Health *HealthInfo `json:"health"` - Uptime float64 `json:"uptime"` - Routes []routingRuleResp `json:"routes"` - IsHypervisor bool `json:"is_hypervisor,omitempty"` - DmsgStats *dmsgtracker.DmsgClientSummary `json:"dmsg_stats"` - Online bool `json:"online"` - MinHops uint16 `json:"min_hops"` - SkybianBuildVersion string `json:"skybian_build_version"` + Overview *Overview `json:"overview"` + Health *HealthInfo `json:"health"` + Uptime float64 `json:"uptime"` + Routes []routingRuleResp `json:"routes"` + IsHypervisor bool `json:"is_hypervisor,omitempty"` + DmsgStats *dmsgtracker.DmsgClientSummary `json:"dmsg_stats"` + Online bool `json:"online"` + MinHops uint16 `json:"min_hops"` + PersistentTransports []transport.PersistentTransports `json:"persistent_transports"` + SkybianBuildVersion string `json:"skybian_build_version"` } // Summary implements API. 
@@ -178,13 +181,19 @@ func (v *Visor) Summary() (*Summary, error) { }) } + pts, err := v.conf.GetPersistentTransports() + if err != nil { + return nil, fmt.Errorf("pts") + } + summary := &Summary{ - Overview: overview, - Health: health, - Uptime: uptime, - Routes: extraRoutes, - MinHops: v.conf.Routing.MinHops, - SkybianBuildVersion: skybianBuildVersion, + Overview: overview, + Health: health, + Uptime: uptime, + Routes: extraRoutes, + MinHops: v.conf.Routing.MinHops, + PersistentTransports: pts, + SkybianBuildVersion: skybianBuildVersion, } return summary, nil @@ -560,7 +569,7 @@ func (v *Visor) RemoveTransport(tid uuid.UUID) error { } // DiscoverTransportsByPK implements API. -func (v *Visor) DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.EntryWithStatus, error) { +func (v *Visor) DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.Entry, error) { tpD := v.tpDiscClient() entries, err := tpD.GetTransportsByEdge(context.Background(), pk) @@ -572,7 +581,7 @@ func (v *Visor) DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.EntryWith } // DiscoverTransportByID implements API. 
-func (v *Visor) DiscoverTransportByID(id uuid.UUID) (*transport.EntryWithStatus, error) { +func (v *Visor) DiscoverTransportByID(id uuid.UUID) (*transport.Entry, error) { tpD := v.tpDiscClient() entry, err := tpD.GetTransportByID(context.Background(), id) @@ -754,3 +763,8 @@ func (v *Visor) RuntimeLogs() (string, error) { func (v *Visor) SetMinHops(in uint16) error { return v.conf.UpdateMinHops(in) } + +// SetPersistentTransports sets the persistent_transports config of the visor +func (v *Visor) SetPersistentTransports(pts []transport.PersistentTransports) error { + return v.conf.UpdatePersistentTransports(pts) +} diff --git a/pkg/visor/hypervisor.go b/pkg/visor/hypervisor.go index b135186deb..5b57069d00 100644 --- a/pkg/visor/hypervisor.go +++ b/pkg/visor/hypervisor.go @@ -30,6 +30,7 @@ import ( "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/skyenv" + "github.com/skycoin/skywire/pkg/transport" "github.com/skycoin/skywire/pkg/util/updater" "github.com/skycoin/skywire/pkg/visor/dmsgtracker" "github.com/skycoin/skywire/pkg/visor/hypervisorconfig" @@ -261,6 +262,7 @@ func (hv *Hypervisor) makeMux() chi.Router { r.Get("/visors/{pk}/update/available/{channel}", hv.visorUpdateAvailable()) r.Get("/visors/{pk}/runtime-logs", hv.getRuntimeLogs()) r.Post("/visors/{pk}/min-hops", hv.postMinHops()) + r.Put("/visors/{pk}/persistent-transports", hv.putPersistentTransports()) }) }) @@ -1321,6 +1323,26 @@ func (hv *Hypervisor) postMinHops() http.HandlerFunc { }) } +func (hv *Hypervisor) putPersistentTransports() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + var reqBody []transport.PersistentTransports + + if err := httputil.ReadJSON(r, &reqBody); err != nil { + if err != io.EOF { + hv.log(r).Warnf("putPersistentTransports request: %v", err) + } + httputil.WriteJSON(w, r, http.StatusBadRequest, usermanager.ErrMalformedRequest) + return + } + + if err := 
ctx.API.SetPersistentTransports(reqBody); err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + httputil.WriteJSON(w, r, http.StatusOK, struct{}{}) + }) +} + /* <<< Helper functions >>> */ diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 93f479722e..0586bb3cba 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -236,10 +236,11 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { logS := transport.InMemoryTransportLogStore() tpMConf := transport.ManagerConfig{ - PubKey: v.conf.PK, - SecKey: v.conf.SK, - DiscoveryClient: tpdC, - LogStore: logS, + PubKey: v.conf.PK, + SecKey: v.conf.SK, + DiscoveryClient: tpdC, + LogStore: logS, + PersistentTransports: v.conf.PersistentTransports, } managerLogger := v.MasterLogger().PackageLogger("transport_manager") @@ -271,9 +272,9 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { v.pushCloseStack("transport.manager", func() error { cancel() - err := tpM.Close() + tpM.Close() wg.Wait() - return err + return nil }) v.initLock.Lock() diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 34f0bcb9ba..13bf40f07e 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -125,7 +125,6 @@ type TransportSummary struct { Type network.Type `json:"type"` Log *transport.LogEntry `json:"log,omitempty"` IsSetup bool `json:"is_setup"` - IsUp bool `json:"is_up"` Label transport.Label `json:"label"` } @@ -136,7 +135,6 @@ func newTransportSummary(tm *transport.Manager, tp *transport.ManagedTransport, Remote: tp.Remote(), Type: tp.Type(), IsSetup: isSetup, - IsUp: tp.IsUp(), Label: tp.Entry.Label, } if includeLogs { @@ -374,7 +372,7 @@ func (r *RPC) RemoveTransport(tid *uuid.UUID, _ *struct{}) (err error) { */ // DiscoverTransportsByPK obtains available transports via the transport discovery via given public key. 
-func (r *RPC) DiscoverTransportsByPK(pk *cipher.PubKey, out *[]*transport.EntryWithStatus) (err error) { +func (r *RPC) DiscoverTransportsByPK(pk *cipher.PubKey, out *[]*transport.Entry) (err error) { defer rpcutil.LogCall(r.log, "DiscoverTransportsByPK", pk)(out, &err) entries, err := r.visor.DiscoverTransportsByPK(*pk) @@ -384,7 +382,7 @@ func (r *RPC) DiscoverTransportsByPK(pk *cipher.PubKey, out *[]*transport.EntryW } // DiscoverTransportByID obtains available transports via the transport discovery via a given transport ID. -func (r *RPC) DiscoverTransportByID(id *uuid.UUID, out *transport.EntryWithStatus) (err error) { +func (r *RPC) DiscoverTransportByID(id *uuid.UUID, out *transport.Entry) (err error) { defer rpcutil.LogCall(r.log, "DiscoverTransportByID", id)(out, &err) entry, err := r.visor.DiscoverTransportByID(*id) diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index b573273b99..7ca7095872 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -270,14 +270,14 @@ func (rc *rpcClient) RemoveTransport(tid uuid.UUID) error { return rc.Call("RemoveTransport", &tid, &struct{}{}) } -func (rc *rpcClient) DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.EntryWithStatus, error) { - entries := make([]*transport.EntryWithStatus, 0) +func (rc *rpcClient) DiscoverTransportsByPK(pk cipher.PubKey) ([]*transport.Entry, error) { + entries := make([]*transport.Entry, 0) err := rc.Call("DiscoverTransportsByPK", &pk, &entries) return entries, err } -func (rc *rpcClient) DiscoverTransportByID(id uuid.UUID) (*transport.EntryWithStatus, error) { - var entry transport.EntryWithStatus +func (rc *rpcClient) DiscoverTransportByID(id uuid.UUID) (*transport.Entry, error) { + var entry transport.Entry err := rc.Call("DiscoverTransportByID", &id, &entry) return &entry, err } @@ -345,6 +345,12 @@ func (rc *rpcClient) SetMinHops(hops uint16) error { return err } +// SetPersistentTransports sets the persistent_transports from visor routing config 
+func (rc *rpcClient) SetPersistentTransports(pts []transport.PersistentTransports) error { + err := rc.Call("SetPersistentTransports", &pts, &struct{}{}) + return err +} + // StatusMessage defines a status of visor update. type StatusMessage struct { Text string @@ -847,11 +853,11 @@ func (mc *mockRPCClient) RemoveTransport(tid uuid.UUID) error { }) } -func (mc *mockRPCClient) DiscoverTransportsByPK(cipher.PubKey) ([]*transport.EntryWithStatus, error) { +func (mc *mockRPCClient) DiscoverTransportsByPK(cipher.PubKey) ([]*transport.Entry, error) { return nil, ErrNotImplemented } -func (mc *mockRPCClient) DiscoverTransportByID(uuid.UUID) (*transport.EntryWithStatus, error) { +func (mc *mockRPCClient) DiscoverTransportByID(uuid.UUID) (*transport.Entry, error) { return nil, ErrNotImplemented } @@ -936,6 +942,11 @@ func (mc *mockRPCClient) RuntimeLogs() (string, error) { } // SetMinHops implements API -func (mc *mockRPCClient) SetMinHops(n uint16) error { +func (mc *mockRPCClient) SetMinHops(_ uint16) error { + return nil +} + +// SetPersistentTransports implements API +func (mc *mockRPCClient) SetPersistentTransports(_ []transport.PersistentTransports) error { return nil } diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 9486028bc5..c48eaf70c1 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -37,7 +37,7 @@ const ( shortHashLen = 6 // moduleShutdownTimeout is the timeout given to a module to shutdown cleanly. // Otherwise the shutdown logic will continue and report a timeout error. 
- moduleShutdownTimeout = time.Second * 2 + moduleShutdownTimeout = time.Second * 4 ) // Visor provides messaging runtime for Apps by setting up all diff --git a/pkg/visor/visorconfig/v1.go b/pkg/visor/visorconfig/v1.go index 42815688db..8a5ac2bfb4 100644 --- a/pkg/visor/visorconfig/v1.go +++ b/pkg/visor/visorconfig/v1.go @@ -9,6 +9,7 @@ import ( "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/dmsgc" + "github.com/skycoin/skywire/pkg/transport" "github.com/skycoin/skywire/pkg/transport/network" "github.com/skycoin/skywire/pkg/visor/hypervisorconfig" ) @@ -57,6 +58,8 @@ type V1 struct { RestartCheckDelay Duration `json:"restart_check_delay,omitempty"` // time value, examples: 10s, 1m, etc IsPublic bool `json:"is_public"` + PersistentTransports []transport.PersistentTransports `json:"persistent_transports"` + Hypervisor *hypervisorconfig.Config `json:"hypervisor,omitempty"` } @@ -186,6 +189,22 @@ func (v1 *V1) UpdateMinHops(hops uint16) error { return v1.flush(v1) } +// UpdatePersistentTransports updates the persistent_transports config and flushes it to file +func (v1 *V1) UpdatePersistentTransports(pts []transport.PersistentTransports) error { + v1.mu.Lock() + v1.PersistentTransports = pts + v1.mu.Unlock() + + return v1.flush(v1) +} + +// GetPersistentTransports returns the persistent_transports config +func (v1 *V1) GetPersistentTransports() ([]transport.PersistentTransports, error) { + v1.mu.Lock() + defer v1.mu.Unlock() + return v1.PersistentTransports, nil +} + // updateStringArg updates the cli non-boolean flag of the specified app config and also within the launcher. // It removes argName from app args if value is an empty string. // The updated config gets flushed to file if there are any changes.