Skip to content

Commit

Permalink
Merge branch 'feature/router2' of https://github.com/nkryuchkov/skywire
Browse files Browse the repository at this point in the history
… into feature/app2-integration
  • Loading branch information
Darkren committed Oct 7, 2019
2 parents 292d65b + 28c5ed2 commit fc1030c
Show file tree
Hide file tree
Showing 84 changed files with 1,695 additions and 7,364 deletions.
3 changes: 1 addition & 2 deletions cmd/setup-node/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package commands

import (
"bufio"
"context"
"encoding/json"
"io"
"log"
Expand Down Expand Up @@ -75,7 +74,7 @@ var rootCmd = &cobra.Command{
}
}()

logger.Fatal(sn.Serve(context.Background()))
logger.Fatal(sn.Serve())
},
}

Expand Down
19 changes: 12 additions & 7 deletions cmd/skywire-cli/commands/rtfind/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (

"github.com/skycoin/dmsg/cipher"
"github.com/spf13/cobra"
"golang.org/x/net/context"

"github.com/skycoin/skywire/cmd/skywire-cli/internal"
"github.com/skycoin/skywire/pkg/route-finder/client"
"github.com/skycoin/skywire/pkg/routefinder/rfclient"
"github.com/skycoin/skywire/pkg/routing"
)

var frAddr string
Expand All @@ -22,22 +24,25 @@ func init() {
RootCmd.Flags().DurationVar(&timeout, "timeout", 10*time.Second, "timeout for remote server requests")
}

