Skip to content

Commit

Permalink
checkpoint: use retryablehttp for checkpoint data retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan4th committed Jul 12, 2024
1 parent e581bb0 commit 05a5cf6
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 72 deletions.
53 changes: 21 additions & 32 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,28 @@ type Config struct {
// set to false if atxs are not compatible before and after the checkpoint recovery.
PreserveOwnAtx bool `mapstructure:"preserve-own-atx"`

RetryCount int `mapstructure:"retry-count"`
RetryInterval time.Duration `mapstructure:"retry-interval"`
RetryMax int `mapstructure:"retry-max"`
RetryDelay time.Duration `mapstructure:"retry-delay"`
}

func DefaultConfig() Config {
return Config{
PreserveOwnAtx: true,
RetryCount: 5,
RetryInterval: 3 * time.Second,
RetryMax: 5,
RetryDelay: 3 * time.Second,
}
}

type RecoverConfig struct {
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
RetryCount int
RetryInterval time.Duration
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
RetryMax int
RetryDelay time.Duration
}

func (c *RecoverConfig) DbPath() string {
Expand All @@ -83,8 +83,8 @@ func copyToLocalFile(
fs afero.Fs,
dataDir, uri string,
restore types.LayerID,
retryCount int,
retryInterval time.Duration,
retryMax int,
retryDelay time.Duration,
) (string, error) {
parsed, err := url.Parse(uri)
if err != nil {
Expand All @@ -99,23 +99,12 @@ func copyToLocalFile(
logger.Info("old recovery data backed up", log.ZContext(ctx), zap.String("dir", bdir))
}
dst := RecoveryFilename(dataDir, filepath.Base(parsed.String()), restore)
for range retryCount + 1 {
err = httpToLocalFile(ctx, parsed, fs, dst)
switch {
case errors.Is(err, ErrTransient):
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(retryInterval):
}
case err != nil:
return "", err
default:
logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
}
if err = httpToLocalFile(ctx, parsed, fs, dst, retryMax, retryDelay); err != nil {
return "", err
}
return "", ErrCheckpointRequestFailed

logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
}

type AtxDep struct {
Expand Down Expand Up @@ -194,7 +183,7 @@ func RecoverWithDb(
logger.Info("recover from uri", zap.String("uri", cfg.Uri))
cpFile, err := copyToLocalFile(
ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore,
cfg.RetryCount, cfg.RetryInterval,
cfg.RetryMax, cfg.RetryDelay,
)
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ func TestRecover(t *testing.T) {
fail.Store(tc.reqFail)
fs := afero.NewMemMapFs()
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
RetryCount: 5,
RetryInterval: 100 * time.Millisecond,
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
RetryMax: 5,
RetryDelay: 100 * time.Millisecond,
}
bsdir := filepath.Join(cfg.DataDir, bootstrap.DirName)
require.NoError(t, fs.MkdirAll(bsdir, 0o700))
Expand Down
46 changes: 24 additions & 22 deletions checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/spf13/afero"

Expand All @@ -24,7 +25,6 @@ import (
)

var (
ErrTransient = errors.New("transient error")
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrCheckpointRequestFailed = errors.New("checkpoint request failed")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
Expand Down Expand Up @@ -111,36 +111,38 @@ func CopyFile(fs afero.Fs, src, dst string) error {
return rf.Copy(fs, srcf)
}

func httpToLocalFile(ctx context.Context, resource *url.URL, fs afero.Fs, dst string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
func httpToLocalFile(
ctx context.Context,
resource *url.URL,
fs afero.Fs,
dst string,
retryMax int,
retryDelay time.Duration,
) error {
c := retryablehttp.NewClient()
c.RetryMax = retryMax
c.RetryWaitMin = retryDelay
c.RetryWaitMax = retryDelay * 2
c.Backoff = retryablehttp.LinearJitterBackoff

req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
resp, err := (&http.Client{}).Do(req)
urlErr := &url.Error{}
switch {
case errors.As(err, &urlErr):
// It makes sense to retry on network errors.
return ErrTransient
case err != nil:

resp, err := c.Do(req)
if err != nil {
// This shouldn't really happen. According to net/http docs for Do:
// "Any returned error will be of type *url.Error."
return fmt.Errorf("http get recovery file: %w", err)
return fmt.Errorf("%w: %w", ErrCheckpointRequestFailed, err)
}
defer resp.Body.Close()
switch {
case resp.StatusCode == http.StatusOK:
// Continue
case resp.StatusCode == http.StatusNotFound:
// The checkpoint is not found. This is not considered a fatal error.
switch resp.StatusCode {
case http.StatusOK:
case http.StatusNotFound:
return ErrCheckpointNotFound
case resp.StatusCode%100 == 4:
// Can't load the checkoint but it maybe due to an unexpected server problem
// that is not likely to be resolved by retrying.
return ErrCheckpointRequestFailed
default:
// It makes sense to retry on other server errors.
return ErrTransient
return fmt.Errorf("%w: status code %d", ErrCheckpointRequestFailed, resp.StatusCode)

Check warning on line 145 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L144-L145

Added lines #L144 - L145 were not covered by tests
}
rf, err := NewRecoveryFile(fs, dst)
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,15 @@ func (app *App) loadCheckpoint(ctx context.Context) (*checkpoint.PreservedData,
}
}
cfg := &checkpoint.RecoverConfig{
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
RetryCount: app.Config.Recovery.RetryCount,
RetryInterval: app.Config.Recovery.RetryInterval,
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
RetryMax: app.Config.Recovery.RetryMax,
RetryDelay: app.Config.Recovery.RetryDelay,

Check warning on line 447 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L446-L447

Added lines #L446 - L447 were not covered by tests
}

return checkpoint.Recover(ctx, app.log.Zap(), afero.NewOsFs(), cfg)
Expand Down

0 comments on commit 05a5cf6

Please sign in to comment.