Skip to content

Commit

Permalink
Merge pull request #351 from skycoin/bug/messaging-discovery-right-se…
Browse files Browse the repository at this point in the history
…quence

added retry on sequence error for update
  • Loading branch information
ivcosla authored May 21, 2019
2 parents 04e1d63 + 90b8f84 commit a9d1066
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 21 deletions.
48 changes: 32 additions & 16 deletions pkg/messaging-discovery/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/skycoin/skywire/pkg/cipher"
Expand All @@ -25,8 +25,9 @@ type APIClient interface {
// HTTPClient represents a client that communicates with a messaging-discovery service through http, it
// implements APIClient
type httpClient struct {
client http.Client
address string
client http.Client
address string
updateMux sync.Mutex // for thread-safe sequence incrementing
}

// NewHTTP constructs a new APIClient that communicates with discovery via http.
Expand Down Expand Up @@ -63,7 +64,7 @@ func (c *httpClient) Entry(ctx context.Context, publicKey cipher.PubKey) (*Entry
return nil, err
}

return nil, errors.New(message.String())
return nil, errFromString(message.String())
}

err = json.NewDecoder(resp.Body).Decode(&entry)
Expand Down Expand Up @@ -108,27 +109,42 @@ func (c *httpClient) SetEntry(ctx context.Context, e *Entry) error {
if err != nil {
return err
}

return errors.New(httpResponse.String())
return errFromString(httpResponse.Message)
}
return nil
}

// UpdateEntry updates Entry in messaging discovery.
func (c *httpClient) UpdateEntry(ctx context.Context, sk cipher.SecKey, e *Entry) error {
c.updateMux.Lock()
defer c.updateMux.Unlock()

e.Sequence++
e.Timestamp = time.Now().UnixNano()
err := e.Sign(sk)
if err != nil {
return err
}

err = c.SetEntry(ctx, e)
if err != nil {
e.Sequence--
for {
err := e.Sign(sk)
if err != nil {
return err
}
err = c.SetEntry(ctx, e)
if err == nil {
return nil
}
if err != ErrValidationWrongSequence {
e.Sequence--
return err
}
rE, entryErr := c.Entry(ctx, e.Static)
if entryErr != nil {
return err
}
if rE.Timestamp > e.Timestamp { // If there is a more up to date entry drop update
e.Sequence = rE.Sequence
return nil
}
e.Sequence = rE.Sequence + 1
}

return err
}

// AvailableServers returns list of available servers.
Expand Down Expand Up @@ -157,7 +173,7 @@ func (c *httpClient) AvailableServers(ctx context.Context) ([]*Entry, error) {
return nil, err
}

return nil, errors.New(message.String())
return nil, errFromString(message.String())
}

err = json.NewDecoder(resp.Body).Decode(&entries)
Expand Down
8 changes: 8 additions & 0 deletions pkg/messaging-discovery/client/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ func TestNewMockUpdateEntriesEndpoint(t *testing.T) {
e.Server.Address = "different one"
},
},
{
name: "udpate retries on wrong sequence",
responseShouldError: false,
secretKey: sk,
entryPreHook: func(entry *client.Entry) {
entry.Sequence = 3
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/messaging-discovery/client/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,32 @@ var (
ErrValidationNoClientOrServer = NewEntryValidationError("entry has neither client or server field")
ErrValidationWrongSequence = NewEntryValidationError("sequence field of new entry is not sequence of old entry + 1")
ErrValidationWrongTime = NewEntryValidationError("previous entry timestamp is not set before current entry timestamp")

errReverseMap = map[string]error{
ErrKeyNotFound.Error(): ErrKeyNotFound,
ErrUnexpected.Error(): ErrUnexpected,
ErrUnauthorized.Error(): ErrUnauthorized,
ErrBadInput.Error(): ErrBadInput,
ErrValidationNonZeroSequence.Error(): ErrValidationNonZeroSequence,
ErrValidationNilEphemerals.Error(): ErrValidationNilEphemerals,
ErrValidationNilKeys.Error(): ErrValidationNilKeys,
ErrValidationNonNilEphemerals.Error(): ErrValidationNonNilEphemerals,
ErrValidationNoSignature.Error(): ErrValidationNoSignature,
ErrValidationNoVersion.Error(): ErrValidationNoVersion,
ErrValidationNoClientOrServer.Error(): ErrValidationNoClientOrServer,
ErrValidationWrongSequence.Error(): ErrValidationWrongSequence,
ErrValidationWrongTime.Error(): ErrValidationWrongTime,
}
)

func errFromString(s string) error {
err, ok := errReverseMap[s]
if !ok {
return ErrUnexpected
}
return err
}

// EntryValidationError represents transient error caused by invalid
// data in Entry
type EntryValidationError struct {
Expand Down
28 changes: 23 additions & 5 deletions pkg/messaging-discovery/client/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,30 @@ func (m *mockClient) SetEntry(ctx context.Context, e *Entry) error {
func (m *mockClient) UpdateEntry(ctx context.Context, sk cipher.SecKey, e *Entry) error {
e.Sequence++
e.Timestamp = time.Now().UnixNano()
err := e.Sign(sk)
if err != nil {
return err
}

return m.SetEntry(ctx, e)
for {
err := e.Sign(sk)
if err != nil {
return err
}
err = m.SetEntry(ctx, e)
if err == nil {
return nil
}
if err != ErrValidationWrongSequence {
e.Sequence--
return err
}
rE, entryErr := m.Entry(ctx, e.Static)
if entryErr != nil {
return err
}
if rE.Timestamp > e.Timestamp { // If there is a more up to date entry drop update
e.Sequence = rE.Sequence
return nil
}
e.Sequence = rE.Sequence + 1
}
}

// AvailableServers returns all the servers that the APIClient mock has
Expand Down

0 comments on commit a9d1066

Please sign in to comment.