Skip to content

Commit

Permalink
Added more comments to dmsg.Transport
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed Jun 19, 2019
1 parent a6d9680 commit 190ac58
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/dmsg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
>
>- `ACK` frames should include the first 4 bytes of the rolling hash of incoming payloads, enforcing reliability of data. Transports should therefore keep track of incoming/outgoing rolling hashes.
>- Transports should also be noise-encrypted. `REQUEST` and `ACCEPT` frames should include noise handshake messages (`KK` handshake pattern), and the `FWD` and `ACK` payloads are to be encrypted.
> - Transports should implement read/write deadlines and local/remote addresses (like `net.Conn`).
> - `dmsg.Server` should check incoming frames to disallow excessive sending of `CLOSE`, `ACCEPT` and `REQUEST` frames.
## Terminology
Expand Down
48 changes: 30 additions & 18 deletions pkg/dmsg/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ var (
// Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary).
// It implements transport.Transport
type Transport struct {
net.Conn // link with server.
net.Conn // underlying connection to dmsg.Server
log *logging.Logger

id uint16
local cipher.PubKey
id uint16 // tp ID that identifies this dmsg.Transport
local cipher.PubKey // local PK
remote cipher.PubKey // remote PK

inCh chan Frame
inMx sync.RWMutex
inCh chan Frame // handles incoming frames (from dmsg.Client)
inMx sync.RWMutex // protects 'inCh'

ackWaiter ioutil.Uint16AckWaiter
ackBuf []byte
buf net.Buffers
bufCh chan struct{}
bufSize int
bufMx sync.Mutex // protects 'buf' and 'bufCh'
ackWaiter ioutil.Uint16AckWaiter // awaits for associated ACK frames
ackBuf []byte // buffer for unsent ACK frames
buf net.Buffers // buffer for non-read FWD frames
bufCh chan struct{} // chan for indicating whether this is a new FWD frame
bufSize int // keeps track of the total size of 'buf'
bufMx sync.Mutex // protects fields responsible for handling FWD and ACK frames

servingOnce sync.Once
serving chan struct{}
doneOnce sync.Once
done chan struct{}
serving chan struct{} // chan which closes when serving begins
servingOnce sync.Once // ensures 'serving' only closes once
done chan struct{} // chan which closes when transport stops serving
doneOnce sync.Once // ensures 'done' only closes once
doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client
}

Expand Down Expand Up @@ -227,10 +227,14 @@ func (tp *Transport) ReadAccept(ctx context.Context) (err error) {

// Serve handles received frames.
func (tp *Transport) Serve() {

// return is transport is already being served, or is closed
if !tp.serve() {
return
}

// ensure transport closes when serving stops
// also write CLOSE frame if this is the first time 'close' is triggered
defer func() {
if tp.close() {
_ = writeCloseFrame(tp.Conn, tp.id, 0) //nolint:errcheck
Expand All @@ -254,9 +258,11 @@ func (tp *Transport) Serve() {
log.Warnln("Rejected [FWD]: Invalid payload size.")
return
}
ack := MakeFrame(AckType, tp.id, p[:2])

tp.bufMx.Lock()

// Acknowledgement logic: if read buffer has free space, send ACK. If not, add to 'ackBuf'.
ack := MakeFrame(AckType, tp.id, p[:2])
if tp.bufSize += len(p[2:]); tp.bufSize > tpBufCap {
tp.ackBuf = append(tp.ackBuf, ack...)
} else {
Expand All @@ -266,12 +272,17 @@ func (tp *Transport) Serve() {
}
}()
}

// add payload to 'buf'
tp.buf = append(tp.buf, p[2:])

// notify of new data via 'bufCh'
select {
case <-tp.done:
case tp.bufCh <- struct{}{}:
default:
}

log.WithField("bufSize", fmt.Sprintf("%d/%d", tp.bufSize, tpBufCap)).Infoln("Injected [FWD]")
tp.bufMx.Unlock()

Expand All @@ -285,15 +296,16 @@ func (tp *Transport) Serve() {

case CloseType:
log.Infoln("Injected [CLOSE]: Closing transport...")
tp.close() // ensure there is no sending of CLOSE frame
return

case RequestType:
log.Warnln("Rejected [REQUEST]: ID already occupied, malicious server.")
log.Warnln("Rejected [REQUEST]: ID already occupied, possibly malicious server.")
_ = tp.Conn.Close()
return

default:
tp.log.Infof("Rejected [%s]: Unexpected frame, malicious server (ignored for now).", f.Type())
tp.log.Infof("Rejected [%s]: Unexpected frame, possibly malicious server (ignored for now).", f.Type())
}
}
}
Expand Down

0 comments on commit 190ac58

Please sign in to comment.