From f7c3fdf7970b001d0991b5f7b4460a7879ec1a03 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 11 Feb 2020 14:57:36 +0300 Subject: [PATCH] Fix route group closing on timeout --- pkg/app/appserver/rpc_gateway.go | 2 ++ pkg/app/conn.go | 1 + pkg/router/router.go | 39 +++++++++++++++++++------------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pkg/app/appserver/rpc_gateway.go b/pkg/app/appserver/rpc_gateway.go index cc28be610b..4f4c0a89a2 100644 --- a/pkg/app/appserver/rpc_gateway.go +++ b/pkg/app/appserver/rpc_gateway.go @@ -255,6 +255,8 @@ func (r *RPCGateway) Read(req *ReadReq, resp *ReadResp) error { copy(resp.B, buf[:resp.N]) } + fmt.Printf("ERROR READING FROM APP CONN SERVER SIDE: %v\n", err) + resp.Err = ioErrToRPCIOErr(err) // avoid error in RPC pipeline, error is included in response body diff --git a/pkg/app/conn.go b/pkg/app/conn.go index b0b15ad3f4..f190c284e7 100644 --- a/pkg/app/conn.go +++ b/pkg/app/conn.go @@ -24,6 +24,7 @@ type Conn struct { // Read reads from connection. func (c *Conn) Read(b []byte) (int, error) { + fmt.Println("LOCKIN ON READ APP CLIENT CONN") n, err := c.rpc.Read(c.id, b) if err == io.EOF { fmt.Println("EOF READING FROM APP CONN") diff --git a/pkg/router/router.go b/pkg/router/router.go index 1eaae4a18c..dbba6eec45 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -627,6 +627,20 @@ func (r *router) ReserveKeys(n int) ([]routing.RouteID, error) { return ids, err } +func (r *router) popRouteGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) { + r.mx.Lock() + defer r.mx.Unlock() + + rg, ok := r.rgs[desc] + if !ok { + return nil, false + } + + delete(r.rgs, desc) + + return rg, true +} + func (r *router) routeGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) { r.mx.Lock() defer r.mx.Unlock() @@ -702,7 +716,7 @@ func (r *router) rulesGCLoop() { func (r *router) rulesGC() { removedRules := r.rt.CollectGarbage() - r.logger.Debugf("Removed %d rules", len(removedRules)) + r.logger.Infof("Removed %d rules", len(removedRules)) for _, rule := range removedRules { // we need to process only consume rules, cause we don't @@ -710,32 +724,25 @@ func (r *router) rulesGC() { // doesn't affect our work here if rule.Type() == routing.RuleConsume { cnsmRuleDesc := rule.RouteDescriptor() - r.logger.Debugf("Removed consume rule with desc %s", &cnsmRuleDesc) - fwdRuleDesc := cnsmRuleDesc.Invert() - rg, ok := r.routeGroup(fwdRuleDesc) + r.logger.Infof("Removed consume rule with desc %s", &cnsmRuleDesc) + + rg, ok := r.popRouteGroup(cnsmRuleDesc) if !ok { - r.logger.Debugln("Couldn't remove route group after consume rule expired: route group not found") + r.logger.Infoln("Couldn't remove route group after consume rule expired: route group not found") continue } - r.logger.Debugln("Got route group for removed consume rule with desc %s", &cnsmRuleDesc) - - r.removeRouteGroup(fwdRuleDesc) - r.logger.Debugln("Removed route group for removed consume rule with desc %s", &cnsmRuleDesc) + r.logger.Infoln("Removed route group for removed consume rule with desc %s", &cnsmRuleDesc) if !rg.isClosed() { - r.logger.Debugln("Closing route group") - // instantly signal to route group that remote is closed, so that we - // won't need to initiate close loop in the network - rg.setRemoteClosed() - r.logger.Debugln("Set remote closed for route group") + r.logger.Infoln("Closing route group") if err := rg.Close(); err != nil { r.logger.Errorf("Error closing route group during rule GC: %v", err) } else { - r.logger.Debugln("Successfully closed route group") + r.logger.Infoln("Successfully closed route group") } } else { - r.logger.Debugln("Route group is ALREADY closed") + r.logger.Infoln("Route group is ALREADY closed") } } }