Skip to content

Commit

Permalink
Merge pull request #414 from ivcosla/feature/apps-retry-on-network-error
Browse files Browse the repository at this point in the history
Feature/apps retry on network error
  • Loading branch information
jdknives authored Jun 27, 2019
2 parents c3e882c + 679e144 commit b9db401
Show file tree
Hide file tree
Showing 59 changed files with 11,408 additions and 10 deletions.
15 changes: 13 additions & 2 deletions cmd/apps/skychat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import (
"net"
"net/http"
"sync"
"time"

"github.com/skycoin/skywire/internal/netutil"
"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/cipher"
)

var addr = flag.String("addr", ":8000", "address to bind")
var r = netutil.NewRetrier(50*time.Millisecond, 5, 2)

var (
chatApp *app.App
Expand Down Expand Up @@ -109,7 +112,10 @@ func messageHandler(w http.ResponseWriter, req *http.Request) {

if conn == nil {
var err error
conn, err = chatApp.Dial(addr)
err = r.Do(func() error {
conn, err = chatApp.Dial(addr)
return err
})
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand All @@ -122,10 +128,15 @@ func messageHandler(w http.ResponseWriter, req *http.Request) {
go handleConn(conn)
}

if _, err := conn.Write([]byte(data["message"])); err != nil {
_, err := conn.Write([]byte(data["message"]))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
connsMu.Lock()
delete(chatConns, pk)
connsMu.Unlock()
return
}

}

func sseHandler(w http.ResponseWriter, req *http.Request) {
Expand Down
12 changes: 11 additions & 1 deletion cmd/apps/therealproxy-client/therealproxy-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ package main
import (
"flag"
"log"
"net"
"time"

"github.com/skycoin/skywire/internal/netutil"

"github.com/skycoin/skywire/internal/therealproxy"
"github.com/skycoin/skywire/pkg/app"
Expand All @@ -14,6 +18,8 @@ import (

const socksPort = 3

var r = netutil.NewRetrier(time.Second, 0, 1)

func main() {
var addr = flag.String("addr", ":1080", "Client address to listen on")
var serverPK = flag.String("srv", "", "PubKey of the server to connect to")
Expand All @@ -35,7 +41,11 @@ func main() {
log.Fatal("Invalid server PubKey: ", err)
}

conn, err := socksApp.Dial(&app.Addr{PubKey: pk, Port: uint16(socksPort)})
var conn net.Conn
err = r.Do(func() error {
conn, err = socksApp.Dial(&app.Addr{PubKey: pk, Port: uint16(socksPort)})
return err
})
if err != nil {
log.Fatal("Failed to dial to a server: ", err)
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ require (
github.com/google/uuid v1.1.1
github.com/gorilla/handlers v1.4.0
github.com/gorilla/securecookie v1.1.1
github.com/hashicorp/go-retryablehttp v0.5.4 // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/kr/pty v1.1.5
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
github.com/skycoin/skycoin v0.26.0
github.com/spf13/cobra v0.0.5
Expand Down
8 changes: 5 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -32,6 +34,8 @@ github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZs
github.com/gorilla/handlers v1.4.0/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-retryablehttp v0.5.4/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d h1:kJCB4vdITiW1eC1vq2e6IsrXKrZit1bv/TDYFGMp4BQ=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
Expand Down Expand Up @@ -116,17 +120,15 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
103 changes: 103 additions & 0 deletions internal/netutil/retrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package netutil

import (
"errors"
"time"

"github.com/prometheus/common/log"
)

// Package errors
var (
ErrMaximumRetriesReached = errors.New("maximum retries attempted without success")
)

// RetryFunc is a function used as argument of (*Retrier).Do(), which will retry on error unless it is whitelisted
type RetryFunc func() error

// Retrier holds a configuration for how retries should be performed
type Retrier struct {
exponentialBackoff time.Duration // multiplied on every retry by a exponentialFactor
exponentialFactor uint32 // multiplier for the backoff duration that is applied on every retry
times uint32 // number of times that the given function is going to be retried until success, if 0 it will be retried forever until success
errWhitelist map[error]struct{}
}

// NewRetrier returns a retrier that is ready to call Do() method
func NewRetrier(exponentialBackoff time.Duration, times, factor uint32) *Retrier {
return &Retrier{
exponentialBackoff: exponentialBackoff,
times: times,
exponentialFactor: factor,
errWhitelist: make(map[error]struct{}),
}
}

// WithErrWhitelist sets a list of errors into the retrier, if the RetryFunc provided to Do() fails with one of them it will return inmediatelly with such error. Calling
// this function is not thread-safe, and is advised to only use it when initializing the Retrier
func (r *Retrier) WithErrWhitelist(errors ...error) *Retrier {
m := make(map[error]struct{})
for _, err := range errors {
m[err] = struct{}{}
}

r.errWhitelist = m
return r
}

// Do takes a RetryFunc and attempts to execute it, if it fails with an error it will be retried a maximum of given times with an exponentialBackoff, until it returns
// nil or an error that is whitelisted
func (r Retrier) Do(f RetryFunc) error {
if r.times == 0 {
return r.retryUntilSuccess(f)
}

return r.retryNTimes(f)
}

func (r Retrier) retryNTimes(f RetryFunc) error {
currentBackoff := r.exponentialBackoff

for i := uint32(0); i < r.times; i++ {
err := f()
if err != nil {
if r.isWhitelisted(err) {
return err
}

log.Warn(err)
currentBackoff = currentBackoff * time.Duration(r.exponentialFactor)
time.Sleep(currentBackoff)
continue
}

return nil
}

return ErrMaximumRetriesReached
}

func (r Retrier) retryUntilSuccess(f RetryFunc) error {
currentBackoff := r.exponentialBackoff

for {
err := f()
if err != nil {
if r.isWhitelisted(err) {
return err
}

log.Warn(err)
currentBackoff = currentBackoff * time.Duration(r.exponentialFactor)
time.Sleep(currentBackoff)
continue
}

return nil
}
}

func (r Retrier) isWhitelisted(err error) bool {
_, ok := r.errWhitelist[err]
return ok
}
61 changes: 61 additions & 0 deletions internal/netutil/retrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package netutil

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRetrier_Do(t *testing.T) {
r := NewRetrier(time.Millisecond*100, 3, 2)
c := 0
threshold := 2
f := func() error {
c++
if c >= threshold {
return nil
}

return errors.New("foo")
}

t.Run("should retry", func(t *testing.T) {
c = 0

err := r.Do(f)
require.NoError(t, err)
})

t.Run("if retry reaches max number of times should error", func(t *testing.T) {
c = 0
threshold = 4
defer func() {
threshold = 2
}()

err := r.Do(f)
require.Error(t, err)
})

t.Run("should return whitelisted errors if any instead of retry", func(t *testing.T) {
bar := errors.New("bar")
wR := NewRetrier(50*time.Millisecond, 1, 2).WithErrWhitelist(bar)
barF := func() error {
return bar
}

err := wR.Do(barF)
require.EqualError(t, err, bar.Error())
})

t.Run("if times is 0, should retry until success", func(t *testing.T) {
c = 0
loopR := NewRetrier(50*time.Millisecond, 0, 1)
err := loopR.Do(f)
require.NoError(t, err)

require.Equal(t, threshold, c)
})
}
5 changes: 4 additions & 1 deletion internal/therealproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func NewClient(conn net.Conn) (*Client, error) {
// ListenAndServe start tcp listener on addr and proxies incoming
// connection to a remote proxy server.
func (c *Client) ListenAndServe(addr string) error {
var stream net.Conn
var err error

l, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("listen: %s", err)
Expand All @@ -40,7 +43,7 @@ func (c *Client) ListenAndServe(addr string) error {
return fmt.Errorf("accept: %s", err)
}

stream, err := c.session.Open()
stream, err = c.session.Open()
if err != nil {
return fmt.Errorf("yamux: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/therealproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"net"

socks5 "github.com/armon/go-socks5"
"github.com/armon/go-socks5"
"github.com/hashicorp/yamux"
)

Expand Down
13 changes: 12 additions & 1 deletion internal/therealssh/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"net"
"net/rpc"
"strings"
"time"

"github.com/skycoin/skywire/internal/netutil"

"github.com/kr/pty"

Expand All @@ -17,6 +20,8 @@ import (
"github.com/skycoin/skywire/pkg/app"
)

var r = netutil.NewRetrier(50*time.Millisecond, 5, 2)

// Dialer dials to a remote node.
type Dialer interface {
Dial(raddr *app.Addr) (net.Conn, error)
Expand Down Expand Up @@ -47,7 +52,13 @@ func NewClient(rpcAddr string, d Dialer) (net.Listener, *Client, error) {

// OpenChannel requests new Channel on the remote Server.
func (c *Client) OpenChannel(remotePK cipher.PubKey) (localID uint32, sshCh *SSHChannel, cErr error) {
conn, err := c.dialer.Dial(&app.Addr{PubKey: remotePK, Port: Port})
var conn net.Conn
var err error

err = r.Do(func() error {
conn, err = c.dialer.Dial(&app.Addr{PubKey: remotePK, Port: Port})
return err
})
if err != nil {
cErr = fmt.Errorf("dial failed: %s", err)
return
Expand Down
1 change: 0 additions & 1 deletion pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func (app *App) Dial(raddr *Addr) (net.Conn, error) {
if err != nil {
return nil, err
}

addr := &LoopAddr{laddr.Port, *raddr}
conn, out := net.Pipe()
app.mu.Lock()
Expand Down
27 changes: 27 additions & 0 deletions vendor/github.com/alecthomas/template/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b9db401

Please sign in to comment.