diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 7a150c1804..26fefe2255 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -106,18 +106,19 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe // The Router, via transport.Manager, is responsible for reading incoming packets and pushing it to the appropriate RouteGroup via (*RouteGroup).readCh. // To help with implementing the read logic, within the dmsg repo, we have ioutil.BufRead, just in case the read buffer is short. func (r *RouteGroup) Read(p []byte) (n int, err error) { - r.mu.Lock() - defer r.mu.Unlock() - var timeout <-chan time.Time var timer *time.Timer if deadline := atomic.LoadInt64(&r.readDeadline); deadline != 0 { delay := time.Until(time.Unix(0, deadline)) + r.mu.Lock() if delay > time.Duration(0) && r.readBuf.Len() > 0 { - return r.readBuf.Read(p) + n, err := r.readBuf.Read(p) + r.mu.Unlock() + return n, err } + r.mu.Unlock() timer = time.NewTimer(delay) timeout = timer.C @@ -131,6 +132,8 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { if !ok { return 0, io.ErrClosedPipe } + r.mu.Lock() + defer r.mu.Unlock() return ioutil.BufRead(&r.readBuf, data, p) case <-timeout: return 0, ErrTimeout diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index d025906726..ee05d6f449 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -54,7 +54,7 @@ func TestRouteGroup_Read(t *testing.T) { case err = <-errCh: } require.Equal(t, context.DeadlineExceeded, err) - // require.NoError(t, rg1.Close()) // TODO: fix hang + require.NoError(t, rg1.Close()) _, rg1 = prepare() _, rg2 := prepare()