Skip to content

Commit

Permalink
added resiliency to apps
Browse files Browse the repository at this point in the history
  • Loading branch information
ivcosla committed Jun 15, 2019
1 parent 45a02aa commit 257e408
Show file tree
Hide file tree
Showing 58 changed files with 11,340 additions and 22 deletions.
14 changes: 10 additions & 4 deletions cmd/apps/skychat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/skycoin/skywire/internal/netutil"
"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/cipher"
"log"
"net"
"net/http"
"sync"

"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/cipher"
"time"
)

var addr = flag.String("addr", ":8000", "address to bind")
var r = netutil.NewRetrier(50*time.Millisecond, time.Second, 2)

var (
chatApp *app.App
Expand Down Expand Up @@ -121,7 +123,11 @@ func messageHandler(w http.ResponseWriter, req *http.Request) {
go handleConn(conn)
}

if _, err := conn.Write([]byte(data["message"])); err != nil {
err := r.Do(func() error {
_, err := conn.Write([]byte(data["message"]))
return err
})
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/apps/therealproxy-client/therealproxy-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ package main

import (
"flag"
"github.com/skycoin/skywire/internal/netutil"
"log"
"net"
"time"

"github.com/skycoin/skywire/internal/therealproxy"
"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/cipher"
)

const socksPort = 3
var r = netutil.NewRetrier(50*time.Millisecond, 10*time.Second, 2)

func main() {
var addr = flag.String("addr", ":1080", "Client address to listen on")
Expand All @@ -35,7 +39,11 @@ func main() {
log.Fatal("Invalid server PubKey: ", err)
}

conn, err := socksApp.Dial(&app.Addr{PubKey: pk, Port: uint16(socksPort)})
var conn net.Conn
err = r.Do(func() error {
conn, err = socksApp.Dial(&app.Addr{PubKey: pk, Port: uint16(socksPort)})
return err
})
if err != nil {
log.Fatal("Failed to dial to a server: ", err)
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ module github.com/skycoin/skywire
go 1.12

require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6
github.com/go-chi/chi v4.0.2+incompatible
github.com/google/uuid v1.1.1
github.com/gorilla/handlers v1.4.0
github.com/gorilla/securecookie v1.1.1
github.com/hashicorp/go-retryablehttp v0.5.4 // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kr/pty v1.1.4
github.com/mattn/go-colorable v0.1.1 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.1
github.com/skycoin/skycoin v0.25.1
github.com/spf13/cobra v0.0.3
Expand All @@ -24,4 +28,5 @@ require (
go.etcd.io/bbolt v1.3.2
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480
golang.org/x/net v0.0.0-20190419010253-1f3472d942ba
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
Expand All @@ -17,6 +21,10 @@ github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZs
github.com/gorilla/handlers v1.4.0/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/hashicorp/go-cleanhttp v0.5.0 h1:wvCrVc9TjDls6+YGAF2hAifE1E5U1+b4tH6KdvN3Gig=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-retryablehttp v0.5.4 h1:1BZvpawXoJCWX6pNtow9+rpEj+3itIlutiqnntI6jOE=
github.com/hashicorp/go-retryablehttp v0.5.4/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d h1:kJCB4vdITiW1eC1vq2e6IsrXKrZit1bv/TDYFGMp4BQ=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
Expand Down Expand Up @@ -73,3 +81,5 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e h1:nFYrTHrdrAOpShe27kaFHjsqYSEQ0KWqdWLu3xuZJts=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
46 changes: 31 additions & 15 deletions internal/netutil/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package netutil
import (
"errors"
"github.com/prometheus/common/log"
"net"
"time"
)

var ErrThresholdReached = errors.New("threshold timeout has been reached")

type ConnFunc func(conn *net.Conn) error
type RetryFunc func() error

type Retrier struct {
exponentialBackoff time.Duration
Expand Down Expand Up @@ -37,27 +36,44 @@ func (r *Retrier) WithErrWhitelist(errors ...error) *Retrier {
return r
}

func (r Retrier) Do(conn *net.Conn, f ConnFunc) error {
func (r Retrier) Do(f RetryFunc) error {
counter := time.Duration(0)
currentBackoff := r.exponentialBackoff
var err error

for counter < r.threshold {
err = f(conn)
if err == nil {
return nil
}
t := time.NewTicker(r.exponentialBackoff)
doneCh := time.After(r.threshold)
errCh := make(chan error)
go func() {
errCh <- f()
}()
for {
select {
case <- doneCh:
return ErrThresholdReached
case <- t.C:
counter += currentBackoff
currentBackoff = currentBackoff * time.Duration(r.exponentialFactor)
t.Stop()
t = time.NewTicker(currentBackoff)

if r.isWhitelisted(err) {
return err
go func() {
errCh <- f()
}()
case err = <-errCh:
if err != nil {
if r.isWhitelisted(err) {
return err
}
} else {
return nil
}
log.Warn(err)
}

log.Warn(err)
counter += time.Duration(r.exponentialFactor) * r.exponentialBackoff
}

return ErrThresholdReached
}


func (r Retrier) isWhitelisted(err error) bool {
_, ok := r.errWhitelist[err]
return ok
Expand Down
62 changes: 62 additions & 0 deletions internal/netutil/retrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package netutil

import (
"errors"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestRetrier_Do(t *testing.T) {
r := NewRetrier(time.Millisecond*100,time.Millisecond*500,2)
c := 0
threshold := 2
f := func() error {
c++
if c >= threshold {
return nil
}

return errors.New("foo")
}

t.Run("should retry", func(t *testing.T) {
c = 0

err := r.Do(f)
require.NoError(t, err)
})

t.Run("if retry reaches threshold should error", func(t *testing.T){
c = 0
threshold = 4
defer func() {
threshold = 2
}()

err := r.Do(f)
require.Error(t, err)
})

t.Run("if function times out should error", func(t *testing.T) {
c = 0
slowF := func() error {
time.Sleep(time.Second)
return nil
}

err := r.Do(slowF)
require.Error(t, err)
})

t.Run("should return whitelisted errors if any instead of retry", func(t *testing.T){
bar := errors.New("bar")
wR := NewRetrier(50*time.Millisecond, time.Second, 2).WithErrWhitelist(bar)
barF := func() error {
return bar
}

err := wR.Do(barF)
require.EqualError(t, err, bar.Error())
})
}
8 changes: 7 additions & 1 deletion internal/therealproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func NewClient(conn net.Conn) (*Client, error) {
// ListenAndServe start tcp listener on addr and proxies incoming
// connection to a remote proxy server.
func (c *Client) ListenAndServe(addr string) error {
var stream net.Conn
var err error

l, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("listen: %s", err)
Expand All @@ -40,7 +43,10 @@ func (c *Client) ListenAndServe(addr string) error {
return fmt.Errorf("accept: %s", err)
}

stream, err := c.session.Open()
err = r.Do(func() error {
stream, err = c.session.Open()
return err
})
if err != nil {
return fmt.Errorf("yamux: %s", err)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/therealproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package therealproxy

import (
"fmt"
"github.com/skycoin/skywire/internal/netutil"
"log"
"net"
"time"

socks5 "github.com/armon/go-socks5"
"github.com/hashicorp/yamux"
)

var r = netutil.NewRetrier(50*time.Millisecond,500*time.Millisecond,2)

// Server implements multiplexing proxy server using yamux.
type Server struct {
socks *socks5.Server
Expand Down
12 changes: 11 additions & 1 deletion internal/therealssh/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/skycoin/skywire/internal/netutil"
"io"
"log"
"net"
"net/rpc"
"strings"
"time"

"github.com/kr/pty"

Expand All @@ -17,6 +19,8 @@ import (
"github.com/skycoin/skywire/pkg/app"
)

var r = netutil.NewRetrier(50*time.Millisecond, time.Second, 2)

// Dialer dials to a remote node.
type Dialer interface {
Dial(raddr *app.Addr) (net.Conn, error)
Expand Down Expand Up @@ -47,7 +51,13 @@ func NewClient(rpcAddr string, d Dialer) (net.Listener, *Client, error) {

// OpenChannel requests new Channel on the remote Server.
func (c *Client) OpenChannel(remotePK cipher.PubKey) (localID uint32, sshCh *SSHChannel, cErr error) {
conn, err := c.dialer.Dial(&app.Addr{PubKey: remotePK, Port: Port})
var conn net.Conn
var err error

err = r.Do(func() error {
conn, err = c.dialer.Dial(&app.Addr{PubKey: remotePK, Port: Port})
return err
})
if err != nil {
cErr = fmt.Errorf("dial failed: %s", err)
return
Expand Down
27 changes: 27 additions & 0 deletions vendor/github.com/alecthomas/template/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions vendor/github.com/alecthomas/template/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 257e408

Please sign in to comment.