Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[M2] Fix default route group timeout #89

Merged
merged 25 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
00f4817
Remove custom timeout timer
Darkren Dec 23, 2019
3cfb825
Fix `writePacketAsync` to handle context properly
Darkren Dec 23, 2019
61b5506
Merge branch 'milestone2' of https://github.com/SkycoinProject/skywir…
Darkren Dec 25, 2019
f45ffec
Add proper handling of `Close` packets by the router
Darkren Dec 25, 2019
e42bc01
Start to implement Close loop
Darkren Dec 25, 2019
5fe03b0
Add proper handling of close packets
Darkren Dec 26, 2019
fceed7a
Add proper EOF return from route group's Read
Darkren Dec 26, 2019
0806d9a
Add closeInitiated flag initialization
Darkren Dec 26, 2019
6dd7c8b
Fix router close packet handling tests
Darkren Dec 27, 2019
972c5ff
Fix close test of route group
Darkren Dec 27, 2019
c70ddf2
Update vendor
Darkren Dec 30, 2019
f21db0e
Rewrite route group's `TestConn`
Darkren Jan 2, 2020
66b3f6b
Fixing route group test conn
Darkren Jan 6, 2020
8c966c9
Almost fix basic IO
Darkren Jan 6, 2020
e489de0
Finally fix the basic io subtest
Darkren Jan 7, 2020
49671fc
Fix basic io?
Darkren Jan 7, 2020
3b7975f
fix
Darkren Jan 9, 2020
4571bd6
Cleanup
Darkren Jan 9, 2020
1dbf3c0
Merge branch 'milestone2' of https://github.com/SkycoinProject/skywir…
Darkren Jan 9, 2020
ae86dae
Fix linter issues, update vendor
Darkren Jan 14, 2020
431885b
Up go version for travis
Darkren Jan 14, 2020
1808b2b
Downgrade go version for travis, remove go 1.13 error wrapping
Darkren Jan 14, 2020
ad399c2
Comment out failing tests
Darkren Jan 14, 2020
145aaa2
Fix visor's `TestListApps`
Darkren Jan 15, 2020
de036b4
Remove finished TODO
Darkren Jan 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ require (
golang.org/x/net v0.0.0-20191204025024-5ee1b9f4859a
)

//replace github.com/SkycoinProject/dmsg => ../dmsg
replace github.com/SkycoinProject/dmsg => ../dmsg
Darkren marked this conversation as resolved.
Show resolved Hide resolved
185 changes: 127 additions & 58 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

const (
defaultRouteGroupKeepAliveInterval = 1 * time.Minute
defaultRouteGroupIOTimeout = 3 * time.Second
defaultReadChBufSize = 1024
closeRoutineTimeout = 2 * time.Second
)

