Skip to content

Commit

Permalink
fix(api,sdk): allow to retry request when conflict due to database lock
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt committed Jun 10, 2020
1 parent 570f519 commit 34a2939
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 76 deletions.
2 changes: 1 addition & 1 deletion engine/api/application_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ name: myNewApp`
//Do the request
rec := httptest.NewRecorder()
api.Router.Mux.ServeHTTP(rec, req)
assert.Equal(t, 409, rec.Code)
assert.Equal(t, 403, rec.Code)

//Check result
t.Logf(">>%s", rec.Body.String())
Expand Down
2 changes: 1 addition & 1 deletion engine/api/database/gorpmapping/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Insert(db gorp.SqlExecutor, i interface{}) error {
case ViolateUniqueKeyPGCode:
err = sdk.NewError(sdk.ErrInvalidData, e)
case StringDataRightTruncation:
err = sdk.NewError(sdk.ErrConflict, e)
err = sdk.NewError(sdk.ErrInvalidData, e)
}
}

Expand Down
14 changes: 7 additions & 7 deletions engine/api/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,14 @@ func (api *API) cloneEnvironmentHandler() service.Handler {
environmentName := vars["environmentName"]
cloneName := vars["cloneName"]

env, errEnv := environment.LoadEnvironmentByName(api.mustDB(), projectKey, environmentName)
if errEnv != nil {
return sdk.WrapError(errEnv, "cloneEnvironmentHandler> Cannot load environment %s: %s", environmentName, errEnv)
env, err := environment.LoadEnvironmentByName(api.mustDB(), projectKey, environmentName)
if err != nil {
return sdk.WrapError(err, "cannot load environment %s", environmentName)
}

p, errProj := project.Load(api.mustDB(), projectKey)
if errProj != nil {
return sdk.WrapError(errProj, "cloneEnvironmentHandler> Cannot load project %s: %s", projectKey, errProj)
p, err := project.Load(api.mustDB(), projectKey)
if err != nil {
return sdk.WrapError(err, "cannot load project %s", projectKey)
}

//Load all environments to check if there is another environment with the same name
Expand All @@ -274,7 +274,7 @@ func (api *API) cloneEnvironmentHandler() service.Handler {

for _, e := range envs {
if e.Name == cloneName {
return sdk.WrapError(sdk.ErrConflict, "cloneEnvironmentHandler> an environment was found with the same name: %s", cloneName)
return sdk.WrapError(sdk.ErrEnvironmentExist, "an environment was found with the same name: %s", cloneName)
}
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/environment/environment_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func ParseAndImport(db gorp.SqlExecutor, proj sdk.Project, eenv exportentities.E
// If the environment exists and we don't want to force, raise an error
var exist bool
if oldEnv != nil && !opts.Force {
return nil, nil, sdk.ErrEnvironmentExist
return nil, nil, sdk.WithStack(sdk.ErrEnvironmentExist)
}
if oldEnv != nil {
exist = true
Expand Down
2 changes: 1 addition & 1 deletion engine/api/environment_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ name: myNewEnv`
//Do the request
rec := httptest.NewRecorder()
api.Router.Mux.ServeHTTP(rec, req)
assert.Equal(t, 409, rec.Code)
assert.Equal(t, 403, rec.Code)

//Check result
t.Logf(">>%s", rec.Body.String())
Expand Down
7 changes: 3 additions & 4 deletions engine/api/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"fmt"
"net/http"

"github.com/go-gorp/gorp"
Expand Down Expand Up @@ -55,17 +54,17 @@ func (api *API) postIntegrationModelHandler() service.Handler {
defer tx.Rollback() // nolint

if exist, err := integration.ModelExists(tx, m.Name); err != nil {
return sdk.WrapError(err, "Unable to check if model %s exist", m.Name)
return sdk.WrapError(err, "unable to check if model %s exist", m.Name)
} else if exist {
return sdk.NewError(sdk.ErrConflict, fmt.Errorf("integration model %s already exist", m.Name))
return sdk.NewErrorFrom(sdk.ErrAlreadyExist, "integration model %s already exist", m.Name)
}

if err := integration.InsertModel(tx, m); err != nil {
return sdk.WrapError(err, "unable to insert model %s", m.Name)
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "Unable to commit tx")
return sdk.WrapError(err, "unable to commit tx")
}

