Skip to content

Commit

Permalink
Reconnect proxy if yamux session failed
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Nov 7, 2019
1 parent 57cc9dc commit 65202be
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 21 deletions.
26 changes: 9 additions & 17 deletions cmd/apps/therealproxy-client/therealproxy-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package main
import (
"flag"
"net"
"time"

"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"

"github.com/SkycoinProject/dmsg/cipher"

"github.com/SkycoinProject/skywire-mainnet/internal/netutil"
"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/internal/therealproxy"
"github.com/SkycoinProject/skywire-mainnet/pkg/app"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

var r = netutil.NewRetrier(time.Second, 0, 1)

func main() {
log := app.NewLogger(skyenv.SkyproxyClientName)
therealproxy.Log = log.PackageLogger(skyenv.SkyproxyClientName)
Expand Down Expand Up @@ -48,23 +43,20 @@ func main() {
log.Fatal("Invalid server PubKey: ", err)
}

var conn net.Conn
err = r.Do(func() error {
conn, err = socksApp.Dial(routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)})
return err
})
log.Printf("Serving on %v", *addr)
l, err := net.Listen("tcp", *addr)
if err != nil {
log.Fatal("Failed to dial to a server: ", err)
log.Fatal("Failed to listen on %v: %v", *addr, err)
}

log.Printf("Connected to %v\n", pk)
remote := routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)}

client, err := therealproxy.NewClient(conn)
client, err := therealproxy.NewClient(l, socksApp, remote)
if err != nil {
log.Fatal("Failed to create a new client: ", err)
}

log.Printf("Serving %v\n", addr)

log.Fatal(client.ListenAndServe(*addr))
if err := client.Serve(); err != nil {
log.Warnf("Failed to serve: %v", err)
}
}
94 changes: 91 additions & 3 deletions internal/therealproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/hashicorp/yamux"

"github.com/SkycoinProject/skywire-mainnet/internal/netutil"
"github.com/SkycoinProject/skywire-mainnet/pkg/app"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

// Log is therealproxy package level logger, it can be replaced with a different one from outside the package
Expand All @@ -16,16 +21,99 @@ var Log = logging.MustGetLogger("therealproxy")
type Client struct {
session *yamux.Session
listener net.Listener
app *app.App
addr routing.Addr
}

// NewClient constructs a new Client.
func NewClient(conn net.Conn) (*Client, error) {
func NewClient(lis net.Listener, app *app.App, addr routing.Addr) (*Client, error) {
c := &Client{
listener: lis,
app: app,
addr: addr,
}
if err := c.connect(); err != nil {
return nil, err
}

return c, nil
}

func (c *Client) connect() error {
r := netutil.NewRetrier(time.Second, 0, 1)

var conn net.Conn
err := r.Do(func() error {
var err error
conn, err = c.app.Dial(c.addr)
return err
})
if err != nil {
return fmt.Errorf("failed to dial to a server: %v", err)
}

session, err := yamux.Client(conn, nil)
if err != nil {
return nil, fmt.Errorf("yamux: %s", err)
return fmt.Errorf("failed to create client: %s", err)
}

return &Client{session: session}, nil
c.session = session

return nil
}

func (c *Client) Serve() error {
var stream net.Conn

for {
conn, err := c.listener.Accept()
if err != nil {
return fmt.Errorf("accept: %s", err)
}

for {
stream, err = c.session.Open()
if err == nil {
break
}

Log.Warnf("Failed to open yamux session: %v", err)

delay := 1 * time.Second
Log.Warnf("Restarting in %v", delay)
time.Sleep(delay)

if err := c.connect(); err != nil {
Log.Warnf("Failed to reconnect, trying again")
}
}

go func() {
errCh := make(chan error, 2)
go func() {
_, err := io.Copy(stream, conn)
errCh <- err
}()

go func() {
_, err := io.Copy(conn, stream)
errCh <- err
}()

for err := range errCh {
if err := conn.Close(); err != nil {
Log.WithError(err).Warn("Failed to close connection")
}
if err := stream.Close(); err != nil {
Log.WithError(err).Warn("Failed to close stream")
}

if err != nil {
Log.Error("Copy error:", err)
}
}
}()
}
}

// ListenAndServe start tcp listener on addr and proxies incoming
Expand Down
2 changes: 1 addition & 1 deletion internal/therealproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Server) Serve(l net.Listener) error {

session, err := yamux.Server(conn, nil)
if err != nil {
return fmt.Errorf("yamux: %s", err)
return fmt.Errorf("yamux server failure: %s", err)
}

go func() {
Expand Down

0 comments on commit 65202be

Please sign in to comment.