var (
Expand All @@ -46,15 +46,13 @@ func (timeoutError) Temporary() bool { return true }
type RouteGroupConfig struct {
ReadChBufSize int
KeepAliveInterval time.Duration
IOTimeout time.Duration
}

// DefaultRouteGroupConfig returns default RouteGroup config.
// Used by default if config is nil.
func DefaultRouteGroupConfig() *RouteGroupConfig {
return &RouteGroupConfig{
KeepAliveInterval: defaultRouteGroupKeepAliveInterval,
IOTimeout: defaultRouteGroupIOTimeout,
ReadChBufSize: defaultReadChBufSize,
}
}
Expand Down Expand Up @@ -94,6 +92,11 @@ type RouteGroup struct {

readDeadline deadline.PipeDeadline
writeDeadline deadline.PipeDeadline

// used as a bool to indicate if this particular route group initiated close loop
closeInitiated int32
// used to wait for all the `Close` packets to run through the loop and come back
closeDone sync.WaitGroup
}

// NewRouteGroup creates a new RouteGroup.
Expand Down Expand Up @@ -149,16 +152,19 @@ func (rg *RouteGroup) Read(p []byte) (n int, err error) {
}
rg.mu.Unlock()

timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()

var data []byte
var (
data []byte
ok bool
)
select {
case <-rg.readDeadline.Wait():
return 0, timeoutError{}
case <-timeout.C:
case data, ok = <-rg.readCh:
}

if !ok {
// route group got closed
return 0, io.EOF
case data = <-rg.readCh:
}

rg.mu.Lock()
Expand Down Expand Up @@ -197,17 +203,14 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {

packet := routing.MakeDataPacket(rule.KeyRouteID(), p)

errCh, cancel := rg.writePacketAsync(tp, packet)
defer cancel()
ctx, cancel := context.WithCancel(context.Background())

timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()
errCh := rg.writePacketAsync(ctx, tp, packet)
defer cancel()

select {
case <-rg.writeDeadline.Wait():
return 0, timeoutError{}
case <-timeout.C:
return 0, io.EOF
case err := <-errCh:
if err != nil {
return 0, err
Expand All @@ -219,20 +222,14 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {
}
}

func (rg *RouteGroup) writePacketAsync(tp *transport.ManagedTransport, packet routing.Packet) (chan error, func()) {
ctx, cancel := context.WithCancel(context.Background())

func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.ManagedTransport, packet routing.Packet) chan error {
errCh := make(chan error)
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved

go func() {
select {
case <-ctx.Done():
case errCh <- tp.WritePacket(context.Background(), packet):
}
errCh <- tp.WritePacket(ctx, packet)
close(errCh)
}()

return errCh, cancel
return errCh
}

func (rg *RouteGroup) rule() (routing.Rule, error) {
Expand All @@ -259,42 +256,11 @@ func (rg *RouteGroup) tp() (*transport.ManagedTransport, error) {
return tp, nil
}

// Close closes a RouteGroup:
// - Send Close packet for all ForwardRules.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
// Close closes a RouteGroup.
func (rg *RouteGroup) Close() error {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(rg.fwd) != len(rg.tps) {
return ErrRuleTransportMismatch
}

for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), routing.CloseRequested)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
return err
}
}

rules := rg.rt.RulesWithDesc(rg.desc)
routeIDs := make([]routing.RouteID, 0, len(rules))

for _, rule := range rules {
routeIDs = append(routeIDs, rule.KeyRouteID())
}

rg.rt.DelRules(routeIDs)
atomic.StoreInt32(&rg.closeInitiated, 1)

rg.once.Do(func() {
close(rg.done)
rg.readChMu.Lock()
close(rg.readCh)
rg.readChMu.Unlock()
})

return nil
return rg.close(routing.CloseRequested)
}

// LocalAddr returns destination address of underlying RouteDescriptor.
Expand Down Expand Up @@ -369,6 +335,109 @@ func (rg *RouteGroup) sendKeepAlive() error {
return nil
}

// Close closes a RouteGroup with the specified close `code`:
// - Send Close packet for all ForwardRules with the code `code`.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
func (rg *RouteGroup) close(code routing.CloseCode) error {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(rg.fwd) != len(rg.tps) {
return ErrRuleTransportMismatch
}

closeInitiator := rg.isCloseInitiator()

if closeInitiator {
// will wait for close response from all the transports
rg.closeDone.Add(len(rg.tps))
}

if err := rg.broadcastClosePackets(code); err != nil {
// TODO: decide if we should return this error, or close route group anyway
return err
}

if closeInitiator {
// if this node initiated closing, we need to wait for close packets
// to come back, or to exit with a time out if anything goes wrong in
// the network
if err := rg.waitForCloseLoop(closeRoutineTimeout); err != nil {
rg.logger.Errorf("Error during close loop: %v", err)
}
}

rules := rg.rt.RulesWithDesc(rg.desc)
routeIDs := make([]routing.RouteID, 0, len(rules))

for _, rule := range rules {
routeIDs = append(routeIDs, rule.KeyRouteID())
}

rg.rt.DelRules(routeIDs)

rg.once.Do(func() {
close(rg.done)
rg.readChMu.Lock()
close(rg.readCh)
rg.readChMu.Unlock()
})

return nil
}

func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
rg.logger.Infof("Got close packet with code %d", code)

if rg.isCloseInitiator() {
// this route group initiated close loop and got response
rg.logger.Debugf("Handling response close packet with code %d", code)

rg.closeDone.Done()
return nil
}

// TODO: use `close` with some close code if we decide that it should be different from the current one
return rg.Close()
}

func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) error {
for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
// TODO: decide if we should return this error, or close route group anyway
return err
}
}

return nil
}

func (rg *RouteGroup) waitForCloseLoop(waitTimeout time.Duration) error {
closeCtx, closeCancel := context.WithTimeout(context.Background(), waitTimeout)
defer closeCancel()

closeDoneCh := make(chan struct{})
go func() {
// wait till all remotes respond to close procedure
rg.closeDone.Wait()
close(closeDoneCh)
}()

select {
case <-closeCtx.Done():
return fmt.Errorf("close loop timed out: %w", closeCtx.Err())
case <-closeDoneCh:
}

return nil
}

