Skip to content

Commit

Permalink
Improve code quality
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Dec 13, 2019
1 parent cb815a0 commit 3750f35
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 208 deletions.
16 changes: 16 additions & 0 deletions pkg/routing/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ type BidirectionalRoute struct {
Reverse Path
}

func (br *BidirectionalRoute) ForwardAndReverse() (forward, reverse Route) {
forwardRoute := Route{
Desc: br.Desc,
Path: br.Forward,
KeepAlive: br.KeepAlive,
}

reverseRoute := Route{
Desc: br.Desc.Invert(),
Path: br.Reverse,
KeepAlive: br.KeepAlive,
}

return forwardRoute, reverseRoute
}

// EdgeRules represents edge forward and reverse rules. Edge rules are forward and consume rules.
type EdgeRules struct {
Desc RouteDescriptor
Expand Down
10 changes: 5 additions & 5 deletions pkg/routing/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,17 +366,17 @@ func (r Rule) Summary() *RuleSummary {
}

// ConsumeRule constructs a new Consume rule.
func ConsumeRule(keepAlive time.Duration, key RouteID, localPK, remotePK cipher.PubKey, localPort, remotePort Port) Rule {
func ConsumeRule(keepAlive time.Duration, key RouteID, lPK, rPK cipher.PubKey, lPort, rPort Port) Rule {
rule := Rule(make([]byte, RuleHeaderSize+routeDescriptorSize))

rule.setKeepAlive(keepAlive)
rule.setType(RuleConsume)
rule.SetKeyRouteID(key)

rule.setSrcPK(localPK)
rule.setDstPK(remotePK)
rule.setDstPort(remotePort)
rule.setSrcPort(localPort)
rule.setSrcPK(lPK)
rule.setDstPK(rPK)
rule.setDstPort(rPort)
rule.setSrcPort(lPort)

return rule
}
Expand Down
64 changes: 30 additions & 34 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,7 @@ func (sn *Node) handleDialRouteGroup(ctx context.Context, route routing.Bidirect
return routing.EdgeRules{}, err
}

forwardRoute := routing.Route{
Desc: route.Desc,
Path: route.Forward,
KeepAlive: route.KeepAlive,
}
reverseRoute := routing.Route{
Desc: route.Desc.Invert(),
Path: route.Reverse,
KeepAlive: route.KeepAlive,
}
forwardRoute, reverseRoute := route.ForwardAndReverse()

// Determine the rules to send to visors using loop descriptor and reserved route IDs.
forwardRules, consumeRules, intermediaryRules, err := idr.GenerateRules(forwardRoute, reverseRoute)
Expand All @@ -132,30 +123,7 @@ func (sn *Node) handleDialRouteGroup(ctx context.Context, route routing.Bidirect
sn.logger.Infof("generated consume rules: %v", consumeRules)
sn.logger.Infof("generated intermediary rules: %v", intermediaryRules)

errCh := make(chan error, len(intermediaryRules))

var wg sync.WaitGroup

for pk, rules := range intermediaryRules {
pk, rules := pk, rules

sn.logger.WithField("remote", pk).Info("Adding rules to intermediary node")

wg.Add(1)

go func() {
defer wg.Done()
if _, err := routerclient.AddIntermediaryRules(ctx, sn.logger, sn.dmsgC, pk, rules); err != nil {
sn.logger.WithField("remote", pk).WithError(err).Warn("failed to add rules")
errCh <- err
}
}()
}

wg.Wait()
close(errCh)

if err := finalError(len(intermediaryRules), errCh); err != nil {
if err := sn.addIntermediaryRules(ctx, intermediaryRules); err != nil {
return routing.EdgeRules{}, err
}

Expand All @@ -164,6 +132,7 @@ func (sn *Node) handleDialRouteGroup(ctx context.Context, route routing.Bidirect
Forward: forwardRules[route.Desc.SrcPK()],
Reverse: consumeRules[route.Desc.SrcPK()],
}

respRouteRules := routing.EdgeRules{
Desc: forwardRoute.Desc,
Forward: forwardRules[route.Desc.DstPK()],
Expand All @@ -184,6 +153,33 @@ func (sn *Node) handleDialRouteGroup(ctx context.Context, route routing.Bidirect
return initRouteRules, nil
}

func (sn *Node) addIntermediaryRules(ctx context.Context, intermediaryRules RulesMap) error {
errCh := make(chan error, len(intermediaryRules))

var wg sync.WaitGroup

for pk, rules := range intermediaryRules {
pk, rules := pk, rules

sn.logger.WithField("remote", pk).Info("Adding rules to intermediary node")

wg.Add(1)

go func() {
defer wg.Done()
if _, err := routerclient.AddIntermediaryRules(ctx, sn.logger, sn.dmsgC, pk, rules); err != nil {
sn.logger.WithField("remote", pk).WithError(err).Warn("failed to add rules")
errCh <- err
}
}()
}

wg.Wait()
close(errCh)

return finalError(len(intermediaryRules), errCh)
}

func (sn *Node) reserveRouteIDs(ctx context.Context, route routing.BidirectionalRoute) (*idReservoir, error) {
reservoir, total := newIDReservoir(route.Forward, route.Reverse)
sn.logger.Infof("There are %d route IDs to reserve.", total)
Expand Down
Loading

0 comments on commit 3750f35

Please sign in to comment.