From bc19d960371e76b5468363e5d75024ba80c5125d Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 11 Dec 2019 18:16:50 +0300 Subject: [PATCH] Fix RouteGroup write timeout logic --- pkg/router/route_group.go | 57 ++++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 3b4054362..c030508be 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -138,13 +138,16 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { // } // r.mu.Unlock() + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + var data []byte select { - case data = <-r.readCh: case <-r.readDeadline.Wait(): return 0, timeoutError{} - case <-time.After(50 * time.Second): + case <-timeout.C: return 0, io.EOF + case data = <-r.readCh: } r.mu.Lock() @@ -187,13 +190,55 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { } packet := routing.MakeDataPacket(rule.KeyRouteID(), p) - if err := tp.WritePacket(context.Background(), packet); err != nil { - return 0, err + + ctx, cancel := context.WithCancel(context.Background()) + + errCh := make(chan error) + + go func() { + select { + case <-ctx.Done(): + case errCh <- tp.WritePacket(context.Background(), packet): + } + close(errCh) + return + }() + + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + + select { + case <-r.writeDeadline.Wait(): + cancel() + return 0, timeoutError{} + case <-timeout.C: + cancel() + return 0, io.EOF + case err := <-errCh: + if err != nil { + return 0, err + } + + atomic.StoreInt64(&r.lastSent, time.Now().UnixNano()) + + return len(p), nil } +} + +func (r *RouteGroup) writePacketAsync(tp *transport.ManagedTransport, packet routing.Packet) error { + var err error + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + defer wg.Done() + err = tp.WritePacket(context.Background(), packet) + }() - atomic.StoreInt64(&r.lastSent, time.Now().UnixNano()) + wg.Wait() - return len(p), nil + return err } // Close closes a RouteGroup: