Skip to content

Commit

Permalink
Fix mutex bug in RouteGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Nov 18, 2019
1 parent 6331e67 commit e67a667
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 7 additions & 4 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,19 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe
// The Router, via transport.Manager, is responsible for reading incoming packets and pushing it to the appropriate RouteGroup via (*RouteGroup).readCh.
// To help with implementing the read logic, within the dmsg repo, we have ioutil.BufRead, just in case the read buffer is short.
func (r *RouteGroup) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()

var timeout <-chan time.Time
var timer *time.Timer

if deadline := atomic.LoadInt64(&r.readDeadline); deadline != 0 {
delay := time.Until(time.Unix(0, deadline))

r.mu.Lock()
if delay > time.Duration(0) && r.readBuf.Len() > 0 {
return r.readBuf.Read(p)
n, err := r.readBuf.Read(p)
r.mu.Unlock()
return n, err
}
r.mu.Unlock()

timer = time.NewTimer(delay)
timeout = timer.C
Expand All @@ -131,6 +132,8 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
if !ok {
return 0, io.ErrClosedPipe
}
r.mu.Lock()
defer r.mu.Unlock()
return ioutil.BufRead(&r.readBuf, data, p)
case <-timeout:
return 0, ErrTimeout
Expand Down
2 changes: 1 addition & 1 deletion pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestRouteGroup_Read(t *testing.T) {
case err = <-errCh:
}
require.Equal(t, context.DeadlineExceeded, err)
// require.NoError(t, rg1.Close()) // TODO: fix hang
require.NoError(t, rg1.Close())

_, rg1 = prepare()
_, rg2 := prepare()
Expand Down

0 comments on commit e67a667

Please sign in to comment.