Skip to content

Commit

Permalink
Further changes to fix #160
Browse files Browse the repository at this point in the history
* transport.Manager now deletes from 'tps' when the associated mTp stops serving.

* The initiating settlement handshake no longer updates transport discovery status. This logic is now moved to ManagedTransport.

* ManagedTransport now has a better mechanism to update statuses.

* Only the least-significant edge of a transport can redial the underlying connection.
  • Loading branch information
志宇 committed Mar 2, 2020
1 parent bdc72de commit 40b7792
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 88 deletions.
14 changes: 8 additions & 6 deletions pkg/transport/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,19 @@ func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn *snet.Co

// MakeSettlementHS creates a settlement handshake.
// `init` determines whether the local side is initiating or responding.
// The handshake logic only REGISTERS the transport, and does not update the status of the transport.
func MakeSettlementHS(init bool) SettlementHS {
// initiating logic.
initHS := func(ctx context.Context, dc DiscoveryClient, conn *snet.Conn, sk cipher.SecKey) (err error) {
entry := makeEntryFromTpConn(conn)

defer func() {
// @evanlinjin: I used background context to ensure status is always updated.
if _, err := dc.UpdateStatuses(context.Background(), &Status{ID: entry.ID, IsUp: err == nil}); err != nil {
log.WithError(err).Error("Failed to update statuses")
}
}()
// TODO(evanlinjin): Probably not needed as this is called in mTp already. Need to double check.
//defer func() {
// // @evanlinjin: I used background context to ensure status is always updated.
// if _, err := dc.UpdateStatuses(context.Background(), &Status{ID: entry.ID, IsUp: err == nil}); err != nil {
// log.WithError(err).Error("Failed to update statuses")
// }
//}()

// create signed entry and send it to responding visor.
se, ok := NewSignedEntry(&entry, conn.LocalPK(), sk)
Expand Down
217 changes: 149 additions & 68 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ var (
)

// ManagedTransport manages a direct line of communication between two visor nodes.
// It is made up of two underlying uni-directional connections.
// There is a single underlying connection between two edges.
// Initial dialing can be requested by either edge of the connection.
// However, only the edge with the least-significant public key can redial.
type ManagedTransport struct {
log *logging.Logger

Expand All @@ -46,6 +48,10 @@ type ManagedTransport struct {
dc DiscoveryClient
ls LogStore

isUp bool // records last successful status update to discovery
isUpErr error // records whether the last status update was successful or not
isUpMux sync.Mutex

n *snet.Network
conn *snet.Conn
connCh chan struct{}
Expand Down Expand Up @@ -75,7 +81,7 @@ func NewManagedTransport(n *snet.Network, dc DiscoveryClient, ls LogStore, rPK c
}

// Serve serves and manages the transport.
func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan struct{}) {
func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) {
defer mt.wg.Done()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -129,19 +135,20 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
p, err := mt.readPacket()
if err != nil {
if err == ErrNotServing {
mt.log.WithError(err).Debugf("Failed to read packet")
mt.log.WithError(err).Debug("Failed to read packet. Returning...")
return
}
mt.connMx.Lock()
mt.clearConn(ctx)
mt.clearConn()
mt.connMx.Unlock()
log.WithError(err).
Warn("Failed to read packet.")
continue
}
select {
case <-done:
case <-mt.done:
return

case readCh <- p:
}
}
Expand All @@ -158,20 +165,27 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
if err := mt.ls.Record(mt.Entry.ID, mt.LogEntry); err != nil {
mt.log.Warnf("Failed to record log entry: %s", err)
}
} else {
// If there has not been any activity, ensure underlying 'write' tp is still up.
mt.connMx.Lock()
if mt.conn == nil {
if ok, err := mt.redial(ctx, false); err != nil {
mt.log.Warnf("failed to redial underlying connection (redial loop): %v", err)
if !ok {
mt.connMx.Unlock()
return
}
continue
}

// Only least significant edge is responsible for redialing.
if !mt.isLeastSignificantEdge() {
continue
}

// If there has not been any activity, ensure underlying 'write' tp is still up.
mt.connMx.Lock()
if mt.conn == nil {
if ok, err := mt.redial(ctx); err != nil {
mt.log.Warnf("failed to redial underlying connection (redial loop): %v", err)
if !ok {
mt.connMx.Unlock()
return
}
}
mt.connMx.Unlock()
}
mt.connMx.Unlock()

}
}
}
Expand All @@ -185,27 +199,25 @@ func (mt *ManagedTransport) isServing() bool {
}
}

