Skip to content

Commit

Permalink
Merge pull request #371 from SkycoinProject/feature/app-rpc-interface
Browse files Browse the repository at this point in the history
Appevent module implementation. 

Former-commit-id: edac199
  • Loading branch information
Darkren authored May 26, 2020
2 parents 854dbf9 + 9feef0e commit f08b046
Show file tree
Hide file tree
Showing 44 changed files with 1,262 additions and 346 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ before_install:

install:
- go get -u github.com/FiloSottile/vendorcheck
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $GOPATH/bin v1.22.2
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $GOPATH/bin v1.27.0

before_script:
- ci_scripts/create-ip-aliases.sh
Expand Down
177 changes: 129 additions & 48 deletions cmd/apps/helloworld/helloworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,84 +4,165 @@ simple client server app for skywire visor testing
package main

import (
"os"
"flag"
"fmt"
"net"
"time"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/sirupsen/logrus"

"github.com/SkycoinProject/skywire-mainnet/pkg/app"
"github.com/SkycoinProject/skywire-mainnet/pkg/app/appevent"
"github.com/SkycoinProject/skywire-mainnet/pkg/app/appnet"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/util/buildinfo"
)

const (
netType = appnet.TypeSkynet
modeServer = "server"
modeClient = "client"
)

var (
mode = flag.String("mode", modeServer, fmt.Sprintf("mode of operation: %v", []string{modeServer, modeClient}))
network = flag.String("net", string(appnet.TypeSkynet), fmt.Sprintf("network: %v", []appnet.Type{appnet.TypeSkynet, appnet.TypeDmsg}))
remote = flag.String("remote", "", "remote public key to dial to (client mode only)")
port = flag.Uint("port", 1024, "port to either dial to (client mode), or listen from (server mode)")
)

var log = logrus.New()

