Skip to content

Commit

Permalink
Various renamings.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed Jun 1, 2019
1 parent 4510ff2 commit e8d87f0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 39 deletions.
58 changes: 29 additions & 29 deletions pkg/dms/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ var (
ErrClientClosed = errors.New("client closed")
)

// Conn represents a connection between a dms.Client and dms.Server from a client's perspective.
type Conn struct {
// ClientConn represents a connection between a dms.Client and dms.Server from a client's perspective.
type ClientConn struct {
log *logging.Logger

net.Conn // conn to dms server
local cipher.PubKey // local client's pk
remoteSrv cipher.PubKey // dms server's public key

// nextID keeps track of unused tp_ids to assign a future locally-initiated tp.
// nextInitID keeps track of unused tp_ids to assign a future locally-initiated tp.
// locally-initiated tps use an even tp_id between local and intermediary dms_server.
nextID uint16
nextInitID uint16

// map of transports to remote dms_clients (key: tp_id, val: transport).
tps [math.MaxUint16]*Transport
Expand All @@ -45,34 +45,34 @@ type Conn struct {
wg sync.WaitGroup
}

// NewConn creates a new Conn.
func NewConn(log *logging.Logger, conn net.Conn, local, remote cipher.PubKey) *Conn {
return &Conn{log: log, Conn: conn, local: local, remoteSrv: remote, nextID: 0}
// NewClientConn creates a new ClientConn.
func NewClientConn(log *logging.Logger, conn net.Conn, local, remote cipher.PubKey) *ClientConn {
return &ClientConn{log: log, Conn: conn, local: local, remoteSrv: remote, nextInitID: randID(true)}
}

func (c *Conn) delTp(id uint16) {
func (c *ClientConn) delTp(id uint16) {
c.mx.Lock()
c.tps[id] = nil
c.mx.Unlock()
}

func (c *Conn) setTp(tp *Transport) {
func (c *ClientConn) setTp(tp *Transport) {
c.mx.Lock()
c.tps[tp.id] = tp
c.mx.Unlock()
}

// keeps record of a locally-initiated tp to 'clientPK'.
// assigns an even tp_id and keeps track of it in tps map.
func (c *Conn) addTp(ctx context.Context, clientPK cipher.PubKey) (*Transport, error) {
func (c *ClientConn) addTp(ctx context.Context, clientPK cipher.PubKey) (*Transport, error) {
c.mx.Lock()
defer c.mx.Unlock()

for {
if ch := c.tps[c.nextID]; ch == nil || ch.IsDone() {
if ch := c.tps[c.nextInitID]; ch == nil || ch.IsDone() {
break
}
c.nextID += 2
c.nextInitID += 2

select {
case <-ctx.Done():
Expand All @@ -81,28 +81,28 @@ func (c *Conn) addTp(ctx context.Context, clientPK cipher.PubKey) (*Transport, e
}
}

id := c.nextID
c.nextID = id + 2
id := c.nextInitID
c.nextInitID = id + 2
ch := NewTransport(c.Conn, c.local, clientPK, id)
c.tps[id] = ch
return ch, nil
}

func (c *Conn) getTp(id uint16) (*Transport, bool) {
func (c *ClientConn) getTp(id uint16) (*Transport, bool) {
c.mx.RLock()
tp := c.tps[id]
c.mx.RUnlock()
ok := tp != nil && !tp.IsDone()
return tp, ok
}

func (c *Conn) handleRequestFrame(ctx context.Context, id uint16, p []byte) (*Transport, error) {
func (c *ClientConn) handleRequestFrame(ctx context.Context, id uint16, p []byte) (*Transport, error) {
// remote-initiated tps should:
// - have a payload structured as 'init_pk:resp_pk'.
// - resp_pk should be of local client.
// - use an odd tp_id with the intermediary dms_server.
initPK, respPK, ok := splitPKs(p)
if !ok || respPK != c.local || isEven(id) {
if !ok || respPK != c.local || isInitiatorID(id) {
if err := writeCloseFrame(c.Conn, id, 0); err != nil {
return nil, err
}
Expand All @@ -111,7 +111,7 @@ func (c *Conn) handleRequestFrame(ctx context.Context, id uint16, p []byte) (*Tr

tp := NewTransport(c.Conn, c.local, initPK, id)
if err := tp.Handshake(ctx); err != nil {
// return err here as response handshake is send via Conn and that shouldn't fail.
// return err here as response handshake is send via ClientConn and that shouldn't fail.
return nil, err
}
c.setTp(tp)
Expand All @@ -121,7 +121,7 @@ func (c *Conn) handleRequestFrame(ctx context.Context, id uint16, p []byte) (*Tr

// Serve handles incoming frames.
// Remote-initiated tps that are successfully created are pushing into 'accept' and exposed via 'Client.Accept()'.
func (c *Conn) Serve(ctx context.Context, accept chan<- *Transport) error {
func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) error {
c.wg.Add(1)
defer c.wg.Done()

Expand Down Expand Up @@ -179,7 +179,7 @@ func (c *Conn) Serve(ctx context.Context, accept chan<- *Transport) error {
}

// DialTransport dials a transport to remote dms_client.
func (c *Conn) DialTransport(ctx context.Context, clientPK cipher.PubKey) (*Transport, error) {
func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey) (*Transport, error) {
tp, err := c.addTp(ctx, clientPK)
if err != nil {
return nil, err
Expand All @@ -188,7 +188,7 @@ func (c *Conn) DialTransport(ctx context.Context, clientPK cipher.PubKey) (*Tran
}

// Close closes the connection to dms_server.
func (c *Conn) Close() error {
func (c *ClientConn) Close() error {
c.log.Infof("closingLink: remoteSrv(%v)", c.remoteSrv)
c.mx.Lock()
for _, tp := range c.tps {
Expand All @@ -210,7 +210,7 @@ type Client struct {
sk cipher.SecKey
dc client.APIClient

conns map[cipher.PubKey]*Conn // conns with messaging servers. Key: pk of server
conns map[cipher.PubKey]*ClientConn // conns with messaging servers. Key: pk of server
mx sync.RWMutex

accept chan *Transport
Expand All @@ -224,7 +224,7 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc client.APIClient) *Client
pk: pk,
sk: sk,
dc: dc,
conns: make(map[cipher.PubKey]*Conn),
conns: make(map[cipher.PubKey]*ClientConn),
accept: make(chan *Transport, readBufLen),
}
}
Expand All @@ -235,7 +235,7 @@ func (c *Client) SetLogger(log *logging.Logger) {
}

// TODO: re-connect logic.
//func (c *Client) setConn(l *Conn) {
//func (c *Client) setConn(l *ClientConn) {
// c.mx.Lock()
// c.conns[l.remoteSrv] = l
// c.mx.Unlock()
Expand All @@ -248,14 +248,14 @@ func (c *Client) delConn(pk cipher.PubKey) {
}

// TODO: re-connect logic.
//func (c *Client) getConn(pk cipher.PubKey) (*Conn, bool) {
//func (c *Client) getConn(pk cipher.PubKey) (*ClientConn, bool) {
// c.mx.RLock()
// l, ok := c.conns[pk]
// c.mx.RUnlock()
// return l, ok
//}

func (c *Client) newConn(ctx context.Context, srvPK cipher.PubKey, addr string) (*Conn, error) {
func (c *Client) newConn(ctx context.Context, srvPK cipher.PubKey, addr string) (*ClientConn, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -273,7 +273,7 @@ func (c *Client) newConn(ctx context.Context, srvPK cipher.PubKey, addr string)
if err != nil {
return nil, err
}
l := NewConn(c.log, nc, c.pk, srvPK)
l := NewClientConn(c.log, nc, c.pk, srvPK)
go func() {
if err := l.Serve(ctx, c.accept); err != nil {
l.log.WithError(err).WithField("srv_pk", l.remoteSrv).Warn("link with server closed")
Expand Down Expand Up @@ -329,7 +329,7 @@ func (c *Client) InitiateServers(ctx context.Context, n int) error {
return nil
}

func (c *Client) findConn(ctx context.Context, srvPKs []cipher.PubKey) (*Conn, error) {
func (c *Client) findConn(ctx context.Context, srvPKs []cipher.PubKey) (*ClientConn, error) {
for _, srvPK := range srvPKs {
conn, ok := c.conns[srvPK]
if !ok {
Expand Down Expand Up @@ -424,7 +424,7 @@ func (c *Client) Close() error {
for _, link := range c.conns {
_ = link.Close()
}
c.conns = make(map[cipher.PubKey]*Conn)
c.conns = make(map[cipher.PubKey]*ClientConn)
c.once.Do(func() {
close(c.accept)
})
Expand Down
12 changes: 11 additions & 1 deletion pkg/dms/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@ const (
headerLen = 5 // fType(1 byte), chID(2 byte), payLen(2 byte)
)

func isEven(chID uint16) bool { return chID%2 == 0 }
func isInitiatorID(tpID uint16) bool { return tpID%2 == 0 }

func randID(initiator bool) uint16 {
var id uint16
for {
id = binary.BigEndian.Uint16(cipher.RandByte(2))
if initiator && id%2 == 0 || !initiator && id%2 != 0 {
return id
}
}
}

// FrameType represents the frame type.
type FrameType byte
Expand Down
16 changes: 8 additions & 8 deletions pkg/dms/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type ServerConn struct {
net.Conn
remoteClient cipher.PubKey

nextID uint16
nextLinks [math.MaxUint16]*NextConn
mx sync.RWMutex
nextRespID uint16
nextLinks [math.MaxUint16]*NextConn
mx sync.RWMutex
}

// NewServerConn creates a new connection from the perspective of a dms_server.
func NewServerConn(log *logging.Logger, conn net.Conn, remoteClient cipher.PubKey) *ServerConn {
return &ServerConn{log: log, Conn: conn, remoteClient: remoteClient, nextID: 1}
return &ServerConn{log: log, Conn: conn, remoteClient: remoteClient, nextRespID: randID(false)}
}

func (c *ServerConn) delNext(id uint16) {
Expand All @@ -71,10 +71,10 @@ func (c *ServerConn) addNext(ctx context.Context, r *NextConn) (uint16, error) {
defer c.mx.Unlock()

for {
if r := c.nextLinks[c.nextID]; r == nil {
if r := c.nextLinks[c.nextRespID]; r == nil {
break
}
c.nextID += 2
c.nextRespID += 2

select {
case <-ctx.Done():
Expand All @@ -83,8 +83,8 @@ func (c *ServerConn) addNext(ctx context.Context, r *NextConn) (uint16, error) {
}
}

id := c.nextID
c.nextID = id + 2
id := c.nextRespID
c.nextRespID = id + 2
c.nextLinks[id] = r
return id, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dms/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *Transport) close() (closed bool) {
// Handshake performs a tp handshake (before tp is considered valid).
func (c *Transport) Handshake(ctx context.Context) error {
// if channel ID is even, client is initiator.
if init := isEven(c.id); init {
if init := isInitiatorID(c.id); init {

pks := combinePKs(c.local, c.remoteClient)
f := MakeFrame(RequestType, c.id, pks)
Expand Down

0 comments on commit e8d87f0

Please sign in to comment.