func (rg *RouteGroup) isCloseInitiator() bool {
return atomic.LoadInt32(&rg.closeInitiated) == 1
}

func (rg *RouteGroup) isClosed() bool {
select {
case <-rg.done:
Expand Down
75 changes: 61 additions & 14 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe

func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) error {
rule, err := r.GetRule(packet.RouteID())

if err != nil {
return err
}
Expand Down Expand Up @@ -369,31 +368,70 @@ func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) er
r.logger.Infof("Got new remote packet with route ID %d. Using rule: %s", packet.RouteID(), rule)
r.logger.Infof("Packet contents (len = %d): %v", len(packet.Payload()), packet.Payload())

if rg.isClosed() {
r.logger.Infoln("RG IS CLOSED")
return io.ErrClosedPipe
}

rg.mu.Lock()
defer rg.mu.Unlock()
// TODO: test that it's not needed indeed
/*rg.mu.Lock()
defer rg.mu.Unlock()*/

select {
case <-rg.done:
r.logger.Infof("RG IS DONE")
return io.ErrClosedPipe
case rg.readCh <- packet.Payload():
r.logger.Infof("PUT PAYLOAD INTO RG READ CHAN")
return nil
}
}

func (r *router) handleClosePacket(_ context.Context, packet routing.Packet) error {
func (r *router) handleClosePacket(ctx context.Context, packet routing.Packet) error {
routeID := packet.RouteID()

r.logger.Infof("Received keepalive packet for route ID %v", routeID)
r.logger.Infof("Received close packet for route ID %v", routeID)

rules := []routing.RouteID{routeID}
r.rt.DelRules(rules)
rule, err := r.GetRule(routeID)
if err != nil {
return err
}

if t := rule.Type(); t == routing.RuleIntermediaryForward {
r.logger.Infoln("Handling intermediary close packet")

// defer this only on intermediary nodes. destination node will remove
// the needed rules in the route group `Close` routine
defer func() {
routeIDs := []routing.RouteID{routeID}
r.rt.DelRules(routeIDs)
}()

return r.forwardPacket(ctx, packet, rule)
}

desc := rule.RouteDescriptor()
rg, ok := r.routeGroup(desc)

r.logger.Infof("Handling close packet with descriptor %s", &desc)

if !ok {
r.logger.Infof("Descriptor not found for rule with type %s, descriptor: %s", rule.Type(), &desc)
return errors.New("route descriptor does not exist")
}

defer r.removeRouteGroup(desc)

if rg == nil {
return errors.New("RouteGroup is nil")
}

r.logger.Infof("Got new remote close packet with route ID %d. Using rule: %s", packet.RouteID(), rule)
r.logger.Infof("Packet contents (len = %d): %v", len(packet.Payload()), packet.Payload())

closeCode := routing.CloseCode(packet.Payload()[0])

if rg.isClosed() {
return io.ErrClosedPipe
}

if err := rg.handleClosePacket(closeCode); err != nil {
return fmt.Errorf("error handling close packet with code %d by route group with descriptor %s: %w",
closeCode, &desc, err)
}

return nil
}
Expand Down Expand Up @@ -477,6 +515,8 @@ func (r *router) forwardPacket(ctx context.Context, packet routing.Packet, rule
p = routing.MakeDataPacket(rule.NextRouteID(), packet.Payload())
case routing.KeepAlivePacket:
p = routing.MakeKeepAlivePacket(rule.NextRouteID())
case routing.ClosePacket:
p = routing.MakeClosePacket(rule.NextRouteID(), routing.CloseCode(packet.Payload()[0]))
default:
return fmt.Errorf("packet of type %s can't be forwarded", packet.Type())
}
Expand Down Expand Up @@ -578,6 +618,13 @@ func (r *router) routeGroup(desc routing.RouteDescriptor) (*RouteGroup, bool) {
return rg, ok
}

func (r *router) removeRouteGroup(desc routing.RouteDescriptor) {
r.mx.Lock()
defer r.mx.Unlock()

delete(r.rgs, desc)
}

func (r *router) IntroduceRules(rules routing.EdgeRules) error {
select {
case <-r.done:
Expand Down