Skip to content

Commit

Permalink
Renamed dms to dmsg as per Brandon's request.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed Jun 2, 2019
1 parent 319b364 commit 7bdace1
Show file tree
Hide file tree
Showing 68 changed files with 82 additions and 11,269 deletions.
3 changes: 1 addition & 2 deletions cmd/manager-node/commands/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import (
"fmt"
"path/filepath"

"github.com/skycoin/skywire/pkg/util/pathutil"

"github.com/spf13/cobra"

"github.com/skycoin/skywire/pkg/manager"
"github.com/skycoin/skywire/pkg/util/pathutil"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions cmd/messaging-server/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/spf13/cobra"

"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/dms"
"github.com/skycoin/skywire/pkg/dmsg"
"github.com/skycoin/skywire/pkg/messaging-discovery/client"
)

Expand Down Expand Up @@ -72,7 +72,7 @@ var rootCmd = &cobra.Command{
}()

// Start
srv := dms.NewServer(conf.PubKey, conf.SecKey, conf.PublicAddress, client.NewHTTP(conf.Discovery))
srv := dmsg.NewServer(conf.PubKey, conf.SecKey, conf.PublicAddress, client.NewHTTP(conf.Discovery))
log.Fatal(srv.ListenAndServe(conf.LocalAddress))
},
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/skywire-cli/commands/node/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"text/tabwriter"
"time"

"github.com/skycoin/skywire/pkg/dmsg"

"github.com/spf13/cobra"

"github.com/skycoin/skywire/cmd/skywire-cli/internal"
Expand Down Expand Up @@ -77,7 +79,7 @@ var (
)