// RootCmd is the command that queries the route-finder.
// RootCmd is the command that queries the route finder.
var RootCmd = &cobra.Command{
Use: "rtfind <public-key-node-1> <public-key-node-2>",
Short: "Queries the Route Finder for available routes between two nodes",
Args: cobra.MinimumNArgs(2),
Run: func(_ *cobra.Command, args []string) {
rfc := client.NewHTTP(frAddr, timeout)
rfc := rfclient.NewHTTP(frAddr, timeout)

var srcPK, dstPK cipher.PubKey
internal.Catch(srcPK.Set(args[0]))
internal.Catch(dstPK.Set(args[1]))

forward, reverse, err := rfc.PairedRoutes(srcPK, dstPK, frMinHops, frMaxHops)
forward := [2]cipher.PubKey{srcPK, dstPK}
backward := [2]cipher.PubKey{dstPK, srcPK}
ctx := context.Background()
routes, err := rfc.FindRoutes(ctx, []routing.PathEdges{forward, backward},
&rfclient.RouteOptions{MinHops: frMinHops, MaxHops: frMaxHops})
internal.Catch(err)

fmt.Println("forward: ", forward)
fmt.Println("reverse: ", reverse)
fmt.Println("forward: ", routes[forward][0])
fmt.Println("reverse: ", routes[backward][0])
},
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/gorilla/securecookie v1.1.1
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.7.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
Expand Down Expand Up @@ -165,13 +167,15 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181112210238-4b1f3b6b1646/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190627182818-9947fec5c3ab/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
101 changes: 47 additions & 54 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type App struct {
config Config
proto *Protocol

acceptChan chan [2]routing.Addr
acceptChan chan routing.RouteDescriptor
doneChan chan struct{}

conns map[routing.Loop]io.ReadWriteCloser
conns map[routing.RouteDescriptor]net.Conn
mu sync.Mutex
}

Expand Down Expand Up @@ -78,9 +78,9 @@ func SetupFromPipe(config *Config, inFD, outFD uintptr) (*App, error) {
app := &App{
config: *config,
proto: NewProtocol(pipeConn),
acceptChan: make(chan [2]routing.Addr),
acceptChan: make(chan routing.RouteDescriptor),
doneChan: make(chan struct{}),
conns: make(map[routing.Loop]io.ReadWriteCloser),
conns: make(map[routing.RouteDescriptor]net.Conn),
}

go app.handleProto()
Expand All @@ -100,9 +100,9 @@ func New(conn net.Conn, conf *Config) (*App, error) {
app := &App{
config: *conf,
proto: NewProtocol(conn),
acceptChan: make(chan [2]routing.Addr),
acceptChan: make(chan routing.RouteDescriptor),
doneChan: make(chan struct{}),
conns: make(map[routing.Loop]io.ReadWriteCloser),
conns: make(map[routing.RouteDescriptor]net.Conn),
}

go app.handleProto()
Expand Down Expand Up @@ -152,34 +152,32 @@ func (app *App) Close() error {
// Accept awaits for incoming loop confirmation request from a Node and
// returns net.Conn for received loop.
func (app *App) Accept() (net.Conn, error) {
fmt.Println("!!! [ACCEPT] start !!!")
addrs := <-app.acceptChan
fmt.Println("!!! [ACCEPT] read from ch !!!")
laddr := addrs[0]
raddr := addrs[1]
desc := <-app.acceptChan

loop := routing.Loop{Local: routing.Addr{Port: laddr.Port}, Remote: raddr}
conn, out := net.Pipe()
app.mu.Lock()
app.conns[loop] = conn
app.conns[desc] = conn
app.mu.Unlock()
go app.serveConn(loop, conn)
return newAppConn(out, laddr, raddr), nil
go app.serveConn(desc, conn)
return newAppConn(out, desc.Src(), desc.Dst()), nil
}

// Dial sends create loop request to a Node and returns net.Conn for created loop.
func (app *App) Dial(raddr routing.Addr) (net.Conn, error) {
var laddr routing.Addr
err := app.proto.Send(FrameCreateLoop, raddr, &laddr)
err := app.proto.Send(FrameCreateRoutes, raddr, &laddr)
if err != nil {
return nil, err
}
loop := routing.Loop{Local: routing.Addr{Port: laddr.Port}, Remote: raddr}

desc := routing.NewRouteDescriptor(laddr.PubKey, raddr.PubKey, laddr.Port, raddr.Port)
conn, out := net.Pipe()

app.mu.Lock()
app.conns[loop] = conn
app.conns[desc] = conn
app.mu.Unlock()
go app.serveConn(loop, conn)

go app.serveConn(desc, conn)
return newAppConn(out, laddr, raddr), nil
}

Expand All @@ -190,10 +188,15 @@ func (app *App) Addr() net.Addr {

func (app *App) handleProto() {
err := app.proto.Serve(func(frame Frame, payload []byte) (res interface{}, err error) {
fmt.Printf("!!! app received frame: %s\n", frame)
switch frame {
case FrameConfirmLoop:
err = app.confirmLoop(payload)
case FrameRoutesCreated:
var routes []routing.Route
err = json.Unmarshal(payload, &routes)
if err != nil {
break
}

err = app.confirmRoutes(routes)
case FrameSend:
err = app.forwardPacket(payload)
case FrameClose:
Expand All @@ -210,7 +213,7 @@ func (app *App) handleProto() {
}
}

func (app *App) serveConn(loop routing.Loop, conn io.ReadWriteCloser) {
func (app *App) serveConn(desc routing.RouteDescriptor, conn io.ReadWriteCloser) {
defer func() {
if err := conn.Close(); err != nil {
log.WithError(err).Warn("failed to close connection")
Expand All @@ -224,19 +227,19 @@ func (app *App) serveConn(loop routing.Loop, conn io.ReadWriteCloser) {
break
}

packet := &Packet{Loop: loop, Payload: buf[:n]}
packet := &Packet{Desc: desc, Payload: buf[:n]}
if err := app.proto.Send(FrameSend, packet, nil); err != nil {
break
}
}

app.mu.Lock()
if _, ok := app.conns[loop]; ok {
if err := app.proto.Send(FrameClose, &loop, nil); err != nil {
if _, ok := app.conns[desc]; ok {
if err := app.proto.Send(FrameClose, &desc, nil); err != nil {
log.WithError(err).Warn("Failed to send command frame")
}
}
delete(app.conns, loop)
delete(app.conns, desc)
app.mu.Unlock()
}

Expand All @@ -246,10 +249,8 @@ func (app *App) forwardPacket(data []byte) error {
return err
}

fmt.Printf("!!! packet loop: %s\n", packet.Loop)

app.mu.Lock()
conn := app.conns[packet.Loop]
conn := app.conns[packet.Desc]
app.mu.Unlock()

if conn == nil {
Expand All @@ -261,14 +262,14 @@ func (app *App) forwardPacket(data []byte) error {
}

func (app *App) closeConn(data []byte) error {
var loop routing.Loop
if err := json.Unmarshal(data, &loop); err != nil {
var route routing.Route
if err := json.Unmarshal(data, &route); err != nil {
return err
}

app.mu.Lock()
conn := app.conns[loop]
delete(app.conns, loop)
conn := app.conns[route.Desc]
delete(app.conns, route.Desc)
app.mu.Unlock()

if conn != nil {
Expand All @@ -277,28 +278,20 @@ func (app *App) closeConn(data []byte) error {
return nil
}

func (app *App) confirmLoop(data []byte) error {
fmt.Println("!!! [confirmLoop] !!!")
var addrs [2]routing.Addr
if err := json.Unmarshal(data, &addrs); err != nil {
return err
}

laddr := addrs[0]
raddr := addrs[1]

app.mu.Lock()
conn := app.conns[routing.Loop{Local: laddr, Remote: raddr}]
app.mu.Unlock()
func (app *App) confirmRoutes(routes []routing.Route) error {
for _, route := range routes {
app.mu.Lock()
conn := app.conns[route.Desc]
app.mu.Unlock()

if conn != nil {
return errors.New("loop is already created")
}
if conn != nil {
return errors.New("loop is already created")
}

fmt.Println("!!! [confirmLoop] selecting !!!")
select {
case app.acceptChan <- addrs:
default:
select {
case app.acceptChan <- route.Desc:
default:
}
}

return nil
Expand Down
Loading

0 comments on commit fc1030c

Please sign in to comment.