Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/remove redialing #842

Merged
merged 45 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1806be6
Remove redialing of transport
i-hate-nicknames Jul 8, 2021
b19b8e6
Fix panics when transport is closed
i-hate-nicknames Jul 8, 2021
8ba1738
Refactor managed transport Serve
i-hate-nicknames Jul 8, 2021
745ea4e
Force sorted edge order in transport entry
i-hate-nicknames Jul 8, 2021
68b3129
Remove up/down transport status
i-hate-nicknames Jul 9, 2021
dd734ef
Remove closed transports in transport manager
i-hate-nicknames Jul 9, 2021
764f34a
Move raw address to network.Conn
i-hate-nicknames Jul 9, 2021
738454c
Simplify dial logic
i-hate-nicknames Jul 9, 2021
8016a92
Delete entry instead of updating status
i-hate-nicknames Jul 9, 2021
f1b5ea0
Fix visor cli transport print
i-hate-nicknames Jul 9, 2021
afc7945
Add persistent transports todo
i-hate-nicknames Jul 9, 2021
edfb6b2
Add heartbeat loop
i-hate-nicknames Jul 9, 2021
43ac032
Merge branch 'develop' into feature/remove-redialing
i-hate-nicknames Jul 27, 2021
d94386e
Send heartbeat request in transport
i-hate-nicknames Jul 27, 2021
f9092b9
Remove transport status
i-hate-nicknames Jul 27, 2021
657734d
Fix linter errors
i-hate-nicknames Jul 28, 2021
6b35e2f
Remove transport entry with status
i-hate-nicknames Jul 28, 2021
9b76a8b
Fix visor cli transports
i-hate-nicknames Jul 28, 2021
300b9c8
Add persistent transports config
i-hate-nicknames Jul 29, 2021
8ed6bfb
Make lint and format
i-hate-nicknames Jul 29, 2021
876f9f3
Fix deadlocks in transport layer
i-hate-nicknames Jul 30, 2021
f84dc64
Relax locking when saving transport
i-hate-nicknames Aug 3, 2021
964e0a6
Add transport connection deadline
i-hate-nicknames Aug 3, 2021
88d397b
Fix transport manager shutdown logic
i-hate-nicknames Aug 3, 2021
c7e14a8
Simplify transport manager closing
i-hate-nicknames Aug 3, 2021
039d017
Merge branch 'develop' into feature/remove-redialing
i-hate-nicknames Aug 3, 2021
21db980
Remove isPublic transport entry field
i-hate-nicknames Aug 4, 2021
11fdf3f
Fix NPE for transport handshake
i-hate-nicknames Aug 5, 2021
1116d80
Code review fixes
i-hate-nicknames Aug 6, 2021
6b8741b
Update heartbeat interval
ersonp Aug 12, 2021
a565ce3
Update comment
ersonp Aug 12, 2021
d18e26c
Remove updateConnDeadline
ersonp Aug 12, 2021
ed0ee66
Rename Conn interface to Transport
ersonp Aug 12, 2021
816474b
Remove EntryWithStatus
ersonp Aug 16, 2021
21cadd8
Add Deprecated Public field to Entry struct
ersonp Aug 16, 2021
c04c889
Make public true
ersonp Aug 17, 2021
3514308
Update and add todo
ersonp Aug 17, 2021
651520a
Remove old todo
ersonp Aug 19, 2021
0a0d5c2
Remove public field from Entry struct
ersonp Aug 20, 2021
8a3c6d8
Rename PersistentRemote to PersistentTransports
ersonp Aug 20, 2021
1b580f4
Requested changes
ersonp Aug 20, 2021
7cd8f5c
Add api for persistent_transports
ersonp Aug 20, 2021
6431095
Add persistent_transports to summary
ersonp Aug 23, 2021
1b8e547
Remove GetPersistentTransports api
ersonp Aug 23, 2021
c8e2a62
Merge branch 'develop' into feature/remove-redialing
ersonp Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions pkg/transport/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ const (
responseInvalidEntry
)