func init() {
addTpCmd.Flags().StringVar(&transportType, "type", "dms", "type of transport to add")
addTpCmd.Flags().StringVar(&transportType, "type", dmsg.Type, "type of transport to add")
addTpCmd.Flags().BoolVar(&public, "public", true, "whether to make the transport public")
addTpCmd.Flags().DurationVarP(&timeout, "timeout", "t", 0, "if specified, sets an operation timeout")
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.1
github.com/skycoin/skycoin v0.25.1
github.com/spf13/cobra v0.0.3
Expand Down
1 change: 0 additions & 1 deletion pkg/dms/frame_test.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/dms/transport_test.go

This file was deleted.

36 changes: 15 additions & 21 deletions pkg/dms/client.go → pkg/dmsg/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dms
package dmsg

import (
"context"
Expand All @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/prometheus/common/log"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/internal/noise"
Expand All @@ -25,13 +24,13 @@ var (
ErrClientClosed = errors.New("client closed")
)

// ClientConn represents a connection between a dms.Client and dms.Server from a client's perspective.
// ClientConn represents a connection between a dmsg.Client and dmsg.Server from a client's perspective.
type ClientConn struct {
log *logging.Logger

net.Conn // conn to dms server
net.Conn // conn to dmsg server
local cipher.PubKey // local client's pk
remoteSrv cipher.PubKey // dms server's public key
remoteSrv cipher.PubKey // dmsg server's public key

// nextInitID keeps track of unused tp_ids to assign a future locally-initiated tp.
// locally-initiated tps use an even tp_id between local and intermediary dms_server.
Expand Down Expand Up @@ -83,7 +82,7 @@ func (c *ClientConn) addTp(ctx context.Context, clientPK cipher.PubKey) (*Transp

id := c.nextInitID
c.nextInitID = id + 2
ch := NewTransport(c.Conn, c.local, clientPK, id)
ch := NewTransport(c.Conn, c.log, c.local, clientPK, id)
c.tps[id] = ch
return ch, nil
}
Expand All @@ -110,7 +109,7 @@ func (c *ClientConn) handleRequestFrame(ctx context.Context, accept chan<- *Tran
return initPK, ErrRequestCheckFailed
}

tp := NewTransport(c.Conn, c.local, initPK, id)
tp := NewTransport(c.Conn, c.log, c.local, initPK, id)
if err := tp.Handshake(ctx); err != nil {
// return err here as response handshake is send via ClientConn and that shouldn't fail.
c.Close()
Expand Down Expand Up @@ -274,16 +273,16 @@ func (c *Client) connCount() int {
}

// InitiateServerConnections initiates connections with dms_servers.
func (c *Client) InitiateServerConnections(ctx context.Context, n int) error {
if n == 0 {
func (c *Client) InitiateServerConnections(ctx context.Context, min int) error {
if min == 0 {
return nil
}
entries, err := c.findServerEntries(ctx)
if err != nil {
return err
}
c.log.Info("found dms_server entries:", entries)
if err := c.connectToServers(ctx, entries, n); err != nil {
if err := c.findOrConnectToServers(ctx, entries, min); err != nil {
return err
}
if err := c.updateDiscEntry(ctx); err != nil {
Expand All @@ -300,6 +299,7 @@ func (c *Client) findServerEntries(ctx context.Context) ([]*client.Entry, error)
case <-ctx.Done():
return nil, fmt.Errorf("dms_servers are not available: %s", err)
default:
c.log.WithError(err).Warnf("no dms_servers found: trying again is 1 second...")
time.Sleep(time.Second)
continue
}
Expand All @@ -308,25 +308,19 @@ func (c *Client) findServerEntries(ctx context.Context) ([]*client.Entry, error)
}
}

func (c *Client) connectToServers(ctx context.Context, entries []*client.Entry, min int) error {
//c.mx.Lock()
//defer c.mx.Unlock()

func (c *Client) findOrConnectToServers(ctx context.Context, entries []*client.Entry, min int) error {
for _, entry := range entries {
if _, ok := c.getConn(entry.Static); ok {
continue
}
_, err := c.findOrConnectToServer(ctx, entry.Static)
if err != nil {
c.log.Warnf("Failed to connect to server %s: %s", entry.Static, err)
c.log.Warnf("findOrConnectToServers: failed to find/connect to server %s: %s", entry.Static, err)
continue
}
c.log.Infof("Connected to server %s", entry.Static)
c.log.Infof("findOrConnectToServers: found/connected to server %s", entry.Static)
if c.connCount() >= min {
return nil
}
}
return fmt.Errorf("servers are not available: all servers failed")
return fmt.Errorf("findOrConnectToServers: all servers failed")
}

func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey) (*ClientConn, error) {
Expand Down Expand Up @@ -383,7 +377,7 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey)
}

func (c *Client) updateDiscEntry(ctx context.Context) error {
log.Info("updatingEntry")
c.log.Info("updatingEntry")
var srvPKs []cipher.PubKey
c.mx.RLock()
for pk := range c.conns {
Expand Down
6 changes: 3 additions & 3 deletions pkg/dms/frame.go → pkg/dmsg/frame.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dms
package dmsg

import (
"encoding/binary"
Expand All @@ -11,7 +11,7 @@ import (

const (
// Type returns the transport type string.
Type = "dms"
Type = "dmsg"

hsTimeout = time.Second * 10
readTimeout = time.Second * 10
Expand Down Expand Up @@ -58,7 +58,7 @@ const (
AckType = FrameType(11)
)

// Frame is the dms data unit.
// Frame is the dmsg data unit.
type Frame []byte

// MakeFrame creates a new Frame.
Expand Down
1 change: 1 addition & 0 deletions pkg/dmsg/frame_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package dmsg
16 changes: 8 additions & 8 deletions pkg/dms/server.go → pkg/dmsg/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dms
package dmsg

import (
"context"
Expand Down Expand Up @@ -30,15 +30,15 @@ func (r *NextConn) writeFrame(ft FrameType, p []byte) error {
return nil
}

// ServerConn is a connection between a dms.Server and a dms.Client from a server's perspective.
// ServerConn is a connection between a dmsg.Server and a dmsg.Client from a server's perspective.
type ServerConn struct {
log *logging.Logger

net.Conn
remoteClient cipher.PubKey

nextRespID uint16
nextLinks [math.MaxUint16]*NextConn
nextConns [math.MaxUint16]*NextConn
mx sync.RWMutex
}

Expand All @@ -49,19 +49,19 @@ func NewServerConn(log *logging.Logger, conn net.Conn, remoteClient cipher.PubKe

func (c *ServerConn) delNext(id uint16) {
c.mx.Lock()
c.nextLinks[id] = nil
c.nextConns[id] = nil
c.mx.Unlock()
}

func (c *ServerConn) setNext(id uint16, r *NextConn) {
c.mx.Lock()
c.nextLinks[id] = r
c.nextConns[id] = r
c.mx.Unlock()
}

func (c *ServerConn) getNext(id uint16) (*NextConn, bool) {
c.mx.RLock()
r := c.nextLinks[id]
r := c.nextConns[id]
c.mx.RUnlock()
return r, r != nil
}
Expand All @@ -71,7 +71,7 @@ func (c *ServerConn) addNext(ctx context.Context, r *NextConn) (uint16, error) {
defer c.mx.Unlock()

for {
if r := c.nextLinks[c.nextRespID]; r == nil {
if r := c.nextConns[c.nextRespID]; r == nil {
break
}
c.nextRespID += 2
Expand All @@ -85,7 +85,7 @@ func (c *ServerConn) addNext(ctx context.Context, r *NextConn) (uint16, error) {

id := c.nextRespID
c.nextRespID = id + 2
c.nextLinks[id] = r
c.nextConns[id] = r
return id, nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/dms/server_test.go → pkg/dmsg/server_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package dms
package dmsg

import (
"context"
"fmt"
"golang.org/x/net/nettest"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -51,6 +52,7 @@ func TestNewClient(t *testing.T) {
a := NewClient(aPK, aSK, dc)
a.SetLogger(logging.MustGetLogger("A"))
require.NoError(t, a.InitiateServerConnections(context.Background(), 1))
nettest.NewLocalListener()

b := NewClient(bPK, bSK, dc)
b.SetLogger(logging.MustGetLogger("B"))
Expand Down
14 changes: 10 additions & 4 deletions pkg/dms/transport.go → pkg/dmsg/transport.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dms
package dmsg

import (
"bytes"
Expand All @@ -10,6 +10,8 @@ import (
"net"
"sync"

"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/internal/ioutil"
"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/transport"
Expand All @@ -21,10 +23,11 @@ var (
ErrRequestCheckFailed = errors.New("request check failed")
)

// Transport represents a connection from dms.Client to remote dms.Client (via dms.Server intermediary).
// Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary).
// It implements transport.Transport
type Transport struct {
net.Conn // link with server.
log *logging.Logger

id uint16
local cipher.PubKey
Expand All @@ -39,9 +42,10 @@ type Transport struct {
}

// NewTransport creates a new dms_tp.
func NewTransport(conn net.Conn, local, remote cipher.PubKey, id uint16) *Transport {
func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKey, id uint16) *Transport {
return &Transport{
Conn: conn,
log: log,
id: id,
local: local,
remote: remote,
Expand Down Expand Up @@ -79,7 +83,6 @@ func (c *Transport) awaitResponse(ctx context.Context) error {
func (c *Transport) Handshake(ctx context.Context) error {
// if channel ID is even, client is initiator.
if isInitiatorID(c.id) {

pks := combinePKs(c.local, c.remote)
f := MakeFrame(RequestType, c.id, pks)
if err := writeFrame(c.Conn, f); err != nil {
Expand All @@ -91,11 +94,14 @@ func (c *Transport) Handshake(ctx context.Context) error {
return err
}
} else {
c.log.Infof("tp_hs responding...")
f := MakeFrame(AcceptType, c.id, combinePKs(c.remote, c.local))
if err := writeFrame(c.Conn, f); err != nil {
c.log.WithError(err).Error("tp_hs responded with error.")
c.close()
return err
}
c.log.Infoln("tp_hs responded:", f)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/dmsg/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package dmsg
10 changes: 5 additions & 5 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"syscall"
"time"

"github.com/skycoin/skywire/pkg/dms"
"github.com/skycoin/skywire/pkg/dmsg"

"github.com/skycoin/skycoin/src/util/logging"

Expand Down Expand Up @@ -80,7 +80,7 @@ type PacketRouter interface {
type Node struct {
config *Config
router PacketRouter
messenger *dms.Client
messenger *dmsg.Client
tm *transport.Manager
rt routing.Table
executer appExecuter
Expand Down Expand Up @@ -117,8 +117,8 @@ func NewNode(config *Config) (*Node, error) {
return nil, fmt.Errorf("invalid Messaging config: %s", err)
}

node.messenger = dms.NewClient(mConfig.PubKey, mConfig.SecKey, mConfig.Discovery)
node.messenger.SetLogger(node.Logger.PackageLogger("dms"))
node.messenger = dmsg.NewClient(mConfig.PubKey, mConfig.SecKey, mConfig.Discovery)
node.messenger.SetLogger(node.Logger.PackageLogger(dmsg.Type))

trDiscovery, err := config.TransportDiscovery()
if err != nil {
Expand Down Expand Up @@ -200,7 +200,7 @@ func (node *Node) Start() error {
ctx := context.Background()
err := node.messenger.InitiateServerConnections(ctx, node.config.Messaging.ServerCount)
if err != nil {
return fmt.Errorf("dms: %s", err)
return fmt.Errorf("%s: %s", dmsg.Type, err)
}
node.logger.Info("Connected to messaging servers")

Expand Down
Loading

0 comments on commit 7bdace1

Please sign in to comment.