diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index 46d9f3620..d8a3a42c9 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -465,21 +465,33 @@ func pushPackets(ctx context.Context, t *testing.T, from *transport.Manager, to panic("malformed packet") } - to.readChMu.Lock() - select { - case <-ctx.Done(): - to.readChMu.Unlock() + if safeSend(to, ctx, payload) { return - case <-to.done: - to.readChMu.Unlock() - return - case to.readCh <- payload: - to.readChMu.Unlock() } } } } +func safeSend(to *RouteGroup, ctx context.Context, payload []byte) (interrupt bool) { + defer func() { + if r := recover(); r != nil { + interrupt = r != "send on closed channel" // TODO: come up with idea how to handle this case + } + }() + + to.readChMu.Lock() + defer to.readChMu.Unlock() + + select { + case <-ctx.Done(): + return true + case <-to.done: + return true + case to.readCh <- payload: + return false + } +} + func createRouteGroup() *RouteGroup { rt := routing.NewTable(routing.DefaultConfig()) diff --git a/pkg/router/router.go b/pkg/router/router.go index fb2e5f4c8..efbe6809e 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -330,7 +330,8 @@ func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) er return err } - if rule.Type() == routing.RuleIntermediaryForward { + switch rule.Type() { + case routing.RuleForward, routing.RuleIntermediaryForward: r.logger.Infoln("Handling intermediary packet") return r.forwardPacket(ctx, packet.Payload(), rule) } @@ -352,11 +353,6 @@ func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) er r.logger.Infof("Got new remote packet with route ID %d. Using rule: %s", packet.RouteID(), rule) r.logger.Infof("Packet contents: %s", packet.Payload()) - // TODO: maybe move this up along the execution flow? `rg` doesn't seem to be necessary here - if t := rule.Type(); t == routing.RuleForward { - return r.forwardPacket(ctx, packet.Payload(), rule) - } - if rg.isClosed() { r.logger.Infoln("RG IS CLOSED") return io.ErrClosedPipe diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 171b5e705..034455fa7 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -218,10 +218,13 @@ func TestRouter_handleTransportPacket(t *testing.T) { fwdRule := routing.ForwardRule(1*time.Hour, dstRtIDs[0], routing.RouteID(7), tp1.Entry.ID, keys[0].PK, keys[1].PK, 0, 0) cnsmRule := routing.ConsumeRule(1*time.Hour, dstRtIDs[1], keys[1].PK, keys[0].PK, 0, 0) + err = r1.rt.SaveRule(fwdRule) require.NoError(t, err) + err = r1.rt.SaveRule(cnsmRule) require.NoError(t, err) + fwdRtDesc := fwdRule.RouteDescriptor() r1.saveRouteGroupRules(routing.EdgeRules{ Desc: fwdRtDesc.Invert(),