Skip to content

Commit

Permalink
Start fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Feb 6, 2020
1 parent 5a5f896 commit 08d7c1e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 9 deletions.
60 changes: 58 additions & 2 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ type RouteGroup struct {
// - fwd/tps should have the same number of elements.
// - the corresponding element of tps should have tpID of the corresponding rule in fwd.
// - fwd references 'ForwardRule' rules for writes.
fwd []routing.Rule // forward rules (for writing)
rvs []routing.Rule // reverse rules (for reading)
fwd []routing.Rule // forward rules (for writing)
rvs []routing.Rule // reverse rules (for reading)
rvsRouteLastActivity time.Time

lastSent int64

Expand Down Expand Up @@ -123,6 +124,7 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe
}

go rg.keepAliveLoop(cfg.KeepAliveInterval)
go rg.checkRouteIsAliveLoop()

return rg
}
Expand Down Expand Up @@ -427,6 +429,33 @@ func (rg *RouteGroup) close(code routing.CloseCode) error {
return nil
}

func (rg *RouteGroup) handlePacket(packet routing.Packet) error {
rg.mu.Lock()
// no need to check rule expiry, since router won't allow packet in
// in case it's expired, so simply update the activity
rg.rvsRouteLastActivity = time.Now()
rg.mu.Unlock()

switch packet.Type() {
case routing.ClosePacket:
return rg.handleClosePacket(routing.CloseCode(packet.Payload()[0]))
case routing.DataPacket:
return rg.handleDataPacket(packet)
}

return nil
}

func (rg *RouteGroup) handleDataPacket(packet routing.Packet) error {
select {
case <-rg.closed:
return io.ErrClosedPipe
case rg.readCh <- packet.Payload():
}

return nil
}

func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
rg.logger.Infof("Got close packet with code %d", code)

Expand Down Expand Up @@ -471,6 +500,33 @@ func (rg *RouteGroup) waitForCloseLoop(waitTimeout time.Duration) error {
return nil
}

func (rg *RouteGroup) checkRouteIsAliveLoop() {
ticker := time.NewTicker(routing.DefaultGCInterval)
defer ticker.Stop()

for range ticker.C {
rg.mu.Lock()
rule, err := rg.rule()
if err != nil {
rg.mu.Unlock()
rg.logger.Errorf("Error getting rule to check activity: %v", err)
continue
}

idling := time.Since(rg.rvsRouteLastActivity)
keepAlive := rule.KeepAlive()

if idling > keepAlive {
// rule is timed out, remote closed, stop this loop

rg.mu.Unlock()
return
}

rg.mu.Unlock()
}
}

func (rg *RouteGroup) isCloseInitiator() bool {
return atomic.LoadInt32(&rg.closeInitiated) == 1
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,7 @@ func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) er
r.logger.Infof("Got new remote packet with route ID %d. Using rule: %s", packet.RouteID(), rule)
r.logger.Infof("Packet contents (len = %d): %v", len(packet.Payload()), packet.Payload())

select {
case <-rg.closed:
return io.ErrClosedPipe
case rg.readCh <- packet.Payload():
return nil
}
return rg.handlePacket(packet)
}

func (r *router) handleClosePacket(ctx context.Context, packet routing.Packet) error {
Expand Down Expand Up @@ -421,7 +416,7 @@ func (r *router) handleClosePacket(ctx context.Context, packet routing.Packet) e
return io.ErrClosedPipe
}

if err := rg.handleClosePacket(closeCode); err != nil {
if err := rg.handlePacket(packet); err != nil {
return fmt.Errorf("error handling close packet with code %d by route group with descriptor %s: %v",
closeCode, &desc, err)
}
Expand Down

0 comments on commit 08d7c1e

Please sign in to comment.