Skip to content

Commit

Permalink
First working version of skymsg.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed May 30, 2019
1 parent 72b5cac commit b34f98b
Show file tree
Hide file tree
Showing 68 changed files with 11,882 additions and 166 deletions.
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ pkg/node/apps/
pkg/node/bar/
pkg/node/foo/

/*-node
/skywire-cli
/therealssh-cli
/node
/users.db
/*-node
/*-cli
/*-server
/*.json
/*.sh
/*.log
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ bin: ## Build `skywire-node`, `skywire-cli`, `manager-node`, `therealssh-cli`
${OPTS} go build -race -o ./skywire-node ./cmd/skywire-node
${OPTS} go build -race -o ./skywire-cli ./cmd/skywire-cli
${OPTS} go build -race -o ./setup-node ./cmd/setup-node
${OPTS} go build -race -o ./manager-node ./cmd/manager-node
${OPTS} go build -race -o ./manager-node ./cmd/manager-node
${OPTS} go build -race -o ./messaging-server ./cmd/messaging-server
${OPTS} go build -race -o ./therealssh-cli ./cmd/therealssh-cli

release: ## Build skywire-node`, skywire-cli, manager-node, therealssh-cli and apps without -race flag
Expand Down
111 changes: 111 additions & 0 deletions cmd/messaging-server/commands/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package commands

import (
"bufio"
"encoding/json"
"github.com/skycoin/skywire/pkg/skymsg"
"io"
"log"
"log/syslog"
"net/http"
"os"

"github.com/prometheus/client_golang/prometheus/promhttp"
logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/messaging-discovery/client"
"github.com/spf13/cobra"
)

var (
metricsAddr string
syslogAddr string
tag string
cfgFromStdin bool
)

// Config is a messaging-server config
type Config struct {
PubKey cipher.PubKey `json:"public_key"`
SecKey cipher.SecKey `json:"secret_key"`
Discovery string `json:"discovery"`
LocalAddress string `json:"local_address"`
PublicAddress string `json:"public_address"`
LogLevel string `json:"log_level"`
}

var rootCmd = &cobra.Command{
Use: "messaging-server [config.json]",
Short: "Messaging Server for skywire",
Run: func(_ *cobra.Command, args []string) {
// Config
configFile := "config.json"
if len(args) > 0 {
configFile = args[0]
}
conf := parseConfig(configFile)

// Logger
logger := logging.MustGetLogger(tag)
logLevel, err := logging.LevelFromString(conf.LogLevel)
if err != nil {
log.Fatal("Failed to parse LogLevel: ", err)
}
logging.SetLevel(logLevel)

if syslogAddr != "" {
hook, err := logrus_syslog.NewSyslogHook("udp", syslogAddr, syslog.LOG_INFO, tag)
if err != nil {
logger.Fatalf("Unable to connect to syslog daemon on %v", syslogAddr)
}
logging.AddHook(hook)
}

// Metrics
go func() {
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(metricsAddr, nil); err != nil {
logger.Println("Failed to start metrics API:", err)
}
}()

// Start
srv := skymsg.NewServer(conf.PubKey, conf.SecKey, conf.PublicAddress, client.NewHTTP(conf.Discovery))
log.Fatal(srv.ListenAndServe(conf.LocalAddress))
},
}

func init() {
rootCmd.Flags().StringVarP(&metricsAddr, "metrics", "m", ":2121", "address to bind metrics API to")
rootCmd.Flags().StringVar(&syslogAddr, "syslog", "", "syslog server address. E.g. localhost:514")
rootCmd.Flags().StringVar(&tag, "tag", "messaging-server", "logging tag")
rootCmd.Flags().BoolVarP(&cfgFromStdin, "stdin", "i", false, "read configuration from STDIN")
}

func parseConfig(configFile string) *Config {
var rdr io.Reader
var err error
if !cfgFromStdin {
rdr, err = os.Open(configFile)
if err != nil {
log.Fatalf("Failed to open config: %s", err)
}
} else {
rdr = bufio.NewReader(os.Stdin)
}

conf := &Config{}
if err := json.NewDecoder(rdr).Decode(&conf); err != nil {
log.Fatalf("Failed to decode %s: %s", rdr, err)
}

return conf
}

// Execute executes root CLI command.
func Execute() {
if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
}
}
7 changes: 7 additions & 0 deletions cmd/messaging-server/messaging-server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "github.com/skycoin/skywire/cmd/messaging-server/commands"

func main() {
commands.Execute()
}
2 changes: 1 addition & 1 deletion cmd/skywire-cli/commands/node/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var (
)

func init() {
addTpCmd.Flags().StringVar(&transportType, "type", "messaging", "type of transport to add")
addTpCmd.Flags().StringVar(&transportType, "type", "skymsg", "type of transport to add")
addTpCmd.Flags().BoolVar(&public, "public", true, "whether to make the transport public")
addTpCmd.Flags().DurationVarP(&timeout, "timeout", "t", 0, "if specified, sets an operation timeout")
}
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/skycoin/skywire
go 1.12

require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6
github.com/go-chi/chi v4.0.2+incompatible
Expand All @@ -11,11 +13,13 @@ require (
github.com/gorilla/securecookie v1.1.1
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pty v1.1.4
github.com/mattn/go-colorable v0.1.1 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.1
github.com/skycoin/skycoin v0.25.1
github.com/spf13/cobra v0.0.3
Expand All @@ -24,4 +28,6 @@ require (
go.etcd.io/bbolt v1.3.2
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480
golang.org/x/net v0.0.0-20190419010253-1f3472d942ba
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
Expand All @@ -23,8 +27,13 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.4 h1:5Myjjh3JY/NaAi4IsUbHADytDyl1VE1Y9PXDlL+P/VQ=
github.com/kr/pty v1.1.4/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw=
Expand Down Expand Up @@ -66,10 +75,16 @@ golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480/go.mod h1:WFFai1msRO1wXaE
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190419010253-1f3472d942ba h1:h0zCzEL5UW1mERvwTN6AXcc75PpLkY6OcReia6Dq1BM=
golang.org/x/net v0.0.0-20190419010253-1f3472d942ba/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e h1:nFYrTHrdrAOpShe27kaFHjsqYSEQ0KWqdWLu3xuZJts=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
6 changes: 3 additions & 3 deletions pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ type msgChannel struct {
doneChan chan struct{}
doneOnce sync.Once

noise *noise.Noise
noise *noise.Noise
readFunc func([]byte) (int, error)
writeFunc func([]byte) (int, error)
rMx sync.Mutex // lock for decrypt cipher state
wMx sync.Mutex // lock for encrypt cipher state
rMx sync.Mutex // lock for decrypt cipher state
wMx sync.Mutex // lock for encrypt cipher state
}

func newChannel(initiator bool, secKey cipher.SecKey, remote cipher.PubKey, link *Link) (*msgChannel, error) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/skycoin/skywire/pkg/skymsg"
"io"
"net"
"net/rpc"
Expand All @@ -19,7 +20,6 @@ import (

"github.com/skycoin/skywire/internal/noise"
"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/messaging"
routeFinder "github.com/skycoin/skywire/pkg/route-finder/client"
"github.com/skycoin/skywire/pkg/router"
"github.com/skycoin/skywire/pkg/routing"
Expand Down Expand Up @@ -79,7 +79,7 @@ type PacketRouter interface {
type Node struct {
config *Config
router PacketRouter
messenger *messaging.Client
messenger *skymsg.Client
tm *transport.Manager
rt routing.Table
executer appExecuter
Expand Down Expand Up @@ -116,8 +116,8 @@ func NewNode(config *Config) (*Node, error) {
return nil, fmt.Errorf("invalid Messaging config: %s", err)
}

node.messenger = messaging.NewClient(mConfig)
node.messenger.Logger = node.Logger.PackageLogger("messenger")
node.messenger = skymsg.NewClient(mConfig.PubKey, mConfig.SecKey, mConfig.Discovery)
node.messenger.SetLogger(node.Logger.PackageLogger("dms"))

trDiscovery, err := config.TransportDiscovery()
if err != nil {
Expand Down Expand Up @@ -197,9 +197,9 @@ func NewNode(config *Config) (*Node, error) {
// Start spawns auto-started Apps, starts router and RPC interfaces .
func (node *Node) Start() error {
ctx := context.Background()
err := node.messenger.ConnectToInitialServers(ctx, node.config.Messaging.ServerCount)
err := node.messenger.InitiateLinks(ctx, node.config.Messaging.ServerCount)
if err != nil {
return fmt.Errorf("messaging: %s", err)
return fmt.Errorf("skymsg: %s", err)
}
node.logger.Info("Connected to messaging servers")

Expand Down
3 changes: 2 additions & 1 deletion pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/skycoin/skywire/pkg/skymsg"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestNodeStartClose(t *testing.T) {
node := &Node{config: &Config{}, router: r, executer: executer, appsConf: conf,
startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")}
mConf := &messaging.Config{PubKey: cipher.PubKey{}, SecKey: cipher.SecKey{}, Discovery: client.NewMock()}
node.messenger = messaging.NewClient(mConf)
node.messenger = skymsg.NewClient(mConf.PubKey, mConf.SecKey, mConf.Discovery)
var err error

tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()}
Expand Down
3 changes: 2 additions & 1 deletion pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ func (r *Router) setupProto(ctx context.Context) (*setup.Protocol, transport.Tra
return nil, nil, errors.New("route setup: no nodes")
}

tr, err := r.tm.CreateTransport(ctx, r.config.SetupNodes[0], "messaging", false)
// TODO(evanlinjin): need string constant for tp type.
tr, err := r.tm.CreateTransport(ctx, r.config.SetupNodes[0], "skymsg", false)
if err != nil {
return nil, nil, fmt.Errorf("transport: %s", err)
}
Expand Down
22 changes: 8 additions & 14 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/skycoin/skywire/pkg/skymsg"
"log"
"time"

Expand All @@ -13,7 +14,6 @@ import (
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/messaging"
mClient "github.com/skycoin/skywire/pkg/messaging-discovery/client"
"github.com/skycoin/skywire/pkg/routing"
"github.com/skycoin/skywire/pkg/transport"
Expand All @@ -31,7 +31,7 @@ type Node struct {
Logger *logging.Logger

tm *transport.Manager
messenger *messaging.Client
messenger *skymsg.Client
srvCount int
metrics metrics.Recorder
}
Expand All @@ -45,14 +45,8 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
if lvl, err := logging.LevelFromString(conf.LogLevel); err == nil {
logger.SetLevel(lvl)
}
messenger := messaging.NewClient(&messaging.Config{
PubKey: pk,
SecKey: sk,
Discovery: mClient.NewHTTP(conf.Messaging.Discovery),
Retries: 10,
RetryDelay: time.Second,
})
messenger.Logger = logger.PackageLogger("messenger")
messenger := skymsg.NewClient(pk, sk, mClient.NewHTTP(conf.Messaging.Discovery))
messenger.SetLogger(logger.PackageLogger("messenger"))

trDiscovery, err := trClient.NewHTTP(conf.TransportDiscovery, pk, sk)
if err != nil {
Expand Down Expand Up @@ -84,7 +78,7 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
// Serve starts transport listening loop.
func (sn *Node) Serve(ctx context.Context) error {
if sn.srvCount > 0 {
if err := sn.messenger.ConnectToInitialServers(ctx, sn.srvCount); err != nil {
if err := sn.messenger.InitiateLinks(ctx, sn.srvCount); err != nil {
return fmt.Errorf("messaging: %s", err)
}
sn.Logger.Info("Connected to messaging servers")
Expand Down Expand Up @@ -233,7 +227,7 @@ func (sn *Node) serveTransport(tr transport.Transport) error {
}

func (sn *Node) connectLoop(on cipher.PubKey, ld *LoopData) (noiseRes []byte, err error) {
tr, err := sn.tm.CreateTransport(context.Background(), on, "messaging", false)
tr, err := sn.tm.CreateTransport(context.Background(), on, skymsg.TpType, false)
if err != nil {
err = fmt.Errorf("transport: %s", err)
return
Expand All @@ -251,7 +245,7 @@ func (sn *Node) connectLoop(on cipher.PubKey, ld *LoopData) (noiseRes []byte, er
}

func (sn *Node) closeLoop(on cipher.PubKey, ld *LoopData) error {
tr, err := sn.tm.CreateTransport(context.Background(), on, "messaging", false)
tr, err := sn.tm.CreateTransport(context.Background(), on, skymsg.TpType, false)
if err != nil {
return fmt.Errorf("transport: %s", err)
}
Expand All @@ -267,7 +261,7 @@ func (sn *Node) closeLoop(on cipher.PubKey, ld *LoopData) error {
}

func (sn *Node) setupRule(pubKey cipher.PubKey, rule routing.Rule) (routeID routing.RouteID, err error) {
tr, err := sn.tm.CreateTransport(context.Background(), pubKey, "messaging", false)
tr, err := sn.tm.CreateTransport(context.Background(), pubKey, skymsg.TpType, false)
if err != nil {
err = fmt.Errorf("transport: %s", err)
return
Expand Down
Loading

0 comments on commit b34f98b

Please sign in to comment.