From 08d7c1e4af30bf42691653b8a0d07ef85b0aced4 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Thu, 6 Feb 2020 20:31:43 +0300 Subject: [PATCH] Start fixing --- pkg/router/route_group.go | 60 +++++++++++++++++++++++++++++++++++++-- pkg/router/router.go | 9 ++---- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 6a3ebfd1ad..a88b58c9ee 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -76,8 +76,9 @@ type RouteGroup struct { // - fwd/tps should have the same number of elements. // - the corresponding element of tps should have tpID of the corresponding rule in fwd. // - fwd references 'ForwardRule' rules for writes. - fwd []routing.Rule // forward rules (for writing) - rvs []routing.Rule // reverse rules (for reading) + fwd []routing.Rule // forward rules (for writing) + rvs []routing.Rule // reverse rules (for reading) + rvsRouteLastActivity time.Time lastSent int64 @@ -123,6 +124,7 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe } go rg.keepAliveLoop(cfg.KeepAliveInterval) + go rg.checkRouteIsAliveLoop() return rg } @@ -427,6 +429,33 @@ func (rg *RouteGroup) close(code routing.CloseCode) error { return nil } +func (rg *RouteGroup) handlePacket(packet routing.Packet) error { + rg.mu.Lock() + // no need to check rule expiry, since router won't allow packet in + // in case it's expired, so simply update the activity + rg.rvsRouteLastActivity = time.Now() + rg.mu.Unlock() + + switch packet.Type() { + case routing.ClosePacket: + return rg.handleClosePacket(routing.CloseCode(packet.Payload()[0])) + case routing.DataPacket: + return rg.handleDataPacket(packet) + } + + return nil +} + +func (rg *RouteGroup) handleDataPacket(packet routing.Packet) error { + select { + case <-rg.closed: + return io.ErrClosedPipe + case rg.readCh <- packet.Payload(): + } + + return nil +} + func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { rg.logger.Infof("Got close packet with code %d", code) @@ -471,6 +500,33 @@ func (rg *RouteGroup) waitForCloseLoop(waitTimeout time.Duration) error { return nil } +func (rg *RouteGroup) checkRouteIsAliveLoop() { + ticker := time.NewTicker(routing.DefaultGCInterval) + defer ticker.Stop() + + for range ticker.C { + rg.mu.Lock() + rule, err := rg.rule() + if err != nil { + rg.mu.Unlock() + rg.logger.Errorf("Error getting rule to check activity: %v", err) + continue + } + + idling := time.Since(rg.rvsRouteLastActivity) + keepAlive := rule.KeepAlive() + + if idling > keepAlive { + // rule is timed out, remote closed, stop this loop + + rg.mu.Unlock() + return + } + + rg.mu.Unlock() + } +} + func (rg *RouteGroup) isCloseInitiator() bool { return atomic.LoadInt32(&rg.closeInitiated) == 1 } diff --git a/pkg/router/router.go b/pkg/router/router.go index 71e98906d3..9157009d92 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -368,12 +368,7 @@ func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) er r.logger.Infof("Got new remote packet with route ID %d. Using rule: %s", packet.RouteID(), rule) r.logger.Infof("Packet contents (len = %d): %v", len(packet.Payload()), packet.Payload()) - select { - case <-rg.closed: - return io.ErrClosedPipe - case rg.readCh <- packet.Payload(): - return nil - } + return rg.handlePacket(packet) } func (r *router) handleClosePacket(ctx context.Context, packet routing.Packet) error { @@ -421,7 +416,7 @@ func (r *router) handleClosePacket(ctx context.Context, packet routing.Packet) e return io.ErrClosedPipe } - if err := rg.handleClosePacket(closeCode); err != nil { + if err := rg.handlePacket(packet); err != nil { return fmt.Errorf("error handling close packet with code %d by route group with descriptor %s: %v", closeCode, &desc, err) }