if m.Public {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (api *API) addPipelineHandler() service.Handler {
return sdk.WrapError(err, "cannot check if pipeline exist")
}
if exist {
return sdk.NewErrorFrom(sdk.ErrConflict, "pipeline %s already exists", p.Name)
return sdk.NewErrorFrom(sdk.ErrPipelineAlreadyExists, "pipeline %s already exists", p.Name)
}

tx, err := api.mustDB().Begin()
Expand Down
6 changes: 3 additions & 3 deletions engine/api/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (api *API) postProjectHandler() service.Handler {
return sdk.WrapError(errExist, "cannot check if project %s exist", p.Key)
}
if exist {
return sdk.WrapError(sdk.ErrConflict, "project %s already exists", p.Key)
return sdk.NewErrorFrom(sdk.ErrAlreadyExist, "project %s already exists", p.Key)
}

if err := project.Insert(tx, &p); err != nil {
Expand Down Expand Up @@ -641,11 +641,11 @@ func (api *API) deleteProjectHandler() service.Handler {
}

if len(p.Pipelines) > 0 {
return sdk.WrapError(sdk.ErrProjectHasPipeline, "deleteProject> Project '%s' still used by %d pipelines", key, len(p.Pipelines))
return sdk.WrapError(sdk.ErrProjectHasPipeline, "project '%s' still used by %d pipelines", key, len(p.Pipelines))
}

if len(p.Applications) > 0 {
return sdk.WrapError(sdk.ErrProjectHasApplication, "deleteProject> Project '%s' still used by %d applications", key, len(p.Applications))
return sdk.WrapError(sdk.ErrProjectHasApplication, "project '%s' still used by %d applications", key, len(p.Applications))
}

tx, errBegin := api.mustDB().Begin()
Expand Down
8 changes: 3 additions & 5 deletions engine/api/workflow/dao_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@ func LabelWorkflow(db gorp.SqlExecutor, labelID, workflowID int64) error {
log.Debug("LabelWorkflow> %d %d", labelID, workflowID)
if _, err := db.Exec("INSERT INTO project_label_workflow (label_id, workflow_id) VALUES ($1, $2)", labelID, workflowID); err != nil {
if errPG, ok := err.(*pq.Error); ok && errPG.Code == gorpmapping.ViolateUniqueKeyPGCode {
return sdk.WrapError(sdk.ErrConflict, "LabelWorkflow> this label %d is already linked to workflow %d", labelID, workflowID)
return sdk.WrapError(sdk.ErrForbidden, "this label %d is already linked to workflow %d", labelID, workflowID)
}
return sdk.WrapError(err, "Cannot link label %d to workflow %d", labelID, workflowID)
return sdk.WrapError(err, "cannot link label %d to workflow %d", labelID, workflowID)
}

return nil
}

// UnLabelWorkflow unlink a label on a workflow given his workflow id
func UnLabelWorkflow(db gorp.SqlExecutor, labelID, workflowID int64) error {
if _, err := db.Exec("DELETE FROM project_label_workflow WHERE label_id = $1 AND workflow_id = $2", labelID, workflowID); err != nil {
return sdk.WrapError(err, "Cannot unlink label %d to workflow %d", labelID, workflowID)
return sdk.WrapError(err, "cannot unlink label %d to workflow %d", labelID, workflowID)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/workflow_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Import(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sd
}

if !force {
return sdk.NewErrorFrom(sdk.ErrConflict, "workflow exists")
return sdk.NewErrorFrom(sdk.ErrAlreadyExist, "workflow exists")
}

// Retrieve existing hook
Expand Down
54 changes: 22 additions & 32 deletions sdk/cdsclient/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ func (c *client) Request(ctx context.Context, method string, path string, body i

var signinRouteRegexp = regexp.MustCompile(`\/auth\/consumer\/.*\/signin`)

func extractBodyErrorFromResponse(r *http.Response) error {
body, _ := ioutil.ReadAll(r.Body)
r.Body.Close() // nolint
if err := sdk.DecodeError(body); err != nil {
return sdk.WithStack(err)
}
return sdk.WithStack(fmt.Errorf("HTTP %d", r.StatusCode))
}

// Stream makes an authenticated http request and return io.ReadCloser
func (c *client) Stream(ctx context.Context, method string, path string, body io.Reader, noTimeout bool, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error) {
// Checks that current session_token is still valid
Expand All @@ -182,14 +191,14 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io

if checkToken && !c.config.HasValidSessionToken() && c.config.BuitinConsumerAuthenticationToken != "" {
if c.config.Verbose {
fmt.Printf("session token invalid: (%s). Relogin...\n", c.config.SessionToken)
log.Printf("session token invalid: (%s). Relogin...\n", c.config.SessionToken)
}
resp, err := c.AuthConsumerSignin(sdk.ConsumerBuiltin, sdk.AuthConsumerSigninRequest{"token": c.config.BuitinConsumerAuthenticationToken})
if err != nil {
return nil, nil, -1, sdk.WithStack(err)
}
if c.config.Verbose {
fmt.Println("jwt: ", resp.Token[:12])
log.Println("jwt: ", resp.Token[:12])
}
c.config.SessionToken = resp.Token
}
Expand Down Expand Up @@ -221,15 +230,15 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
var requestError error
if rs, ok := body.(io.ReadSeeker); ok {
if _, err := rs.Seek(0, 0); err != nil {
savederror = err
savederror = sdk.WithStack(err)
continue
}
req, requestError = http.NewRequest(method, url, body)
} else {
req, requestError = http.NewRequest(method, url, bytes.NewBuffer(bodyBytes))
}
if requestError != nil {
savederror = requestError
savederror = sdk.WithStack(requestError)
continue
}

Expand Down Expand Up @@ -262,7 +271,7 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
if strings.HasPrefix(url, c.config.Host) && !signinRouteRegexp.MatchString(path) {
if _, _, err := new(jwt.Parser).ParseUnverified(c.config.SessionToken, &sdk.AuthSessionJWTClaims{}); err == nil {
if c.config.Verbose {
fmt.Println("JWT recognized")
log.Println("JWT recognized")
}
auth := "Bearer " + c.config.SessionToken
req.Header.Add("Authorization", auth)
Expand All @@ -284,7 +293,8 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
resp, errDo = c.httpClient.Do(req)
}
if errDo != nil {
return nil, nil, 0, sdk.WithStack(errDo)
savederror = sdk.WithStack(errDo)
continue
}

if c.config.Verbose {
Expand All @@ -298,41 +308,21 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
c.config.SessionToken = ""
}

// if everything is fine, return body
if resp.StatusCode < 500 {
return resp.Body, resp.Header, resp.StatusCode, nil
if resp.StatusCode == 409 || resp.StatusCode > 500 {
savederror = extractBodyErrorFromResponse(resp)
continue
}

// if no request error by status > 500, check CDS error
// if there is a CDS errors, return it
if resp.StatusCode == 500 {
var body []byte
var errRead error
body, errRead = ioutil.ReadAll(resp.Body)
if errRead != nil {
resp.Body.Close()
continue
}
if err := sdk.DecodeError(body); err != nil {
resp.Body.Close()
return nil, resp.Header, resp.StatusCode, sdk.WithStack(err)
}
}

if resp != nil && resp.StatusCode >= 500 {
savederror = fmt.Errorf("HTTP %d", resp.StatusCode)
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
continue
return nil, resp.Header, resp.StatusCode, extractBodyErrorFromResponse(resp)
}

if resp != nil && resp.Body != nil {
resp.Body.Close()
}
return resp.Body, resp.Header, resp.StatusCode, nil
}

return nil, nil, 0, sdk.WithStack(fmt.Errorf("x%d: %s", c.config.Retry, savederror))
return nil, nil, 0, sdk.WrapError(savederror, "request failed after %d retries", c.config.Retry)
}

// UploadMultiPart upload multipart
Expand Down
Loading

0 comments on commit 34a2939

Please sign in to comment.