Skip to content

Commit

Permalink
Fix RouteGroup write timeout logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Dec 11, 2019
1 parent 6e41eda commit bc19d96
Showing 1 changed file with 51 additions and 6 deletions.
57 changes: 51 additions & 6 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,16 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
// }
// r.mu.Unlock()

timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()

var data []byte
select {
case data = <-r.readCh:
case <-r.readDeadline.Wait():
return 0, timeoutError{}
case <-time.After(50 * time.Second):
case <-timeout.C:
return 0, io.EOF
case data = <-r.readCh:
}

r.mu.Lock()
Expand Down Expand Up @@ -187,13 +190,55 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
}

packet := routing.MakeDataPacket(rule.KeyRouteID(), p)
if err := tp.WritePacket(context.Background(), packet); err != nil {
return 0, err

ctx, cancel := context.WithCancel(context.Background())

errCh := make(chan error)

go func() {
select {
case <-ctx.Done():
case errCh <- tp.WritePacket(context.Background(), packet):
}
close(errCh)
return
}()

timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()

select {
case <-r.writeDeadline.Wait():
cancel()
return 0, timeoutError{}
case <-timeout.C:
cancel()
return 0, io.EOF
case err := <-errCh:
if err != nil {
return 0, err
}

atomic.StoreInt64(&r.lastSent, time.Now().UnixNano())

return len(p), nil
}
}

func (r *RouteGroup) writePacketAsync(tp *transport.ManagedTransport, packet routing.Packet) error {
var err error
var wg sync.WaitGroup

wg.Add(1)

go func() {
defer wg.Done()
err = tp.WritePacket(context.Background(), packet)
}()

atomic.StoreInt64(&r.lastSent, time.Now().UnixNano())
wg.Wait()

return len(p), nil
return err
}

// Close closes a RouteGroup:
Expand Down

0 comments on commit bc19d96

Please sign in to comment.