Skip to content

Commit

Permalink
rewritted retrier as loop + sleep, proxy still broken
Browse files Browse the repository at this point in the history
  • Loading branch information
ivcosla committed Jun 19, 2019
1 parent 888e0c5 commit 93eb497
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cmd/apps/skychat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

var addr = flag.String("addr", ":8000", "address to bind")
var r = netutil.NewRetrier(50*time.Millisecond, time.Second, 2)
var r = netutil.NewRetrier(50*time.Millisecond, 5, 2)

var (
chatApp *app.App
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/therealproxy-client/therealproxy-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const socksPort = 3

var r = netutil.NewRetrier(50*time.Millisecond, 10*time.Second, 2)
var r = netutil.NewRetrier(time.Second, 0, 1)

func main() {
var addr = flag.String("addr", ":1080", "Client address to listen on")
Expand Down
73 changes: 42 additions & 31 deletions internal/netutil/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import (
"github.com/prometheus/common/log"
)

var ErrThresholdReached = errors.New("threshold timeout has been reached")
var ErrMaximumRetriesReached = errors.New("maximum retries attempted without success")

type RetryFunc func() error

type Retrier struct {
exponentialBackoff time.Duration
exponentialFactor uint32
threshold time.Duration
exponentialFactor uint32 // multiplier for the backoff duration that is applied on every retry. If 0 the backoff is not increased during retries
times uint32 // number of times that the given function is going to be retried until success, if 0 it will be retried forever until success
errWhitelist map[error]struct{}
}

func NewRetrier(exponentialBackoff, threshold time.Duration, factor uint32) *Retrier {
func NewRetrier(exponentialBackoff time.Duration, times, factor uint32) *Retrier {
return &Retrier{
exponentialBackoff: exponentialBackoff,
threshold: threshold,
times: times,
exponentialFactor: factor,
errWhitelist: make(map[error]struct{}),
}
Expand All @@ -38,41 +38,52 @@ func (r *Retrier) WithErrWhitelist(errors ...error) *Retrier {
}

func (r Retrier) Do(f RetryFunc) error {
var err error
var backoff <-chan time.Time
var doneCh <-chan time.Time
if r.times == 0 {
return r.retryUntilSuccess(f)
}

return r.retryNTimes(f)
}

func (r Retrier) retryNTimes(f RetryFunc) error {
currentBackoff := r.exponentialBackoff

errCh := make(chan error)
go func() {
errCh <- f()
}()
for i:= uint32(0); i < r.times; i++ {
err := f()
if err != nil {
if r.isWhitelisted(err) {
return err
}

log.Warn(err)
currentBackoff = currentBackoff * time.Duration(r.exponentialFactor)
time.Sleep(currentBackoff)
continue
}

return nil
}

return ErrMaximumRetriesReached
}

func (r Retrier) retryUntilSuccess(f RetryFunc) error {
currentBackoff := r.exponentialBackoff

for {
select {
case <-doneCh:
return ErrThresholdReached
case <-backoff:
go func() {
errCh <- f()
}()
case err = <-errCh:
if err != nil {
if r.isWhitelisted(err) {
return err
}
} else {
return nil
err := f()
if err != nil {
if r.isWhitelisted(err) {
return err
}
log.Warn(err)

backoff = time.After(currentBackoff)
log.Warn(err)
currentBackoff = currentBackoff * time.Duration(r.exponentialFactor)
if doneCh == nil {
doneCh = time.After(r.threshold)
}
time.Sleep(currentBackoff)
continue
}

return nil
}
}

Expand Down
26 changes: 12 additions & 14 deletions internal/netutil/retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestRetrier_Do(t *testing.T) {
r := NewRetrier(time.Millisecond*100, time.Millisecond*500, 2)
r := NewRetrier(time.Millisecond*100, 3, 2)
c := 0
threshold := 2
f := func() error {
Expand All @@ -28,7 +28,7 @@ func TestRetrier_Do(t *testing.T) {
require.NoError(t, err)
})

t.Run("if retry reaches threshold should error", func(t *testing.T) {
t.Run("if retry reaches max number of times should error", func(t *testing.T) {
c = 0
threshold = 4
defer func() {
Expand All @@ -39,25 +39,23 @@ func TestRetrier_Do(t *testing.T) {
require.Error(t, err)
})

t.Run("if function times out should error", func(t *testing.T) {
c = 0
slowF := func() error {
time.Sleep(100*time.Millisecond)
return errors.New("foo")
}

err := r.Do(slowF)
require.Error(t, err)
})

t.Run("should return whitelisted errors if any instead of retry", func(t *testing.T) {
bar := errors.New("bar")
wR := NewRetrier(50*time.Millisecond, time.Second, 2).WithErrWhitelist(bar)
wR := NewRetrier(50*time.Millisecond, 1, 2).WithErrWhitelist(bar)
barF := func() error {
return bar
}

err := wR.Do(barF)
require.EqualError(t, err, bar.Error())
})

t.Run("if times is 0, should retry until success", func(t *testing.T) {
c = 0
loopR := NewRetrier(50*time.Millisecond, 0, 1)
err := loopR.Do(f)
require.NoError(t, err)

require.Equal(t,threshold,c)
})
}
4 changes: 4 additions & 0 deletions internal/therealproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package therealproxy

import (
"fmt"
"github.com/skycoin/skywire/internal/netutil"
"io"
"log"
"net"
"time"

"github.com/hashicorp/yamux"
)

var r = netutil.NewRetrier(50*time.Millisecond, 3, 2)

// Client implement multiplexing proxy client using yamux.
type Client struct {
session *yamux.Session
Expand Down
9 changes: 2 additions & 7 deletions internal/therealproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@ package therealproxy

import (
"fmt"
"github.com/armon/go-socks5"
"github.com/hashicorp/yamux"
"log"
"net"
"time"

"github.com/skycoin/skywire/internal/netutil"

socks5 "github.com/armon/go-socks5"
"github.com/hashicorp/yamux"
)

var r = netutil.NewRetrier(50*time.Millisecond, 500*time.Millisecond, 2)

// Server implements multiplexing proxy server using yamux.
type Server struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/therealssh/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/skycoin/skywire/pkg/app"
)

var r = netutil.NewRetrier(50*time.Millisecond, time.Second, 2)
var r = netutil.NewRetrier(50*time.Millisecond, 5, 2)

// Dialer dials to a remote node.
type Dialer interface {
Expand Down
4 changes: 3 additions & 1 deletion pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,14 @@ func (app *App) Dial(raddr *Addr) (net.Conn, error) {
if err != nil {
return nil, err
}

fmt.Println("loop created")
addr := &LoopAddr{laddr.Port, *raddr}
conn, out := net.Pipe()
fmt.Println("locking mux...")
app.mu.Lock()
app.conns[*addr] = conn
app.mu.Unlock()
fmt.Println("serving conn...")
go app.serveConn(addr, conn)
return newAppConn(out, laddr, raddr), nil
}
Expand Down

0 comments on commit 93eb497

Please sign in to comment.