Skip to content

Commit

Permalink
Fixing link race
Browse files Browse the repository at this point in the history
  • Loading branch information
gz-c committed May 25, 2019
1 parent c633881 commit 95cbf46
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 45 deletions.
70 changes: 29 additions & 41 deletions pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"encoding/binary"
"io"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/skycoin/skywire/internal/noise"
"github.com/skycoin/skywire/pkg/cipher"
Expand All @@ -24,13 +22,12 @@ type msgChannel struct {
buf *bytes.Buffer

deadline time.Time
closed unsafe.Pointer // unsafe.Pointer is used alongside 'atomic' module for fast, thread-safe access.

waitChan chan bool // waits for remote response (whether msgChannel is accepted or not).
readChan chan []byte
closeChan chan struct{}
closeChanMx sync.RWMutex // TODO(evanlinjin): This is a hack to avoid race conditions when closing msgChannel.
doneChan chan struct{}
waitChan chan bool // waits for remote response (whether msgChannel is accepted or not).
readChan chan []byte

doneChan chan struct{}
doneOnce sync.Once

noise *noise.Noise
rMx sync.Mutex // lock for decrypt cipher state
Expand All @@ -50,15 +47,13 @@ func newChannel(initiator bool, secKey cipher.SecKey, remote cipher.PubKey, link
}

return &msgChannel{
remotePK: remote,
link: link,
buf: new(bytes.Buffer),
closed: unsafe.Pointer(new(bool)), //nolint:gosec
waitChan: make(chan bool, 1), // should allows receive one reply.
readChan: make(chan []byte),
closeChan: make(chan struct{}),
doneChan: make(chan struct{}),
noise: noiseInstance,
remotePK: remote,
link: link,
buf: new(bytes.Buffer),
waitChan: make(chan bool, 1), // should allows receive one reply.
readChan: make(chan []byte),
doneChan: make(chan struct{}),
noise: noiseInstance,
}, nil
}

Expand Down Expand Up @@ -118,8 +113,10 @@ func (mCh *msgChannel) Read(p []byte) (n int, err error) {
}

func (mCh *msgChannel) Write(p []byte) (n int, err error) {
if mCh.isClosed() {
select {
case <-mCh.doneChan:
return 0, ErrChannelClosed
default:
}

ctx := context.Background()
Expand Down Expand Up @@ -156,24 +153,18 @@ func (mCh *msgChannel) Write(p []byte) (n int, err error) {
}
}

func (mCh *msgChannel) Close() error {
if mCh.isClosed() {
func (mCh *msgChannel) RequestClose() error {
select {
case <-mCh.doneChan:
return ErrChannelClosed
default:
}

if _, err := mCh.link.SendCloseChannel(mCh.ID()); err != nil {
return err
}

mCh.setClosed(true)

select {
case <-mCh.closeChan:
case <-time.After(time.Second):
}

mCh.close()
return nil
}

func (mCh *msgChannel) SetDeadline(t time.Time) error {
Expand All @@ -185,16 +176,17 @@ func (mCh *msgChannel) Type() string {
return "messaging"
}

func (mCh *msgChannel) close() {
select {
case <-mCh.doneChan:
default:
close(mCh.doneChan)
func (mCh *msgChannel) OnChannelClosed() bool {
return mCh.close()
}

mCh.closeChanMx.Lock() // TODO(evanlinjin): START(avoid race condition).
close(mCh.closeChan) // TODO(evanlinjin): data race.
mCh.closeChanMx.Unlock() // TODO(evanlinjin): END(avoid race condition).
}
func (mCh *msgChannel) close() bool {
closed := false
mCh.doneOnce.Do(func() {
close(mc.doneChan)
closed = true
})
return closed
}

func (mCh *msgChannel) readEncrypted(ctx context.Context, p []byte) (n int, err error) {
Expand Down Expand Up @@ -250,7 +242,3 @@ func (mCh *msgChannel) readEncrypted(ctx context.Context, p []byte) (n int, err

return copy(p, data), nil
}

// for getting and setting the 'closed' status.
func (mCh *msgChannel) isClosed() bool { return *(*bool)(atomic.LoadPointer(&mCh.closed)) }
func (mCh *msgChannel) setClosed(v bool) { atomic.StorePointer(&mCh.closed, unsafe.Pointer(&v)) } //nolint:gosec
7 changes: 3 additions & 4 deletions pkg/messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,13 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
}
case FrameTypeChannelClosed:
channel.SetID(body[0])
channel.closeChanMx.RLock() // TODO(evanlinjin): START(avoid race condition).
select {
case channel.waitChan <- false:
case channel.closeChan <- struct{}{}: // TODO(evanlinjin): data race.
clientLink.chans.remove(channelID)
default:
}
channel.closeChanMx.RUnlock() // TODO(evanlinjin): END(avoid race condition).
if channel.OnChannelClosed() {
clientLink.chans.remove(channelID)
}
case FrameTypeSend:
go func() {
select {
Expand Down

0 comments on commit 95cbf46

Please sign in to comment.