func makeEntryFromTpConn(conn network.Conn) Entry {
aPK, bPK := conn.LocalPK(), conn.RemotePK()
return MakeEntry(aPK, bPK, conn.Network(), LabelUser)
func makeEntryFromTransport(transport network.Transport) Entry {
aPK, bPK := transport.LocalPK(), transport.RemotePK()
return MakeEntry(aPK, bPK, transport.Network(), LabelUser)
}

func compareEntries(expected, received *Entry) error {
Expand Down Expand Up @@ -73,13 +73,13 @@ func receiveAndVerifyEntry(r io.Reader, expected *Entry, remotePK cipher.PubKey)

// SettlementHS represents a settlement handshake.
// This is the handshake responsible for registering a transport to transport discovery.
type SettlementHS func(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) error
type SettlementHS func(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) error

// Do performs the settlement handshake.
func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) (err error) {
func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) (err error) {
done := make(chan struct{})
go func() {
err = hs(ctx, dc, conn, sk)
err = hs(ctx, dc, transport, sk)
close(done)
}()
select {
Expand All @@ -95,8 +95,8 @@ func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn network.
// The handshake logic only REGISTERS the transport, and does not update the status of the transport.
func MakeSettlementHS(init bool) SettlementHS {
// initiating logic.
initHS := func(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) (err error) {
entry := makeEntryFromTpConn(conn)
initHS := func(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) (err error) {
entry := makeEntryFromTransport(transport)

// TODO(evanlinjin): Probably not needed as this is called in mTp already. Need to double check.
ersonp marked this conversation as resolved.
Show resolved Hide resolved
//defer func() {
Expand All @@ -107,17 +107,17 @@ func MakeSettlementHS(init bool) SettlementHS {
//}()

// create signed entry and send it to responding visor.
se, err := NewSignedEntry(&entry, conn.LocalPK(), sk)
se, err := NewSignedEntry(&entry, transport.LocalPK(), sk)
if err != nil {
return fmt.Errorf("failed to sign entry: %w", err)
}
if err := json.NewEncoder(conn).Encode(se); err != nil {
if err := json.NewEncoder(transport).Encode(se); err != nil {
return fmt.Errorf("failed to write entry: %w", err)
}

// await okay signal.
accepted := make([]byte, 1)
if _, err := io.ReadFull(conn, accepted); err != nil {
if _, err := io.ReadFull(transport, accepted); err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
switch hsResponse(accepted[0]) {
Expand All @@ -135,18 +135,18 @@ func MakeSettlementHS(init bool) SettlementHS {
}

// responding logic.
respHS := func(ctx context.Context, dc DiscoveryClient, conn network.Conn, sk cipher.SecKey) error {
entry := makeEntryFromTpConn(conn)
respHS := func(ctx context.Context, dc DiscoveryClient, transport network.Transport, sk cipher.SecKey) error {
entry := makeEntryFromTransport(transport)

// receive, verify and sign entry.
recvSE, err := receiveAndVerifyEntry(conn, &entry, conn.RemotePK())
recvSE, err := receiveAndVerifyEntry(transport, &entry, transport.RemotePK())
if err != nil {
writeHsResponse(conn, responseInvalidEntry) //nolint:errcheck, gosec
writeHsResponse(transport, responseInvalidEntry) //nolint:errcheck, gosec
return err
}

if err := recvSE.Sign(conn.LocalPK(), sk); err != nil {
writeHsResponse(conn, responseSignatureErr) //nolint:errcheck, gosec
if err := recvSE.Sign(transport.LocalPK(), sk); err != nil {
writeHsResponse(transport, responseSignatureErr) //nolint:errcheck, gosec
return fmt.Errorf("failed to sign received entry: %w", err)
}

Expand All @@ -161,7 +161,7 @@ func MakeSettlementHS(init bool) SettlementHS {
log.WithError(err).Error("Failed to register transport.")
}
}
return writeHsResponse(conn, responseOK)
return writeHsResponse(transport, responseOK)
}

if init {
Expand Down
156 changes: 78 additions & 78 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var (
// ErrNotServing is the error returned when a transport is no longer served.
ErrNotServing = errors.New("transport is no longer being served")

// ErrConnAlreadyExists occurs when an underlying transport connection already exists.
ErrConnAlreadyExists = errors.New("underlying transport connection already exists")
// ErrTransportAlreadyExists occurs when an underlying transport connection already exists.
ErrTransportAlreadyExists = errors.New("underlying transport connection already exists")
)

// ManagedTransportConfig is a configuration for managed transport.
Expand All @@ -50,8 +50,8 @@ type ManagedTransportConfig struct {
}

// ManagedTransport manages a direct line of communication between two visor nodes.
// There is a single underlying connection between two edges.
// Initial dialing can be requested by either edge of the connection.
// There is a single underlying transport connection between two edges.
// Initial dialing can be requested by either edge of the transport connection.
ersonp marked this conversation as resolved.
Show resolved Hide resolved
type ManagedTransport struct {
log *logging.Logger

Expand All @@ -63,10 +63,10 @@ type ManagedTransport struct {
dc DiscoveryClient
ls LogStore

client network.Client
conn network.Conn
connCh chan struct{}
connMx sync.Mutex
client network.Client
transport network.Transport
transportCh chan struct{}
transportMx sync.Mutex

done chan struct{}
wg sync.WaitGroup
Expand All @@ -78,16 +78,16 @@ type ManagedTransport struct {
func NewManagedTransport(conf ManagedTransportConfig) *ManagedTransport {
aPK, bPK := conf.client.PK(), conf.RemotePK
mt := &ManagedTransport{
log: logging.MustGetLogger(fmt.Sprintf("tp:%s", conf.RemotePK.String()[:6])),
rPK: conf.RemotePK,
dc: conf.DC,
ls: conf.LS,
client: conf.client,
Entry: MakeEntry(aPK, bPK, conf.client.Type(), conf.TransportLabel),
LogEntry: new(LogEntry),
connCh: make(chan struct{}, 1),
done: make(chan struct{}),
timeout: conf.InactiveTimeout,
log: logging.MustGetLogger(fmt.Sprintf("tp:%s", conf.RemotePK.String()[:6])),
rPK: conf.RemotePK,
dc: conf.DC,
ls: conf.LS,
client: conf.client,
Entry: MakeEntry(aPK, bPK, conf.client.Type(), conf.TransportLabel),
LogEntry: new(LogEntry),
transportCh: make(chan struct{}, 1),
done: make(chan struct{}),
timeout: conf.InactiveTimeout,
}
return mt
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func (mt *ManagedTransport) IsClosed() bool {
}
}

// close underlying connection and remove the entry from transport discovery
// close underlying transport connection and remove the entry from transport discovery
ersonp marked this conversation as resolved.
Show resolved Hide resolved
// todo: this currently performs an HTTP request to the discovery service.
// It only makes sense to wait for its completion if we are closing the visor itself;
// regular transport close operations should probably call it concurrently.
Expand All @@ -221,34 +221,34 @@ func (mt *ManagedTransport) close() {
default:
close(mt.done)
}
mt.log.Debug("Locking connMx")
mt.connMx.Lock()
close(mt.connCh)
if mt.conn != nil {
if err := mt.conn.Close(); err != nil {
log.WithError(err).Warn("Failed to close underlying connection.")
mt.log.Debug("Locking transportMx")
mt.transportMx.Lock()
close(mt.transportCh)
if mt.transport != nil {
if err := mt.transport.Close(); err != nil {
log.WithError(err).Warn("Failed to close underlying transport connection.")
}
mt.conn = nil
mt.transport = nil
}
mt.connMx.Unlock()
mt.log.Debug("Unlocking connMx")
mt.transportMx.Unlock()
mt.log.Debug("Unlocking transportMx")
_ = mt.deleteFromDiscovery() //nolint:errcheck
}

// Accept accepts a new underlying connection.
func (mt *ManagedTransport) Accept(ctx context.Context, conn network.Conn) error {
mt.connMx.Lock()
defer mt.connMx.Unlock()
// Accept accepts a new underlying transport connection.
ersonp marked this conversation as resolved.
Show resolved Hide resolved
func (mt *ManagedTransport) Accept(ctx context.Context, transport network.Transport) error {
mt.transportMx.Lock()
defer mt.transportMx.Unlock()

if conn.Network() != mt.Type() {
if transport.Network() != mt.Type() {
return ErrWrongNetwork
}

if !mt.isServing() {
mt.log.WithError(ErrNotServing).Debug()
if err := conn.Close(); err != nil {
if err := transport.Close(); err != nil {
mt.log.WithError(err).
Warn("Failed to close newly accepted connection.")
Warn("Failed to close newly accepted transport connection.")
}
return ErrNotServing
}
Expand All @@ -257,24 +257,24 @@ func (mt *ManagedTransport) Accept(ctx context.Context, conn network.Conn) error
defer cancel()

mt.log.Debug("Performing settlement handshake...")
if err := MakeSettlementHS(false).Do(ctx, mt.dc, conn, mt.client.SK()); err != nil {
if err := MakeSettlementHS(false).Do(ctx, mt.dc, transport, mt.client.SK()); err != nil {
return fmt.Errorf("settlement handshake failed: %w", err)
}

mt.log.Debug("Setting underlying connection...")
return mt.setConn(conn)
mt.log.Debug("Setting underlying transport connection...")
ersonp marked this conversation as resolved.
Show resolved Hide resolved
return mt.setTransport(transport)
}

// Dial dials a new underlying connection.
// Dial dials a new underlying transport connection.
func (mt *ManagedTransport) Dial(ctx context.Context) error {
mt.connMx.Lock()
defer mt.connMx.Unlock()
mt.transportMx.Lock()
defer mt.transportMx.Unlock()

if !mt.isServing() {
return ErrNotServing
}

if mt.conn != nil {
if mt.transport != nil {
return nil
}
return mt.dial(ctx)
Expand All @@ -287,20 +287,20 @@ func (mt *ManagedTransport) DialAsync(ctx context.Context, errCh chan error) {
}

func (mt *ManagedTransport) dial(ctx context.Context) error {
conn, err := mt.client.Dial(ctx, mt.rPK, skyenv.DmsgTransportPort)
transportType, err := mt.client.Dial(ctx, mt.rPK, skyenv.DmsgTransportPort)
ersonp marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("snet.Dial: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()

if err := MakeSettlementHS(true).Do(ctx, mt.dc, conn, mt.client.SK()); err != nil {
if err := MakeSettlementHS(true).Do(ctx, mt.dc, transportType, mt.client.SK()); err != nil {
return fmt.Errorf("settlement handshake failed: %w", err)
}

if err := mt.setConn(conn); err != nil {
return fmt.Errorf("setConn: %w", err)
if err := mt.setTransport(transportType); err != nil {
return fmt.Errorf("setTransport: %w", err)
}

return nil
Expand All @@ -312,44 +312,44 @@ func (mt *ManagedTransport) isLeastSignificantEdge() bool {
}

/*
<<< UNDERLYING CONNECTION >>>
<<< UNDERLYING TRANSPORT CONNECTION >>>
*/

func (mt *ManagedTransport) getConn() network.Conn {
func (mt *ManagedTransport) getTransport() network.Transport {
if !mt.isServing() {
return nil
}

mt.connMx.Lock()
conn := mt.conn
mt.connMx.Unlock()
return conn
mt.transportMx.Lock()
transport := mt.transport
mt.transportMx.Unlock()
return transport
}

// setConn sets 'mt.conn' (the underlying connection).
// If 'mt.conn' is already occupied, close the newly introduced connection.
func (mt *ManagedTransport) setConn(newConn network.Conn) error {
if mt.conn != nil {
// setTransport sets 'mt.transport' (the underlying transport connection).
// If 'mt.transport' is already occupied, close the newly introduced transport connection.
func (mt *ManagedTransport) setTransport(newTransport network.Transport) error {
if mt.transport != nil {
if mt.isLeastSignificantEdge() {
mt.log.Debug("Underlying conn already exists, closing new conn.")
if err := newConn.Close(); err != nil {
log.WithError(err).Warn("Failed to close new conn.")
mt.log.Debug("Underlying transport connection already exists, closing new transport connection.")
if err := newTransport.Close(); err != nil {
log.WithError(err).Warn("Failed to close new transport connection.")
}
return ErrConnAlreadyExists
return ErrTransportAlreadyExists
}

mt.log.Debug("Underlying conn already exists, closing old conn.")
if err := mt.conn.Close(); err != nil {
log.WithError(err).Warn("Failed to close old conn.")
mt.log.Debug("Underlying transport connection already exists, closing old transport connection.")
if err := mt.transport.Close(); err != nil {
log.WithError(err).Warn("Failed to close old transport connection.")
}
mt.conn = nil
mt.transport = nil
}

// Set new underlying connection.
mt.conn = newConn
// Set new underlying transport connection.
mt.transport = newTransport
select {
case mt.connCh <- struct{}{}:
mt.log.Debug("Sent signal to 'mt.connCh'.")
case mt.transportCh <- struct{}{}:
mt.log.Debug("Sent signal to 'mt.transportCh'.")
default:
}
return nil
Expand Down Expand Up @@ -379,14 +379,14 @@ func (mt *ManagedTransport) deleteFromDiscovery() error {

// WritePacket writes a packet to the remote.
func (mt *ManagedTransport) WritePacket(ctx context.Context, packet routing.Packet) error {
mt.connMx.Lock()
defer mt.connMx.Unlock()
mt.transportMx.Lock()
defer mt.transportMx.Unlock()

if mt.conn == nil {
return fmt.Errorf("write packet: cannot write to conn, conn is not set up")
if mt.transport == nil {
return fmt.Errorf("write packet: cannot write to transport, transport is not set up")
}

n, err := mt.conn.Write(packet)
n, err := mt.transport.Write(packet)
if err != nil {
mt.close()
return err
Expand All @@ -401,28 +401,28 @@ func (mt *ManagedTransport) WritePacket(ctx context.Context, packet routing.Pack
func (mt *ManagedTransport) readPacket() (packet routing.Packet, err error) {
log := mt.log.WithField("func", "readPacket")

var conn network.Conn
var transport network.Transport
for {
if conn = mt.getConn(); conn != nil {
if transport = mt.getTransport(); transport != nil {
break
}
select {
case <-mt.done:
return nil, ErrNotServing
case <-mt.connCh:
case <-mt.transportCh:
}
}

log.Debug("Awaiting packet...")

h := make(routing.Packet, routing.PacketHeaderSize)
if _, err = io.ReadFull(conn, h); err != nil {
if _, err = io.ReadFull(transport, h); err != nil {
log.WithError(err).Debugf("Failed to read packet header.")
return nil, err
}
log.WithField("header_len", len(h)).WithField("header_raw", h).Debug("Read packet header.")
p := make([]byte, h.Size())
if _, err = io.ReadFull(conn, p); err != nil {
if _, err = io.ReadFull(transport, p); err != nil {
log.WithError(err).Debugf("Failed to read packet payload.")
return nil, err
}
Expand Down
Loading