Skip to content

Commit

Permalink
Fix route group closing on timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Feb 11, 2020
1 parent 8505c2e commit f7c3fdf
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
2 changes: 2 additions & 0 deletions pkg/app/appserver/rpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func (r *RPCGateway) Read(req *ReadReq, resp *ReadResp) error {
copy(resp.B, buf[:resp.N])
}

fmt.Printf("ERROR READING FROM APP CONN SERVER SIDE: %v\n", err)

resp.Err = ioErrToRPCIOErr(err)

// avoid error in RPC pipeline, error is included in response body
Expand Down
1 change: 1 addition & 0 deletions pkg/app/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Conn struct {

// Read reads from connection.
func (c *Conn) Read(b []byte) (int, error) {
fmt.Println("LOCKIN ON READ APP CLIENT CONN")
n, err := c.rpc.Read(c.id, b)
if err == io.EOF {
fmt.Println("EOF READING FROM APP CONN")
Expand Down
39 changes: 23 additions & 16 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,20 @@ func (r *router) ReserveKeys(n int) ([]routing.RouteID, error) {
return ids, err
}

func (r *router) popRouteGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) {
r.mx.Lock()
defer r.mx.Unlock()

rg, ok := r.rgs[desc]
if !ok {
return nil, false
}

delete(r.rgs, desc)

return rg, true
}

func (r *router) routeGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) {
r.mx.Lock()
defer r.mx.Unlock()
Expand Down Expand Up @@ -702,40 +716,33 @@ func (r *router) rulesGCLoop() {
func (r *router) rulesGC() {
removedRules := r.rt.CollectGarbage()

r.logger.Debugf("Removed %d rules", len(removedRules))
r.logger.Infof("Removed %d rules", len(removedRules))

for _, rule := range removedRules {
// we need to process only consume rules, cause we don't
// really care about the other ones, other rules removal
// doesn't affect our work here
if rule.Type() == routing.RuleConsume {
cnsmRuleDesc := rule.RouteDescriptor()
r.logger.Debugf("Removed consume rule with desc %s", &cnsmRuleDesc)
fwdRuleDesc := cnsmRuleDesc.Invert()
rg, ok := r.routeGroup(fwdRuleDesc)
r.logger.Infof("Removed consume rule with desc %s", &cnsmRuleDesc)

rg, ok := r.popRouteGroup(cnsmRuleDesc)
if !ok {
r.logger.Debugln("Couldn't remove route group after consume rule expired: route group not found")
r.logger.Infoln("Couldn't remove route group after consume rule expired: route group not found")
continue
}
r.logger.Debugln("Got route group for removed consume rule with desc %s", &cnsmRuleDesc)

r.removeRouteGroup(fwdRuleDesc)

r.logger.Debugln("Removed route group for removed consume rule with desc %s", &cnsmRuleDesc)
r.logger.Infoln("Removed route group for removed consume rule with desc %s", &cnsmRuleDesc)

if !rg.isClosed() {
r.logger.Debugln("Closing route group")
// instantly signal to route group that remote is closed, so that we
// won't need to initiate close loop in the network
rg.setRemoteClosed()
r.logger.Debugln("Set remote closed for route group")
r.logger.Infoln("Closing route group")
if err := rg.Close(); err != nil {
r.logger.Errorf("Error closing route group during rule GC: %v", err)
} else {
r.logger.Debugln("Successfully closed route group")
r.logger.Infoln("Successfully closed route group")
}
} else {
r.logger.Debugln("Route group is ALREADY closed")
r.logger.Infoln("Route group is ALREADY closed")
}
}
}
Expand Down

0 comments on commit f7c3fdf

Please sign in to comment.