From e67a667eecbb76ca00c3171b21087d23f5c09fae Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 19 Nov 2019 00:08:44 +0300 Subject: [PATCH] Fix mutex bug in RouteGroup --- pkg/router/route_group.go | 11 +++++++---- pkg/router/route_group_test.go | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 7a150c180..26fefe225 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -106,18 +106,19 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe // The Router, via transport.Manager, is responsible for reading incoming packets and pushing it to the appropriate RouteGroup via (*RouteGroup).readCh. // To help with implementing the read logic, within the dmsg repo, we have ioutil.BufRead, just in case the read buffer is short. func (r *RouteGroup) Read(p []byte) (n int, err error) { - r.mu.Lock() - defer r.mu.Unlock() - var timeout <-chan time.Time var timer *time.Timer if deadline := atomic.LoadInt64(&r.readDeadline); deadline != 0 { delay := time.Until(time.Unix(0, deadline)) + r.mu.Lock() if delay > time.Duration(0) && r.readBuf.Len() > 0 { - return r.readBuf.Read(p) + n, err := r.readBuf.Read(p) + r.mu.Unlock() + return n, err } + r.mu.Unlock() timer = time.NewTimer(delay) timeout = timer.C @@ -131,6 +132,8 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { if !ok { return 0, io.ErrClosedPipe } + r.mu.Lock() + defer r.mu.Unlock() return ioutil.BufRead(&r.readBuf, data, p) case <-timeout: return 0, ErrTimeout diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index d02590672..ee05d6f44 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -54,7 +54,7 @@ func TestRouteGroup_Read(t *testing.T) { case err = <-errCh: } require.Equal(t, context.DeadlineExceeded, err) - // require.NoError(t, rg1.Close()) // TODO: fix hang + require.NoError(t, rg1.Close()) _, rg1 = prepare() _, rg2 := prepare()