Skip to content

Commit

Permalink
Use a single underlying connection for transports.
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Lin committed Aug 12, 2019
1 parent 8f14b0b commit 985f056
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 115 deletions.
4 changes: 4 additions & 0 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) {
return nil, errors.New("unknown RouteID")
}

if len(rule) < 13 {
return nil, errors.New("corrupted rule")
}

if rule.Expiry().Before(time.Now()) {
return nil, errors.New("expired routing rule")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Router) Serve(ctx context.Context) error {
if err != nil {
return
}
if err := r.handlePacket(ctx, packet); err != nil {
if err := r.handlePacket(ctx, packet); err != nil { // TODO: race
if err == transport.ErrNotServing {
r.Logger.WithError(err).Warnf("Stopped serving Transport.")
return
Expand Down Expand Up @@ -214,7 +214,7 @@ func (r *Router) forwardPacket(ctx context.Context, payload []byte, rule routing
if err := tp.WritePacket(ctx, rule.RouteID(), payload); err != nil {
return err
}
r.Logger.Infof("Forwarded packet via Transport %s using rule %d", rule.TransportID(), rule.RouteID())
r.Logger.Infof("Forwarded packet via Transport %s using rule %d", rule.TransportID(), rule.RouteID()) // TODO: race TransportID()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/routing/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r Rule) Expiry() time.Time {
}

// Type returns type of a rule.
func (r Rule) Type() RuleType {
func (r Rule) Type() RuleType { // TODO: segfault
return RuleType(r[8])
}

Expand Down
204 changes: 100 additions & 104 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ const logWriteInterval = time.Second * 3
// Records number of managedTransports.
var mTpCount int32

// ErrNotServing is the error returned when a transport is no longer served.
var ErrNotServing = errors.New("transport is no longer being served")
var (
// ErrNotServing is the error returned when a transport is no longer served.
ErrNotServing = errors.New("transport is no longer being served")

// ErrConnAlreadyExists occurs when an underlying transport connection already exists.
ErrConnAlreadyExists = errors.New("underlying transport connection already exists")
)

// ManagedTransport manages a direct line of communication between two visor nodes.
// It is made up of two underlying uni-directional connections.
Expand All @@ -40,11 +45,9 @@ type ManagedTransport struct {
LogEntry *LogEntry
logUpdates uint32

readConn Transport
writeConn Transport
acceptCh chan Transport
acceptMx sync.RWMutex
dialMx sync.Mutex
conn Transport
connCh chan struct{}
connMx sync.Mutex

done chan struct{}
once sync.Once
Expand All @@ -62,15 +65,15 @@ func NewManagedTransport(fac Factory, dc DiscoveryClient, ls LogStore, rPK ciphe
ls: ls,
Entry: makeEntry(fac.Local(), rPK, dmsg.Type),
LogEntry: new(LogEntry),
acceptCh: make(chan Transport, 1),
connCh: make(chan struct{}, 1),
done: make(chan struct{}),
}
mt.wg.Add(2)
return mt
}

// Serve serves and manages the transport.
func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) {
func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan struct{}) {
defer mt.wg.Done()

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -93,23 +96,14 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) {
}
}

// End reading connection.
mt.acceptMx.Lock()
close(mt.acceptCh)
mt.acceptCh = nil
if mt.readConn != nil {
_ = mt.readConn.Close() //nolint:errcheck
mt.readConn = nil
}
mt.acceptMx.Unlock()

// End writing connection.
mt.dialMx.Lock()
if mt.writeConn != nil {
_ = mt.writeConn.Close() //nolint:errcheck
mt.writeConn = nil
// End connection.
mt.connMx.Lock()
close(mt.connCh)
if mt.conn != nil {
_ = mt.conn.Close() //nolint:errcheck
mt.conn = nil
}
mt.dialMx.Unlock()
mt.connMx.Unlock()
}()

go func() {
Expand All @@ -123,13 +117,16 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) {
if err == ErrNotServing {
return
}
mt.connMx.Lock()
mt.clearConn(ctx)
mt.connMx.Unlock()
mt.log.Warnf("failed to read packet: %v", err)
continue
}
if !mt.isServing() {
return
select {
case <-done:
case readCh <- p:
}
readCh <- p // TODO: data race
}
}()

Expand All @@ -145,16 +142,13 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet) {
}
} else {
// If there has not been any activity, ensure underlying 'write' tp is still up.
mt.dialMx.Lock()
if mt.writeConn == nil {
if !mt.isServing() {
return
}
mt.connMx.Lock()
if mt.conn == nil {
if err := mt.dial(ctx); err != nil {
mt.log.Warnf("failed to dial underlying 'write' transport: %v", err)
mt.log.Warnf("failed to redial underlying connection: %v", err)
}
}
mt.dialMx.Unlock()
mt.connMx.Unlock()
}
}
}
Expand Down Expand Up @@ -188,10 +182,10 @@ func (mt *ManagedTransport) close() (closed bool) {
return closed
}

// Accept accepts a new underlying 'read' connection (and close/replace the old one).
// Accept accepts a new underlying connection.
func (mt *ManagedTransport) Accept(ctx context.Context, tp Transport) error {
mt.acceptMx.RLock()
defer mt.acceptMx.RUnlock()
mt.connMx.Lock()
defer mt.connMx.Unlock()

if !mt.isServing() {
_ = tp.Close() //nolint:errcheck
Expand All @@ -204,125 +198,127 @@ func (mt *ManagedTransport) Accept(ctx context.Context, tp Transport) error {
return fmt.Errorf("settlement handshake failed: %v", err)
}

for {
select {
case oldTp, ok := <-mt.acceptCh:
if !ok {
return ErrNotServing
}
_ = oldTp.Close() //nolint:errcheck
default:
mt.acceptCh <- tp
return nil
}
}
return mt.setIfConnNil(ctx, tp)
}

// Dial dials a new underlying 'write' connection (and close/replace the old one).
// Dial dials a new underlying connection.
func (mt *ManagedTransport) Dial(ctx context.Context) error {
mt.dialMx.Lock()
defer mt.dialMx.Unlock()
mt.connMx.Lock()
defer mt.connMx.Unlock()

if !mt.isServing() {
return ErrNotServing
}

if mt.writeConn != nil {
_ = mt.writeConn.Close() //nolint:errcheck
if mt.conn != nil {
return nil
}
return mt.dial(ctx)
}

// TODO: Figure out where this fella is called.
func (mt *ManagedTransport) dial(ctx context.Context) error {
tp, err := mt.fac.Dial(ctx, mt.rPK)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
if err := MakeSettlementHS(true).Do(ctx, mt.dc, tp, mt.lSK); err != nil {
return fmt.Errorf("settlement handshake failed: %v", err)
}
mt.writeConn = tp

return mt.setIfConnNil(ctx, tp)
}

func (mt *ManagedTransport) getConn() Transport {
mt.connMx.Lock()
conn := mt.conn
mt.connMx.Unlock()
return conn
}

// sets conn if `mt.conn` is nil otherwise, closes the conn.
// TODO: Add logging here.
func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn Transport) error {
if mt.conn != nil {
_ = conn.Close() //nolint:errcheck
return ErrConnAlreadyExists
}

if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil {
mt.log.Warnf("Failed to update transport status: %s", err)
}
mt.log.Infoln("Status updated: UP")
mt.conn = conn
select {
case mt.connCh <- struct{}{}:
default:
}
return nil
}

func (mt *ManagedTransport) clearConn(ctx context.Context) {
if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: false}); err != nil {
mt.log.Warnf("Failed to update transport status: %s", err)
}
mt.log.Infoln("Status updated: DOWN")
mt.conn = nil
}

// WritePacket writes a packet to the remote.
func (mt *ManagedTransport) WritePacket(ctx context.Context, rtID routing.RouteID, payload []byte) (err error) {
mt.dialMx.Lock()
defer mt.dialMx.Unlock()
func (mt *ManagedTransport) WritePacket(ctx context.Context, rtID routing.RouteID, payload []byte) error {
mt.connMx.Lock()
defer mt.connMx.Unlock()

if !mt.isServing() {
return ErrNotServing
}

if mt.writeConn == nil { // TODO: race condition
if mt.conn == nil {
if err := mt.dial(ctx); err != nil {
return fmt.Errorf("failed to redial transport: %v", err)
return fmt.Errorf("failed to redial underlying connection: %v", err)
}
}

n, err := mt.writeConn.Write(routing.MakePacket(rtID, payload))
n, err := mt.conn.Write(routing.MakePacket(rtID, payload))
if err != nil {
if _, err := mt.dc.UpdateStatuses(context.Background(), &Status{ID: mt.Entry.ID, IsUp: false}); err != nil {
mt.log.Warnf("Failed to change transport status: %s", err)
}
mt.writeConn = nil
mt.clearConn(ctx)
return err
}
if n > 0 {
mt.logSent(uint64(len(payload)))
if n > 6 {
mt.logSent(uint64(n - 6))
}
return nil
}

func (mt *ManagedTransport) latestReadTp() (Transport, error) {
mt.acceptMx.RLock()
defer mt.acceptMx.RUnlock()

if mt.readConn != nil {
return mt.readConn, nil
}

select {
case <-mt.done:
return nil, ErrNotServing

case tp, ok := <-mt.acceptCh:
if !ok {
return nil, ErrNotServing
}
mt.readConn = tp
return mt.readConn, nil
}
}

// WARNING: Not thread safe.
func (mt *ManagedTransport) readPacket() (packet routing.Packet, err error) {
tp, err := mt.latestReadTp()
if err != nil {
return nil, err
}

defer func() {
if err != nil && mt.isServing() {
mt.acceptMx.RLock()
mt.readConn = nil
mt.acceptMx.RUnlock()
var conn Transport
for {
if conn = mt.getConn(); conn != nil {
break
}
}()
select {
case <-mt.done:
return nil, ErrNotServing
case <-mt.connCh:
}
}

h := make(routing.Packet, 6)
if _, err := io.ReadFull(tp, h); err != nil {
if _, err = io.ReadFull(conn, h); err != nil {
return nil, err
}

p := make([]byte, h.Size())
if _, err := io.ReadFull(tp, p); err != nil {
if _, err = io.ReadFull(conn, p); err != nil {
return nil, err
}
packet = append(h, p...)
mt.logRecv(uint64(len(p)))
if n := len(packet); n > 6 {
mt.logRecv(uint64(n - 6))
}
mt.log.Infof("recv packet: rtID(%d) size(%d)", packet.RouteID(), packet.Size())
return packet, nil
}
Expand Down
Loading

0 comments on commit 985f056

Please sign in to comment.