Skip to content

Commit

Permalink
Initial implementation of dms_tp ACKS.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed Jun 1, 2019
1 parent e6c5dab commit fb31ee6
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 82 deletions.
22 changes: 11 additions & 11 deletions pkg/dms/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) error
if ok {
// If tp of tp_id exists, attempt to forward frame to tp.
// delete tp on any failure.
if !tp.AwaitRead(f) {
log.Infof("failed to injest to local_tp: id(%d) dstClient(%s)", id, tp.remoteClient)
if !tp.InjectRead(f) {
log.Infof("failed to inject to local_tp: id(%d) dstClient(%s)", id, tp.remote)
c.delTp(id)
}
log.Infof("successfully injested to local_tp: id(%d) dstClient(%s)", id, tp.remoteClient)
log.Infof("successfully injected to local_tp: id(%d) dstClient(%s)", id, tp.remote)
continue
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc client.APIClient) *Client
sk: sk,
dc: dc,
conns: make(map[cipher.PubKey]*ClientConn),
accept: make(chan *Transport, readBufLen),
accept: make(chan *Transport, acceptChSize),
}
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (c *Client) delConn(pk cipher.PubKey) {
//}

func (c *Client) newConn(ctx context.Context, srvPK cipher.PubKey, addr string) (*ClientConn, error) {
conn, err := net.Dial("tcp", addr)
tcpConn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
Expand All @@ -274,21 +274,21 @@ func (c *Client) newConn(ctx context.Context, srvPK cipher.PubKey, addr string)
if err != nil {
return nil, err
}
nc, err := noise.WrapConn(conn, ns, hsTimeout)
nc, err := noise.WrapConn(tcpConn, ns, hsTimeout)
if err != nil {
return nil, err
}
l := NewClientConn(c.log, nc, c.pk, srvPK)
conn := NewClientConn(c.log, nc, c.pk, srvPK)
go func() {
if err := l.Serve(ctx, c.accept); err != nil {
l.log.WithError(err).WithField("srv_pk", l.remoteSrv).Warn("link with server closed")
if err := conn.Serve(ctx, c.accept); err != nil {
conn.log.WithError(err).WithField("srv_pk", conn.remoteSrv).Warn("link with server closed")
if err := c.updateDiscEntry(ctx); err != nil {
c.log.WithError(err).Error("failed to update entry after server close.")
}
c.delConn(l.remoteSrv)
c.delConn(conn.remoteSrv)
}
}()
return l, nil
return conn, nil
}

// InitiateServers initiates connections with dms_servers.
Expand Down
18 changes: 13 additions & 5 deletions pkg/dms/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ const (
// Type returns the transport type string.
Type = "dms"

hsTimeout = time.Second * 10
readBufLen = 10
headerLen = 5 // fType(1 byte), chID(2 byte), payLen(2 byte)
hsTimeout = time.Second * 10
readTimeout = time.Second * 10
acceptChSize = 1
readChSize = 20
headerLen = 5 // fType(1 byte), chID(2 byte), payLen(2 byte)
)

func isInitiatorID(tpID uint16) bool { return tpID%2 == 0 }
Expand All @@ -38,7 +40,8 @@ func (ft FrameType) String() string {
RequestType: "REQUEST",
AcceptType: "ACCEPT",
CloseType: "CLOSE",
SendType: "SEND",
FwdType: "FWD",
AckType: "ACK",
}
if int(ft) >= len(names) {
return fmt.Sprintf("UNKNOWN:%d", ft)
Expand All @@ -51,7 +54,8 @@ const (
RequestType = FrameType(1)
AcceptType = FrameType(2)
CloseType = FrameType(3)
SendType = FrameType(10)
FwdType = FrameType(10)
AckType = FrameType(11)
)

// Frame is the dms data unit.
Expand Down Expand Up @@ -99,6 +103,10 @@ func writeFrame(w io.Writer, f Frame) error {
return err
}

func writeFwdFrame(w io.Writer, id uint16, seq AckSeq, p []byte) error {
return writeFrame(w, MakeFrame(FwdType, id, append(seq.Encode(), p...)))
}

func writeCloseFrame(w io.Writer, id uint16, reason byte) error {
return writeFrame(w, MakeFrame(CloseType, id, []byte{reason}))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dms/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *ServerConn) Serve(ctx context.Context, getConn getConnFunc) error {
}
}()

case AcceptType, SendType, CloseType:
case AcceptType, FwdType, AckType, CloseType:
next, why, ok := c.forwardFrame(ft, id, p)
if !ok {
// Delete channel (and associations) on failure.
Expand Down
11 changes: 6 additions & 5 deletions pkg/dms/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package dms
import (
"context"
"fmt"
"github.com/skycoin/skycoin/src/util/logging"
"sync"
"testing"
"time"

"github.com/skycoin/skycoin/src/util/logging"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -36,9 +37,9 @@ func TestNewServer(t *testing.T) {
// Data should be sent and delivered successfully via the transport.
// TODO: fix this.
func TestNewClient(t *testing.T) {
aPK, aSK, _ := cipher.GenerateDeterministicKeyPair([]byte("a"))
bPK, bSK, _ := cipher.GenerateDeterministicKeyPair([]byte("b"))
sPK, sSK, _ := cipher.GenerateDeterministicKeyPair([]byte("c"))
aPK, aSK := cipher.GenerateKeyPair()
bPK, bSK := cipher.GenerateKeyPair()
sPK, sSK := cipher.GenerateKeyPair()
sAddr := ":8080"

const tpCount = 10
Expand Down Expand Up @@ -138,4 +139,4 @@ func catch(err error) {

func TestNewConn(t *testing.T) {

}
}
Loading

0 comments on commit fb31ee6

Please sign in to comment.