From 40b77921b109c3e0202f45e3ba38d1df8bd69408 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 3 Mar 2020 01:16:13 +0800 Subject: [PATCH] Further changes to fix #160 * transport.Manager now deletes from 'tps' when the associated mTp stops serving. * The initiating settlement handshake no longer updates transport discovery status. This logic is now moved to ManagedTransport. * ManagedTransport now has a better mechanism to update statuses. * Only the least-significant edge of a transport can redial the underlying connection. --- pkg/transport/handshake.go | 14 +- pkg/transport/managed_transport.go | 217 ++++++++++++++++++++--------- pkg/transport/manager.go | 35 +++-- 3 files changed, 178 insertions(+), 88 deletions(-) diff --git a/pkg/transport/handshake.go b/pkg/transport/handshake.go index 13879986e..cf9d52a88 100644 --- a/pkg/transport/handshake.go +++ b/pkg/transport/handshake.go @@ -80,17 +80,19 @@ func (hs SettlementHS) Do(ctx context.Context, dc DiscoveryClient, conn *snet.Co // MakeSettlementHS creates a settlement handshake. // `init` determines whether the local side is initiating or responding. +// The handshake logic only REGISTERS the transport, and does not update the status of the transport. func MakeSettlementHS(init bool) SettlementHS { // initiating logic. initHS := func(ctx context.Context, dc DiscoveryClient, conn *snet.Conn, sk cipher.SecKey) (err error) { entry := makeEntryFromTpConn(conn) - defer func() { - // @evanlinjin: I used background context to ensure status is always updated. - if _, err := dc.UpdateStatuses(context.Background(), &Status{ID: entry.ID, IsUp: err == nil}); err != nil { - log.WithError(err).Error("Failed to update statuses") - } - }() + // TODO(evanlinjin): Probably not needed as this is called in mTp already. Need to double check. + //defer func() { + // // @evanlinjin: I used background context to ensure status is always updated. 
+ // if _, err := dc.UpdateStatuses(context.Background(), &Status{ID: entry.ID, IsUp: err == nil}); err != nil { + // log.WithError(err).Error("Failed to update statuses") + // } + //}() // create signed entry and send it to responding visor. se, ok := NewSignedEntry(&entry, conn.LocalPK(), sk) diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 58fa69f70..bb8bb48b1 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -33,7 +33,9 @@ var ( ) // ManagedTransport manages a direct line of communication between two visor nodes. -// It is made up of two underlying uni-directional connections. +// There is a single underlying connection between two edges. +// Initial dialing can be requested by either edge of the connection. +// However, only the edge with the least-significant public key can redial. type ManagedTransport struct { log *logging.Logger @@ -46,6 +48,10 @@ type ManagedTransport struct { dc DiscoveryClient ls LogStore + isUp bool // records last successful status update to discovery + isUpErr error // records whether the last status update was successful or not + isUpMux sync.Mutex + n *snet.Network conn *snet.Conn connCh chan struct{} @@ -75,7 +81,7 @@ func NewManagedTransport(n *snet.Network, dc DiscoveryClient, ls LogStore, rPK c } // Serve serves and manages the transport. -func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan struct{}) { +func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) { defer mt.wg.Done() ctx, cancel := context.WithCancel(context.Background()) @@ -129,19 +135,20 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru p, err := mt.readPacket() if err != nil { if err == ErrNotServing { - mt.log.WithError(err).Debugf("Failed to read packet") + mt.log.WithError(err).Debug("Failed to read packet. 
Returning...") return } mt.connMx.Lock() - mt.clearConn(ctx) + mt.clearConn() mt.connMx.Unlock() log.WithError(err). Warn("Failed to read packet.") continue } select { - case <-done: + case <-mt.done: return + case readCh <- p: } } @@ -158,20 +165,27 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru if err := mt.ls.Record(mt.Entry.ID, mt.LogEntry); err != nil { mt.log.Warnf("Failed to record log entry: %s", err) } - } else { - // If there has not been any activity, ensure underlying 'write' tp is still up. - mt.connMx.Lock() - if mt.conn == nil { - if ok, err := mt.redial(ctx, false); err != nil { - mt.log.Warnf("failed to redial underlying connection (redial loop): %v", err) - if !ok { - mt.connMx.Unlock() - return - } + continue + } + + // Only least significant edge is responsible for redialing. + if !mt.isLeastSignificantEdge() { + continue + } + + // If there has not been any activity, ensure underlying 'write' tp is still up. + mt.connMx.Lock() + if mt.conn == nil { + if ok, err := mt.redial(ctx); err != nil { + mt.log.Warnf("failed to redial underlying connection (redial loop): %v", err) + if !ok { + mt.connMx.Unlock() + return } } - mt.connMx.Unlock() } + mt.connMx.Unlock() + } } } @@ -185,27 +199,25 @@ func (mt *ManagedTransport) isServing() bool { } } -// Close stops serving the transport. -func (mt *ManagedTransport) Close() { - if mt.close(true) { - // Update transport entry. - if _, err := mt.dc.UpdateStatuses(context.Background(), &Status{ID: mt.Entry.ID, IsUp: false}); err != nil { - mt.log.Warnf("Failed to update transport status: %s", err) - } - } +// Close implements io.Closer +// It also waits for transport to stop serving before it returns. +// It only returns an error if transport status update fails. +func (mt *ManagedTransport) Close() (err error) { + mt.close() + mt.wg.Wait() + + mt.isUpMux.Lock() + err = mt.isUpErr + mt.isUpMux.Unlock() + + return err } -// close closes the 'mt.done' once. 
+// close stops serving the transport and ensures that transport status is updated to DOWN. // It also waits until mt.Serve returns if specified. -func (mt *ManagedTransport) close(wait bool) (closed bool) { - mt.once.Do(func() { - close(mt.done) - closed = true - }) - if wait { - mt.wg.Wait() - } - return closed +func (mt *ManagedTransport) close() { + mt.once.Do(func() { close(mt.done) }) + _ = mt.updateStatus(false, 1) //nolint:errcheck } // Accept accepts a new underlying connection. @@ -218,23 +230,24 @@ func (mt *ManagedTransport) Accept(ctx context.Context, conn *snet.Conn) error { } if !mt.isServing() { - mt.log.Debugln(ErrNotServing.Error()) + mt.log.WithError(ErrNotServing).Debug() if err := conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") + mt.log.WithError(err). + Warn("Failed to close newly accepted connection.") } return ErrNotServing } ctx, cancel := context.WithTimeout(ctx, time.Second*20) defer cancel() - mt.log.Debugln("Performing handshake...") + mt.log.Debug("Performing handshake...") if err := MakeSettlementHS(false).Do(ctx, mt.dc, conn, mt.n.LocalSK()); err != nil { return fmt.Errorf("settlement handshake failed: %v", err) } - mt.log.Debugln("Setting TP conn...") + mt.log.Debug("Setting underlying connection...") - return mt.setIfConnNil(ctx, conn) + return mt.setConn(conn) } // Dial dials a new underlying connection. @@ -264,12 +277,12 @@ func (mt *ManagedTransport) dial(ctx context.Context) error { if err := MakeSettlementHS(true).Do(ctx, mt.dc, tp, mt.n.LocalSK()); err != nil { return fmt.Errorf("settlement handshake failed: %v", err) } - return mt.setIfConnNil(ctx, tp) + return mt.setConn(tp) } // redial only actually dials if transport is still registered in transport discovery. // The 'retry' output specifies whether we can retry dial on failure. 
-func (mt *ManagedTransport) redial(ctx context.Context, waitOnClose bool) (retry bool, err error) { +func (mt *ManagedTransport) redial(ctx context.Context) (retry bool, err error) { if !mt.isServing() { return false, ErrNotServing } @@ -284,17 +297,25 @@ func (mt *ManagedTransport) redial(ctx context.Context, waitOnClose bool) (retry // If the error is not temporary, it most likely means that the transport is no longer registered. // Hence, we should close the managed transport. - mt.close(waitOnClose) + mt.close() mt.log. WithError(err). Warn("Transport closed due to redial failure. Transport is likely no longer in discovery.") - return false, fmt.Errorf("transport is no longer registered in discovery: %v", err) + return false, ErrNotServing } return true, mt.dial(ctx) } +func (mt *ManagedTransport) isLeastSignificantEdge() bool { + return mt.Entry.EdgeIndex(mt.n.LocalPK()) == 0 +} + +/* + <<< UNDERLYING CONNECTION >>> +*/ + func (mt *ManagedTransport) getConn() *snet.Conn { if !mt.isServing() { return nil @@ -306,37 +327,40 @@ func (mt *ManagedTransport) getConn() *snet.Conn { return conn } -// sets conn if `mt.conn` is nil otherwise, closes the conn. -// TODO: Add logging here. -func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn *snet.Conn) error { +// setConn sets 'mt.conn' (the underlying connection). +// If 'mt.conn' is already occupied, close the newly introduced connection. 
+func (mt *ManagedTransport) setConn(newConn *snet.Conn) error { + if mt.conn != nil { - mt.log.Debugln("TP conn already exists, closing it") - if err := conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") + if mt.isLeastSignificantEdge() { + mt.log.Debug("Underlying conn already exists, closing new conn.") + if err := newConn.Close(); err != nil { + log.WithError(err).Warn("Failed to close new conn.") + } + return ErrConnAlreadyExists } - return ErrConnAlreadyExists - } - var err error - for i := 0; i < 3; i++ { - if _, err = mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil { - mt.log.Warnf("Failed to update transport status: %s, retrying...", err) - continue + mt.log.Debug("Underlying conn already exists, closing old conn.") + if err := mt.conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close old conn.") } - mt.log.Infoln("Status updated: UP") - break + mt.conn = nil } - mt.conn = conn + if err := mt.updateStatus(true, 1); err != nil { + return fmt.Errorf("failed to update transport status: %v", err) + } + + mt.conn = newConn select { case mt.connCh <- struct{}{}: - mt.log.Debugln("Sent signal to connCh") + mt.log.Debug("Sent signal to 'mt.connCh'.") default: } return nil } -func (mt *ManagedTransport) clearConn(ctx context.Context) { +func (mt *ManagedTransport) clearConn() { if !mt.isServing() { return } @@ -347,27 +371,84 @@ func (mt *ManagedTransport) clearConn(ctx context.Context) { } mt.conn = nil } - if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: false}); err != nil { - mt.log.Warnf("Failed to update transport status: %s", err) - return + _ = mt.updateStatus(false, 1) //nolint:errcheck +} + +func (mt *ManagedTransport) updateStatus(isUp bool, tries int) (err error) { + if tries < 1 { + panic(fmt.Errorf("mt.updateStatus: invalid input: got tries=%d (want tries > 0)", tries)) + } + + // If not serving, we should update status to 'DOWN' and ensure 'updateStatus' 
returns error. + if !mt.isServing() { + isUp = false } - mt.log.Infoln("Status updated: DOWN") + defer func() { + if err == nil && !mt.isServing() { + err = ErrNotServing + } + }() + + mt.isUpMux.Lock() + + // If last update is the same as current, nothing needs to be done. + if mt.isUp == isUp { + mt.isUpMux.Unlock() + return nil + } + + for i := 0; i < tries; i++ { + // @evanlinjin: We don't pass context as we always want transport status to be updated. + if _, err = mt.dc.UpdateStatuses(context.Background(), &Status{ID: mt.Entry.ID, IsUp: isUp}); err != nil { + mt.log. + WithError(err). + WithField("retry", i < tries). + Warn("Failed to update transport status.") + continue + } + mt.log. + WithField("status", statusString(isUp)). + Info("Transport status updated.") + break + } + + mt.isUp = isUp + mt.isUpErr = err + mt.isUpMux.Unlock() + return err } +func statusString(isUp bool) string { + if isUp { + return "UP" + } + return "DOWN" +} + +/* + <<< PACKET MANAGEMENT >>> +*/ + // WritePacket writes a packet to the remote. func (mt *ManagedTransport) WritePacket(ctx context.Context, packet routing.Packet) error { mt.connMx.Lock() defer mt.connMx.Unlock() if mt.conn == nil { - if _, err := mt.redial(ctx, true); err != nil { + if _, err := mt.redial(ctx); err != nil { + + // TODO(evanlinjin): Determine whether we need to call 'mt.wg.Wait()' here. 
+ if err == ErrNotServing { + mt.wg.Wait() + } + return fmt.Errorf("failed to redial underlying connection: %v", err) } } n, err := mt.conn.Write(packet) if err != nil { - mt.clearConn(ctx) + mt.clearConn() return err } if n > routing.PacketHeaderSize { @@ -413,7 +494,7 @@ func (mt *ManagedTransport) readPacket() (packet routing.Packet, err error) { } /* - TRANSPORT LOGGING + <<< TRANSPORT LOGGING >>> */ func (mt *ManagedTransport) logSent(b uint64) { diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 807687c3d..15002c64d 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -178,17 +178,21 @@ func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) erro if !ok { tm.Logger.Debugln("No TP found, creating new one") mTp = NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, conn.RemotePK(), lis.Network()) - if err := mTp.Accept(ctx, conn); err != nil { - return err - } - go mTp.Serve(tm.readCh, tm.done) + go func() { + mTp.Serve(tm.readCh) + tm.mx.Lock() + delete(tm.tps, mTp.Entry.ID) + tm.mx.Unlock() + }() tm.tps[tpID] = mTp } else { tm.Logger.Debugln("TP found, accepting...") - if err := mTp.Accept(ctx, conn); err != nil { - return err - } + + } + + if err := mTp.Accept(ctx, conn); err != nil { + return err } tm.Logger.Infof("accepted tp: type(%s) remote(%s) tpID(%s) new(%v)", lis.Network(), conn.RemotePK(), tpID, !ok) @@ -250,14 +254,19 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed } mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName) - go mTp.Serve(tm.readCh, tm.done) + go func() { + mTp.Serve(tm.readCh) + tm.mx.Lock() + delete(tm.tps, mTp.Entry.ID) + tm.mx.Unlock() + }() tm.tps[tpID] = mTp tm.Logger.Infof("saved transport: remote(%s) type(%s) tpID(%s)", remote, netName, tpID) return mTp, nil } -// DeleteTransport deregisters the Transport of Transport ID in transport discovery and deletes it locally. 
+// DeleteTransport de-registers the transport with the given ID from transport discovery and deletes it locally. func (tm *Manager) DeleteTransport(id uuid.UUID) { tm.mx.Lock() defer tm.mx.Unlock() @@ -275,11 +284,11 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) { if err := tm.Conf.DiscoveryClient.DeleteTransport(ctx, id); err != nil { tm.Logger.WithError(err).Warnf("Failed to deregister transport of ID %s from discovery.", id) } else { - tm.Logger.Infof("Deregistered transport of ID %s from discovery.", id) + tm.Logger.Infof("De-registered transport of ID %s from discovery.", id) } // Close underlying connection. - tp.close(true) + tp.close() delete(tm.tps, id) } } @@ -346,9 +355,7 @@ func (tm *Manager) close() { statuses := make([]*Status, 0, len(tm.tps)) for _, tr := range tm.tps { - if closed := tr.close(true); closed { - statuses = append(statuses[0:], &Status{ID: tr.Entry.ID, IsUp: false}) - } + tr.close() } if _, err := tm.Conf.DiscoveryClient.UpdateStatuses(context.Background(), statuses...); err != nil { tm.Logger.Warnf("failed to update transport statuses: %v", err)