// Close stops serving the transport.
func (mt *ManagedTransport) Close() {
if mt.close(true) {
// Update transport entry.
if _, err := mt.dc.UpdateStatuses(context.Background(), &Status{ID: mt.Entry.ID, IsUp: false}); err != nil {
mt.log.Warnf("Failed to update transport status: %s", err)
}
}
// Close implements io.Closer
// It also waits for transport to stop serving before it returns.
// It only returns an error if transport status update fails.
func (mt *ManagedTransport) Close() (err error) {
mt.close()
mt.wg.Wait()

mt.isUpMux.Lock()
err = mt.isUpErr
mt.isUpMux.Unlock()

return err
}

// close closes the 'mt.done' once.
// close stops serving the transport and ensures that transport status is updated to DOWN.
// It also waits until mt.Serve returns if specified.
func (mt *ManagedTransport) close(wait bool) (closed bool) {
mt.once.Do(func() {
close(mt.done)
closed = true
})
if wait {
mt.wg.Wait()
}
return closed
func (mt *ManagedTransport) close() {
mt.once.Do(func() { close(mt.done) })
_ = mt.updateStatus(false, 1) //nolint:errcheck
}

// Accept accepts a new underlying connection.
Expand All @@ -218,23 +230,24 @@ func (mt *ManagedTransport) Accept(ctx context.Context, conn *snet.Conn) error {
}

if !mt.isServing() {
mt.log.Debugln(ErrNotServing.Error())
mt.log.WithError(ErrNotServing).Debug()
if err := conn.Close(); err != nil {
log.WithError(err).Warn("Failed to close connection")
mt.log.WithError(err).
Warn("Failed to close newly accepted connection.")
}
return ErrNotServing
}

ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
mt.log.Debugln("Performing handshake...")
mt.log.Debug("Performing handshake...")
if err := MakeSettlementHS(false).Do(ctx, mt.dc, conn, mt.n.LocalSK()); err != nil {
return fmt.Errorf("settlement handshake failed: %v", err)
}

mt.log.Debugln("Setting TP conn...")
mt.log.Debug("Setting underlying connection...")

return mt.setIfConnNil(ctx, conn)
return mt.setConn(conn)
}

// Dial dials a new underlying connection.
Expand Down Expand Up @@ -264,12 +277,12 @@ func (mt *ManagedTransport) dial(ctx context.Context) error {
if err := MakeSettlementHS(true).Do(ctx, mt.dc, tp, mt.n.LocalSK()); err != nil {
return fmt.Errorf("settlement handshake failed: %v", err)
}
return mt.setIfConnNil(ctx, tp)
return mt.setConn(tp)
}

// redial only actually dials if transport is still registered in transport discovery.
// The 'retry' output specifies whether we can retry dial on failure.
func (mt *ManagedTransport) redial(ctx context.Context, waitOnClose bool) (retry bool, err error) {
func (mt *ManagedTransport) redial(ctx context.Context) (retry bool, err error) {
if !mt.isServing() {
return false, ErrNotServing
}
Expand All @@ -284,17 +297,25 @@ func (mt *ManagedTransport) redial(ctx context.Context, waitOnClose bool) (retry

// If the error is not temporary, it most likely means that the transport is no longer registered.
// Hence, we should close the managed transport.
mt.close(waitOnClose)
mt.close()
mt.log.
WithError(err).
Warn("Transport closed due to redial failure. Transport is likely no longer in discovery.")

return false, fmt.Errorf("transport is no longer registered in discovery: %v", err)
return false, ErrNotServing
}

return true, mt.dial(ctx)
}

func (mt *ManagedTransport) isLeastSignificantEdge() bool {
return mt.Entry.EdgeIndex(mt.n.LocalPK()) == 0
}

/*
<<< UNDERLYING CONNECTION >>>
*/

func (mt *ManagedTransport) getConn() *snet.Conn {
if !mt.isServing() {
return nil
Expand All @@ -306,37 +327,40 @@ func (mt *ManagedTransport) getConn() *snet.Conn {
return conn
}

// sets conn if `mt.conn` is nil otherwise, closes the conn.
// TODO: Add logging here.
func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn *snet.Conn) error {
// setConn sets 'mt.conn' (the underlying connection).
// If 'mt.conn' is already occupied, close the newly introduced connection.
func (mt *ManagedTransport) setConn(newConn *snet.Conn) error {

if mt.conn != nil {
mt.log.Debugln("TP conn already exists, closing it")
if err := conn.Close(); err != nil {
log.WithError(err).Warn("Failed to close connection")
if mt.isLeastSignificantEdge() {
mt.log.Debug("Underlying conn already exists, closing new conn.")
if err := newConn.Close(); err != nil {
log.WithError(err).Warn("Failed to close new conn.")
}
return ErrConnAlreadyExists
}
return ErrConnAlreadyExists
}

var err error
for i := 0; i < 3; i++ {
if _, err = mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil {
mt.log.Warnf("Failed to update transport status: %s, retrying...", err)
continue
mt.log.Debug("Underlying conn already exists, closing old conn.")
if err := mt.conn.Close(); err != nil {
log.WithError(err).Warn("Failed to close old conn.")
}
mt.log.Infoln("Status updated: UP")
break
mt.conn = nil
}

mt.conn = conn
if err := mt.updateStatus(true, 1); err != nil {
return fmt.Errorf("failed to update transport status: %v", err)
}

mt.conn = newConn
select {
case mt.connCh <- struct{}{}:
mt.log.Debugln("Sent signal to connCh")
mt.log.Debug("Sent signal to 'mt.connCh'.")
default:
}
return nil
}

func (mt *ManagedTransport) clearConn(ctx context.Context) {
func (mt *ManagedTransport) clearConn() {
if !mt.isServing() {
return
}
Expand All @@ -347,27 +371,84 @@ func (mt *ManagedTransport) clearConn(ctx context.Context) {
}
mt.conn = nil
}
if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: false}); err != nil {
mt.log.Warnf("Failed to update transport status: %s", err)
return
_ = mt.updateStatus(false, 1) //nolint:errcheck
}

func (mt *ManagedTransport) updateStatus(isUp bool, tries int) (err error) {
if tries < 1 {
panic(fmt.Errorf("mt.updateStatus: invalid input: got tries=%d (want tries > 0)", tries))
}

// If not serving, we should update status to 'DOWN' and ensure 'updateStatus' returns error.
if !mt.isServing() {
isUp = false
}
mt.log.Infoln("Status updated: DOWN")
defer func() {
if err == nil && !mt.isServing() {
err = ErrNotServing
}
}()

mt.isUpMux.Lock()

// If last update is the same as current, nothing needs to be done.
if mt.isUp == isUp {
mt.isUpMux.Unlock()
return nil
}

for i := 0; i < tries; i++ {
// @evanlinjin: We don't pass context as we always want transport status to be updated.
if _, err = mt.dc.UpdateStatuses(context.Background(), &Status{ID: mt.Entry.ID, IsUp: isUp}); err != nil {
mt.log.
WithError(err).
WithField("retry", i < tries).
Warn("Failed to update transport status.")
continue
}
mt.log.
WithField("status", statusString(isUp)).
Info("Transport status updated.")
break
}

mt.isUp = isUp
mt.isUpErr = err
mt.isUpMux.Unlock()
return err
}

func statusString(isUp bool) string {
if isUp {
return "UP"
}
return "DOWN"
}

/*
<<< PACKET MANAGEMENT >>>
*/

// WritePacket writes a packet to the remote.
func (mt *ManagedTransport) WritePacket(ctx context.Context, packet routing.Packet) error {
mt.connMx.Lock()
defer mt.connMx.Unlock()

if mt.conn == nil {
if _, err := mt.redial(ctx, true); err != nil {
if _, err := mt.redial(ctx); err != nil {

// TODO(evanlinjin): Determine whether we need to call 'mt.wg.Wait()' here.
if err == ErrNotServing {
mt.wg.Wait()
}

return fmt.Errorf("failed to redial underlying connection: %v", err)
}
}

n, err := mt.conn.Write(packet)
if err != nil {
mt.clearConn(ctx)
mt.clearConn()
return err
}
if n > routing.PacketHeaderSize {
Expand Down Expand Up @@ -413,7 +494,7 @@ func (mt *ManagedTransport) readPacket() (packet routing.Packet, err error) {
}

/*
TRANSPORT LOGGING
<<< TRANSPORT LOGGING >>>
*/

func (mt *ManagedTransport) logSent(b uint64) {
Expand Down
Loading

0 comments on commit 40b7792

Please sign in to comment.