Skip to content

Commit

Permalink
made modifications to channel ID thread-safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed May 9, 2019
1 parent 1233e32 commit 11fbeaa
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ pkg/node/foo/
/*-node
/*-cli
/*.json
/*.sh
/*.sh
/*.log
23 changes: 20 additions & 3 deletions pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
)

type channel struct {
ID byte
id byte // This is to be changed.
idMx sync.RWMutex

remotePK cipher.PubKey
link *Link
buf *bytes.Buffer
Expand Down Expand Up @@ -59,6 +61,21 @@ func newChannel(initiator bool, secKey cipher.SecKey, remote cipher.PubKey, link
}, nil
}

// ID obtains the channel's id.
func (c *channel) ID() byte {
c.idMx.RLock()
id := c.id
c.idMx.RUnlock()
return id
}

// SetID set's the channel's id.
func (c *channel) SetID(id byte) {
c.idMx.Lock()
c.id = id
c.idMx.Unlock()
}

// Edges returns the public keys of the channel's edge nodes
func (c *channel) Edges() [2]cipher.PubKey {
return transport.SortPubKeys(c.link.Local(), c.remotePK)
Expand Down Expand Up @@ -122,7 +139,7 @@ func (c *channel) Write(p []byte) (n int, err error) {
done := make(chan struct{}, 1)
defer close(done)
go func() {
n, err = c.link.Send(c.ID, buf)
n, err = c.link.Send(c.ID(), buf)
n = n - (len(data) - len(p) + 2)
select {
case done <- struct{}{}:
Expand All @@ -143,7 +160,7 @@ func (c *channel) Close() error {
return ErrChannelClosed
}

if _, err := c.link.SendCloseChannel(c.ID); err != nil {
if _, err := c.link.SendCloseChannel(c.ID()); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/messaging/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestChannelWrite(t *testing.T) {

c, err := newChannel(true, sk, remotePK, l)
require.NoError(t, err)
c.ID = 10
c.SetID(10)

rn := handshakeChannel(t, c, remotePK, remoteSK)

Expand Down Expand Up @@ -118,7 +118,7 @@ func TestChannelClose(t *testing.T) {

c, err := newChannel(true, sk, remotePK, l)
require.NoError(t, err)
c.ID = 10
c.SetID(10)

handshakeChannel(t, c, remotePK, remoteSK)

Expand Down
10 changes: 5 additions & 5 deletions pkg/messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (transport.Tran
return nil, ctx.Err()
}

c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", localID, channel.ID, remote)
c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", localID, channel.ID(), remote) // TODO: race condition
return channel, nil
}

Expand Down Expand Up @@ -314,10 +314,10 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
switch frameType {
case FrameTypeCloseChannel:
clientLink.chans.remove(channelID)
_, sendErr = l.SendChannelClosed(channel.ID)
_, sendErr = l.SendChannelClosed(channel.ID())
c.Logger.Debugf("Closed channel ID %d", channelID)
case FrameTypeChannelOpened:
channel.ID = body[1]
channel.SetID(body[1])
if err := channel.ProcessMessage(body[2:]); err != nil {
sendErr = fmt.Errorf("noise handshake: %s", err)
}
Expand All @@ -327,7 +327,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
default:
}
case FrameTypeChannelClosed:
channel.ID = body[0]
channel.SetID(body[0])
select {
case channel.waitChan <- false:
case channel.closeChan <- struct{}{}:
Expand Down Expand Up @@ -390,7 +390,7 @@ func (c *Client) openChannel(rID byte, remotePK []byte, noiseMsg []byte, chanLin
}

channel, err := newChannel(false, c.secKey, pubKey, chanLink.link)
channel.ID = rID
channel.SetID(rID)
if err != nil {
err = fmt.Errorf("noise setup: %s", err)
return
Expand Down

0 comments on commit 11fbeaa

Please sign in to comment.