func main() {
appC := app.NewClient()
flag.Parse()

subs := prepareSubscriptions()
appC := app.NewClient(subs)
defer appC.Close()

if _, err := buildinfo.Get().WriteTo(log.Writer()); err != nil {
log.Printf("Failed to output build info: %v", err)
log.WithError(err).Info("Failed to output build info.")
}

if len(os.Args) == 1 {
port := routing.Port(1024)
l, err := appC.Listen(netType, port)
if err != nil {
log.Fatalf("Error listening network %v on port %d: %v\n", netType, port, err)
}

log.Println("listening for incoming connections")
for {
conn, err := l.Accept()
if err != nil {
log.Fatalf("Failed to accept conn: %v\n", err)
}

log.Printf("got new connection from: %v\n", conn.RemoteAddr())
go func() {
buf := make([]byte, 4)
if _, err := conn.Read(buf); err != nil {
log.Printf("Failed to read remote data: %v\n", err)
// TODO: close conn
}

log.Printf("Message from %s: %s\n", conn.RemoteAddr().String(), string(buf))
if _, err := conn.Write([]byte("pong")); err != nil {
log.Printf("Failed to write to a remote visor: %v\n", err)
// TODO: close conn
}
}()
}
switch *mode {
case modeServer:
runServer(appC)
case modeClient:
runClient(appC)
default:
log.WithField("mode", *mode).Fatal("Invalid mode.")
}
}

remotePK := cipher.PubKey{}
if err := remotePK.UnmarshalText([]byte(os.Args[1])); err != nil {
log.Fatal("Failed to construct PubKey: ", err, os.Args[1])
}
func prepareSubscriptions() *appevent.Subscriber {
subs := appevent.NewSubscriber()

subs.OnTCPDial(func(data appevent.TCPDialData) {
log.WithField("event_type", data.Type()).
WithField("event_data", data).
Info("Received event.")
})

conn, err := appC.Dial(appnet.Addr{
Net: netType,
PubKey: remotePK,
Port: 10,
subs.OnTCPClose(func(data appevent.TCPCloseData) {
log.WithField("event_type", data.Type()).
WithField("event_data", data).
Info("Received event.")
})

return subs
}

func runServer(appC *app.Client) {
log := log.
WithField("network", *network).
WithField("port", *port)

lis, err := appC.Listen(appnet.Type(*network), routing.Port(*port))
if err != nil {
log.Fatalf("Failed to open remote conn: %v\n", err)
log.WithError(err).Fatal("Failed to listen.")
}
log.Info("Listening for incoming connections.")

for {
conn, err := lis.Accept()
if err != nil {
log.WithError(err).Fatal("Failed to accept connection.")
}
go handleServerConn(log, conn)
}
}

if _, err := conn.Write([]byte("ping")); err != nil {
log.Fatalf("Failed to write to a remote visor: %v\n", err)
func handleServerConn(log logrus.FieldLogger, conn net.Conn) {
log = log.WithField("remote_addr", conn.RemoteAddr())
log.Info("Serving connection.")
defer func() {
log.WithError(conn.Close()).Debug("Closed connection.")
}()

for {
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
log.WithField("n", n).WithError(err).
Error("Failed to read from connection.")
return
}
msg := string(buf[:n])
log.WithField("n", n).WithField("data", msg).Info("Read from connection.")

n, err = conn.Write([]byte(fmt.Sprintf("I've got your message: %s", msg)))
if err != nil {
log.WithField("n", n).WithError(err).
Error("Failed to write to connection.")
return
}
log.WithField("n", n).Info("Wrote response message.")
}
}

buf := make([]byte, 4)
if _, err = conn.Read(buf); err != nil {
log.Fatalf("Failed to read remote data: %v\n", err)
func runClient(appC *app.Client) {
var remotePK cipher.PubKey
if err := remotePK.UnmarshalText([]byte(*remote)); err != nil {
log.WithError(err).Fatal("Invalid remote public key.")
}

log.Printf("Message from %s: %s", conn.RemoteAddr().String(), string(buf))
var conn net.Conn

for i := 0; true; i++ {
time.Sleep(time.Second * 2)

if conn != nil {
log.WithError(conn.Close()).Debug("Connection closed.")
conn = nil
}

var err error
conn, err = appC.Dial(appnet.Addr{
Net: appnet.Type(*network),
PubKey: remotePK,
Port: routing.Port(*port),
})
if err != nil {
log.WithError(err).Error("Failed to dial.")
time.Sleep(time.Second)
continue
}

n, err := conn.Write([]byte(fmt.Sprintf("Hello world! %d", i)))
if err != nil {
log.WithField("n", n).WithError(err).
Error("Failed to write to connection.")
continue
}

buf := make([]byte, 1024)
n, err = conn.Read(buf)
if err != nil {
log.WithField("n", n).WithError(err).
Error("Failed to read from connection.")
continue
}
msg := string(buf[:n])
log.WithField("n", n).WithField("data", msg).Info("Read reply from connection.")
}
}
2 changes: 1 addition & 1 deletion cmd/apps/skychat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
)

func main() {
appC = app.NewClient()
appC = app.NewClient(nil)
defer appC.Close()

if _, err := buildinfo.Get().WriteTo(os.Stdout); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/skysocks-client/skysocks-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func dialServer(appCl *app.Client, pk cipher.PubKey) (net.Conn, error) {
}

func main() {
appC := app.NewClient()
appC := app.NewClient(nil)
defer appC.Close()

skysocks.Log = log
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/skysocks/skysocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
var log = logrus.New()

func main() {
appC := app.NewClient()
appC := app.NewClient(nil)
defer appC.Close()

skysocks.Log = log
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/vpn-client/vpn-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func main() {
noiseCreds = vpn.NewNoiseCredentials(localSK, localPK)
}

appClient := app.NewClient()
appClient := app.NewClient(nil)
defer appClient.Close()

log.Infof("Connecting to VPN server %s", serverPK.String())
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/vpn-server/vpn-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
noiseCreds = vpn.NewNoiseCredentials(localSK, localPK)
}

appClient := app.NewClient()
appClient := app.NewClient(nil)
defer appClient.Close()

osSigs := make(chan os.Signal, 2)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/SkycoinProject/skywire-mainnet
go 1.13

require (
github.com/SkycoinProject/dmsg v0.1.1-0.20200420091742-8c1a3d828a49
github.com/SkycoinProject/dmsg v0.1.1-0.20200523194607-be73f083a729
github.com/SkycoinProject/skycoin v0.27.0
github.com/SkycoinProject/yamux v0.0.0-20191213015001-a36efeefbf6a
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/SkycoinProject/dmsg v0.0.0-20200306152741-acee74fa4514/go.mod h1:DzykXMLlx6Fx0fGjZsCIRas/MIvxW8DZpmDA6f2nCRk=
github.com/SkycoinProject/dmsg v0.1.1-0.20200420091742-8c1a3d828a49 h1:rYqmvSRA+rq6LTne/Ge34T0i4yjSHSwkhk0ER6relWU=
github.com/SkycoinProject/dmsg v0.1.1-0.20200420091742-8c1a3d828a49/go.mod h1:MiX+UG/6fl3g+9rS13/fq7BwUQ2eOlg1yOBOnNf6J6A=
github.com/SkycoinProject/dmsg v0.1.1-0.20200523194607-be73f083a729 h1:Edgnt4ido4MGfNTEJUYqNeXt0AlJ4EHlFCWBrKYPvT4=
github.com/SkycoinProject/dmsg v0.1.1-0.20200523194607-be73f083a729/go.mod h1:MiX+UG/6fl3g+9rS13/fq7BwUQ2eOlg1yOBOnNf6J6A=
github.com/SkycoinProject/skycoin v0.26.0/go.mod h1:xqPLOKh5B6GBZlGA7B5IJfQmCy7mwimD9NlqxR3gMXo=
github.com/SkycoinProject/skycoin v0.27.0 h1:N3IHxj8ossHOcsxLYOYugT+OaELLncYHJHxbbYLPPmY=
github.com/SkycoinProject/skycoin v0.27.0/go.mod h1:xqPLOKh5B6GBZlGA7B5IJfQmCy7mwimD9NlqxR3gMXo=
Expand Down
1 change: 1 addition & 0 deletions internal/vpn/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func parseCIDR(ipCIDR string) (ipStr, netmask string, err error) {
return ip.String(), fmt.Sprintf("%d.%d.%d.%d", net.Mask[0], net.Mask[1], net.Mask[2], net.Mask[3]), nil
}

//nolint:unparam
func run(bin string, args ...string) error {
cmd := exec.Command(bin, args...) //nolint:gosec

Expand Down
2 changes: 2 additions & 0 deletions internal/vpn/os_server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func EnableIPv6Forwarding() error {
// EnableIPMasquerading enables IP masquerading for the interface with name `ifcName`.
func EnableIPMasquerading(ifcName string) error {
cmd := fmt.Sprintf(enableIPMasqueradingCMDFmt, ifcName)
//nolint:gosec
if err := exec.Command("sh", "-c", cmd).Run(); err != nil {
return fmt.Errorf("error running command %s: %w", cmd, err)
}
Expand All @@ -85,6 +86,7 @@ func EnableIPMasquerading(ifcName string) error {
// DisableIPMasquerading disables IP masquerading for the interface with name `ifcName`.
func DisableIPMasquerading(ifcName string) error {
cmd := fmt.Sprintf(disableIPMasqueradingCMDFmt, ifcName)
//nolint:gosec
if err := exec.Command("sh", "-c", cmd).Run(); err != nil {
return fmt.Errorf("error running command %s: %w", cmd, err)
}
Expand Down
74 changes: 74 additions & 0 deletions pkg/app/appcommon/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package appcommon

import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
)

// Hello represents the first JSON object that an app sends the visor.
type Hello struct {
ProcKey ProcKey `json:"proc_key"` // proc key
EgressNet string `json:"egress_net,omitempty"` // network which hosts the appevent.RPCGateway of the app
EgressAddr string `json:"egress_addr,omitempty"` // address which hosts the appevent.RPCGateway of the app
EventSubs map[string]bool `json:"event_subs,omitempty"` // event subscriptions
}

// String implements fmt.Stringer
func (h *Hello) String() string {
j, err := json.Marshal(h)
if err != nil {
panic(err) // should never happen
}
return string(j)
}

// AllowsEventType returns true if the hello object contents allow for an event type.
func (h *Hello) AllowsEventType(eventType string) bool {
if h.EventSubs == nil {
return false
}
return h.EventSubs[eventType]
}

// ReadHello reads in a hello object from the given reader.
func ReadHello(r io.Reader) (Hello, error) {
sizeRaw := make([]byte, 2)
if _, err := io.ReadFull(r, sizeRaw); err != nil {
return Hello{}, fmt.Errorf("failed to read hello size prefix: %w", err)
}
size := binary.BigEndian.Uint16(sizeRaw)

helloRaw := make([]byte, size)
if _, err := io.ReadFull(r, helloRaw); err != nil {
return Hello{}, fmt.Errorf("failed to read hello data: %w", err)
}

var hello Hello
if err := json.Unmarshal(helloRaw, &hello); err != nil {
return Hello{}, fmt.Errorf("failed to unmarshal hello data: %w", err)
}

return hello, nil
}

// WriteHello writes a hello object into a given writer.
func WriteHello(w io.Writer, hello Hello) error {
helloRaw, err := json.Marshal(hello)
if err != nil {
panic(err) // should never happen
}

raw := make([]byte, 2+len(helloRaw))
size := len(helloRaw)
binary.BigEndian.PutUint16(raw[:2], uint16(size))
if n := copy(raw[2:], helloRaw); n != size {
panic("hello write does not add up")
}

if _, err := w.Write(raw); err != nil {
return fmt.Errorf("failed to write hello data: %w", err)
}
return nil
}
Loading

0 comments on commit f08b046

Please sign in to comment.