Skip to content

Commit

Permalink
fix(api,sdk): allow to retry request when conflict due to database lock
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt committed Jun 10, 2020
1 parent 570f519 commit 6884454
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 74 deletions.
2 changes: 1 addition & 1 deletion engine/api/database/gorpmapping/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Insert(db gorp.SqlExecutor, i interface{}) error {
case ViolateUniqueKeyPGCode:
err = sdk.NewError(sdk.ErrInvalidData, e)
case StringDataRightTruncation:
err = sdk.NewError(sdk.ErrConflict, e)
err = sdk.NewError(sdk.ErrInvalidData, e)
}
}

Expand Down
14 changes: 7 additions & 7 deletions engine/api/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,14 @@ func (api *API) cloneEnvironmentHandler() service.Handler {
environmentName := vars["environmentName"]
cloneName := vars["cloneName"]

env, errEnv := environment.LoadEnvironmentByName(api.mustDB(), projectKey, environmentName)
if errEnv != nil {
return sdk.WrapError(errEnv, "cloneEnvironmentHandler> Cannot load environment %s: %s", environmentName, errEnv)
env, err := environment.LoadEnvironmentByName(api.mustDB(), projectKey, environmentName)
if err != nil {
return sdk.WrapError(err, "cannot load environment %s", environmentName)
}

p, errProj := project.Load(api.mustDB(), projectKey)
if errProj != nil {
return sdk.WrapError(errProj, "cloneEnvironmentHandler> Cannot load project %s: %s", projectKey, errProj)
p, err := project.Load(api.mustDB(), projectKey)
if err != nil {
return sdk.WrapError(err, "cannot load project %s", projectKey)
}

//Load all environments to check if there is another environment with the same name
Expand All @@ -274,7 +274,7 @@ func (api *API) cloneEnvironmentHandler() service.Handler {

for _, e := range envs {
if e.Name == cloneName {
return sdk.WrapError(sdk.ErrConflict, "cloneEnvironmentHandler> an environment was found with the same name: %s", cloneName)
return sdk.WrapError(sdk.ErrEnvironmentExist, "an environment was found with the same name: %s", cloneName)
}
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/environment/environment_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func ParseAndImport(db gorp.SqlExecutor, proj sdk.Project, eenv exportentities.E
// If the environment exists and we don't want to force, raise an error
var exist bool
if oldEnv != nil && !opts.Force {
return nil, nil, sdk.ErrEnvironmentExist
return nil, nil, sdk.WithStack(sdk.ErrEnvironmentExist)
}
if oldEnv != nil {
exist = true
Expand Down
7 changes: 3 additions & 4 deletions engine/api/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"fmt"
"net/http"

"github.com/go-gorp/gorp"
Expand Down Expand Up @@ -55,17 +54,17 @@ func (api *API) postIntegrationModelHandler() service.Handler {
defer tx.Rollback() // nolint

if exist, err := integration.ModelExists(tx, m.Name); err != nil {
return sdk.WrapError(err, "Unable to check if model %s exist", m.Name)
return sdk.WrapError(err, "unable to check if model %s exist", m.Name)
} else if exist {
return sdk.NewError(sdk.ErrConflict, fmt.Errorf("integration model %s already exist", m.Name))
return sdk.NewErrorFrom(sdk.ErrAlreadyExist, "integration model %s already exist", m.Name)
}

if err := integration.InsertModel(tx, m); err != nil {
return sdk.WrapError(err, "unable to insert model %s", m.Name)
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "Unable to commit tx")
return sdk.WrapError(err, "unable to commit tx")
}

if m.Public {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (api *API) addPipelineHandler() service.Handler {
return sdk.WrapError(err, "cannot check if pipeline exist")
}
if exist {
return sdk.NewErrorFrom(sdk.ErrConflict, "pipeline %s already exists", p.Name)
return sdk.NewErrorFrom(sdk.ErrPipelineAlreadyExists, "pipeline %s already exists", p.Name)
}

tx, err := api.mustDB().Begin()
Expand Down
6 changes: 3 additions & 3 deletions engine/api/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (api *API) postProjectHandler() service.Handler {
return sdk.WrapError(errExist, "cannot check if project %s exist", p.Key)
}
if exist {
return sdk.WrapError(sdk.ErrConflict, "project %s already exists", p.Key)
return sdk.NewErrorFrom(sdk.ErrAlreadyExist, "project %s already exists", p.Key)
}

if err := project.Insert(tx, &p); err != nil {
Expand Down Expand Up @@ -641,11 +641,11 @@ func (api *API) deleteProjectHandler() service.Handler {
}

if len(p.Pipelines) > 0 {
return sdk.WrapError(sdk.ErrProjectHasPipeline, "deleteProject> Project '%s' still used by %d pipelines", key, len(p.Pipelines))
return sdk.WrapError(sdk.ErrProjectHasPipeline, "project '%s' still used by %d pipelines", key, len(p.Pipelines))
}

if len(p.Applications) > 0 {
return sdk.WrapError(sdk.ErrProjectHasApplication, "deleteProject> Project '%s' still used by %d applications", key, len(p.Applications))
return sdk.WrapError(sdk.ErrProjectHasApplication, "project '%s' still used by %d applications", key, len(p.Applications))
}

tx, errBegin := api.mustDB().Begin()
Expand Down
8 changes: 3 additions & 5 deletions engine/api/workflow/dao_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@ func LabelWorkflow(db gorp.SqlExecutor, labelID, workflowID int64) error {
log.Debug("LabelWorkflow> %d %d", labelID, workflowID)
if _, err := db.Exec("INSERT INTO project_label_workflow (label_id, workflow_id) VALUES ($1, $2)", labelID, workflowID); err != nil {
if errPG, ok := err.(*pq.Error); ok && errPG.Code == gorpmapping.ViolateUniqueKeyPGCode {
return sdk.WrapError(sdk.ErrConflict, "LabelWorkflow> this label %d is already linked to workflow %d", labelID, workflowID)
return sdk.WrapError(sdk.ErrForbidden, "this label %d is already linked to workflow %d", labelID, workflowID)
}
return sdk.WrapError(err, "Cannot link label %d to workflow %d", labelID, workflowID)
return sdk.WrapError(err, "cannot link label %d to workflow %d", labelID, workflowID)
}

return nil
}

// UnLabelWorkflow unlink a label on a workflow given his workflow id
func UnLabelWorkflow(db gorp.SqlExecutor, labelID, workflowID int64) error {
if _, err := db.Exec("DELETE FROM project_label_workflow WHERE label_id = $1 AND workflow_id = $2", labelID, workflowID); err != nil {
return sdk.WrapError(err, "Cannot unlink label %d to workflow %d", labelID, workflowID)
return sdk.WrapError(err, "cannot unlink label %d to workflow %d", labelID, workflowID)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/workflow_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Import(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sd
}

if !force {
return sdk.NewErrorFrom(sdk.ErrConflict, "workflow exists")
return sdk.NewErrorFrom(sdk.ErrAlreadyExist, "workflow exists")
}

// Retrieve existing hook
Expand Down
55 changes: 23 additions & 32 deletions sdk/cdsclient/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ func (c *client) Request(ctx context.Context, method string, path string, body i

var signinRouteRegexp = regexp.MustCompile(`\/auth\/consumer\/.*\/signin`)

func extractBodyErrorFromResponse(r *http.Response) error {
body, _ := ioutil.ReadAll(r.Body)
r.Body.Close() // nolint
if err := sdk.DecodeError(body); err != nil {
return sdk.WithStack(err)
}
return sdk.WithStack(fmt.Errorf("HTTP %d", r.StatusCode))
}

// Stream makes an authenticated http request and return io.ReadCloser
func (c *client) Stream(ctx context.Context, method string, path string, body io.Reader, noTimeout bool, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error) {
// Checks that current session_token is still valid
Expand All @@ -182,14 +191,14 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io

if checkToken && !c.config.HasValidSessionToken() && c.config.BuitinConsumerAuthenticationToken != "" {
if c.config.Verbose {
fmt.Printf("session token invalid: (%s). Relogin...\n", c.config.SessionToken)
log.Printf("session token invalid: (%s). Relogin...\n", c.config.SessionToken)
}
resp, err := c.AuthConsumerSignin(sdk.ConsumerBuiltin, sdk.AuthConsumerSigninRequest{"token": c.config.BuitinConsumerAuthenticationToken})
if err != nil {
return nil, nil, -1, sdk.WithStack(err)
}
if c.config.Verbose {
fmt.Println("jwt: ", resp.Token[:12])
log.Println("jwt: ", resp.Token[:12])
}
c.config.SessionToken = resp.Token
}
Expand Down Expand Up @@ -221,15 +230,15 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
var requestError error
if rs, ok := body.(io.ReadSeeker); ok {
if _, err := rs.Seek(0, 0); err != nil {
savederror = err
savederror = sdk.WithStack(err)
continue
}
req, requestError = http.NewRequest(method, url, body)
} else {
req, requestError = http.NewRequest(method, url, bytes.NewBuffer(bodyBytes))
}
if requestError != nil {
savederror = requestError
savederror = sdk.WithStack(requestError)
continue
}

Expand Down Expand Up @@ -262,7 +271,7 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
if strings.HasPrefix(url, c.config.Host) && !signinRouteRegexp.MatchString(path) {
if _, _, err := new(jwt.Parser).ParseUnverified(c.config.SessionToken, &sdk.AuthSessionJWTClaims{}); err == nil {
if c.config.Verbose {
fmt.Println("JWT recognized")
log.Println("JWT recognized")
}
auth := "Bearer " + c.config.SessionToken
req.Header.Add("Authorization", auth)
Expand All @@ -284,7 +293,8 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
resp, errDo = c.httpClient.Do(req)
}
if errDo != nil {
return nil, nil, 0, sdk.WithStack(errDo)
savederror = sdk.WithStack(errDo)
continue
}

if c.config.Verbose {
Expand All @@ -298,41 +308,22 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
c.config.SessionToken = ""
}

// if everything is fine, return body
if resp.StatusCode < 500 {
return resp.Body, resp.Header, resp.StatusCode, nil
if resp.StatusCode == 409 || resp.StatusCode > 500 {
savederror = extractBodyErrorFromResponse(resp)
continue
}

// if no request error by status > 500, check CDS error
// if there is a CDS errors, return it
if resp.StatusCode == 500 {
var body []byte
var errRead error
body, errRead = ioutil.ReadAll(resp.Body)
if errRead != nil {
resp.Body.Close()
continue
}
if err := sdk.DecodeError(body); err != nil {
resp.Body.Close()
return nil, resp.Header, resp.StatusCode, sdk.WithStack(err)
}
}

if resp != nil && resp.StatusCode >= 500 {
savederror = fmt.Errorf("HTTP %d", resp.StatusCode)
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
continue
return nil, resp.Header, resp.StatusCode, extractBodyErrorFromResponse(resp)
}

if resp != nil && resp.Body != nil {
resp.Body.Close()
}
resp.Body.Close()
return resp.Body, resp.Header, resp.StatusCode, nil
}

return nil, nil, 0, sdk.WithStack(fmt.Errorf("x%d: %s", c.config.Retry, savederror))
return nil, nil, 0, sdk.WrapError(savederror, "request failed after %d retries", c.config.Retry)
}

// UploadMultiPart upload multipart
Expand Down
32 changes: 13 additions & 19 deletions sdk/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ var (
ErrInvalidID = Error{ID: 5, Status: http.StatusBadRequest}
ErrInvalidProject = Error{ID: 6, Status: http.StatusBadRequest}
ErrInvalidProjectKey = Error{ID: 7, Status: http.StatusBadRequest}
ErrProjectHasPipeline = Error{ID: 8, Status: http.StatusConflict}
ErrProjectHasApplication = Error{ID: 9, Status: http.StatusConflict}
ErrProjectHasPipeline = Error{ID: 8, Status: http.StatusForbidden}
ErrProjectHasApplication = Error{ID: 9, Status: http.StatusForbidden}
ErrUnauthorized = Error{ID: 10, Status: http.StatusUnauthorized}
ErrForbidden = Error{ID: 11, Status: http.StatusForbidden}
ErrPipelineNotFound = Error{ID: 12, Status: http.StatusBadRequest}
ErrPipelineNotAttached = Error{ID: 13, Status: http.StatusBadRequest}
ErrNoEnvironmentProvided = Error{ID: 14, Status: http.StatusBadRequest}
ErrEnvironmentProvided = Error{ID: 15, Status: http.StatusBadRequest}
ErrUnknownEnv = Error{ID: 16, Status: http.StatusBadRequest}
ErrEnvironmentExist = Error{ID: 17, Status: http.StatusConflict}
ErrEnvironmentExist = Error{ID: 17, Status: http.StatusForbidden}
ErrNoPipelineBuild = Error{ID: 18, Status: http.StatusNotFound}
ErrInvalidUsername = Error{ID: 21, Status: http.StatusBadRequest}
ErrInvalidEmail = Error{ID: 22, Status: http.StatusBadRequest}
Expand All @@ -41,11 +41,11 @@ var (
ErrInvalidUser = Error{ID: 25, Status: http.StatusBadRequest}
ErrBuildArchived = Error{ID: 26, Status: http.StatusBadRequest}
ErrNoEnvironment = Error{ID: 27, Status: http.StatusNotFound}
ErrModelNameExist = Error{ID: 28, Status: http.StatusConflict}
ErrModelNameExist = Error{ID: 28, Status: http.StatusForbidden}
ErrNoProject = Error{ID: 30, Status: http.StatusNotFound}
ErrVariableExists = Error{ID: 31, Status: http.StatusConflict}
ErrVariableExists = Error{ID: 31, Status: http.StatusForbidden}
ErrInvalidGroupPattern = Error{ID: 32, Status: http.StatusBadRequest}
ErrGroupExists = Error{ID: 33, Status: http.StatusConflict}
ErrGroupExists = Error{ID: 33, Status: http.StatusForbidden}
ErrNotEnoughAdmin = Error{ID: 34, Status: http.StatusBadRequest}
ErrInvalidProjectName = Error{ID: 35, Status: http.StatusBadRequest}
ErrInvalidApplicationPattern = Error{ID: 36, Status: http.StatusBadRequest}
Expand Down Expand Up @@ -77,19 +77,17 @@ var (
ErrGroupNeedWrite = Error{ID: 64, Status: http.StatusBadRequest}
ErrNoVariable = Error{ID: 65, Status: http.StatusNotFound}
ErrPluginInvalid = Error{ID: 66, Status: http.StatusBadRequest}
ErrConflict = Error{ID: 67, Status: http.StatusConflict}
ErrPipelineAlreadyAttached = Error{ID: 68, Status: http.StatusConflict}
ErrApplicationExist = Error{ID: 69, Status: http.StatusConflict}
ErrApplicationExist = Error{ID: 69, Status: http.StatusForbidden}
ErrBranchNameNotProvided = Error{ID: 70, Status: http.StatusBadRequest}
ErrInfiniteTriggerLoop = Error{ID: 71, Status: http.StatusBadRequest}
ErrInvalidResetUser = Error{ID: 72, Status: http.StatusBadRequest}
ErrUserConflict = Error{ID: 73, Status: http.StatusBadRequest}
ErrWrongRequest = Error{ID: 74, Status: http.StatusBadRequest}
ErrAlreadyExist = Error{ID: 75, Status: http.StatusConflict}
ErrAlreadyExist = Error{ID: 75, Status: http.StatusForbidden}
ErrInvalidType = Error{ID: 76, Status: http.StatusBadRequest}
ErrParentApplicationAndPipelineMandatory = Error{ID: 77, Status: http.StatusBadRequest}
ErrNoParentBuildFound = Error{ID: 78, Status: http.StatusNotFound}
ErrParameterExists = Error{ID: 79, Status: http.StatusConflict}
ErrParameterExists = Error{ID: 79, Status: http.StatusForbidden}
ErrNoHatchery = Error{ID: 80, Status: http.StatusNotFound}
ErrInvalidWorkerStatus = Error{ID: 81, Status: http.StatusNotFound}
ErrInvalidToken = Error{ID: 82, Status: http.StatusUnauthorized}
Expand All @@ -98,8 +96,8 @@ var (
ErrEnvironmentCannotBeDeleted = Error{ID: 85, Status: http.StatusForbidden}
ErrInvalidPipeline = Error{ID: 86, Status: http.StatusBadRequest}
ErrKeyNotFound = Error{ID: 87, Status: http.StatusNotFound}
ErrPipelineAlreadyExists = Error{ID: 88, Status: http.StatusConflict}
ErrJobAlreadyBooked = Error{ID: 89, Status: http.StatusConflict}
ErrPipelineAlreadyExists = Error{ID: 88, Status: http.StatusForbidden}
ErrJobAlreadyBooked = Error{ID: 89, Status: http.StatusForbidden}
ErrPipelineBuildNotFound = Error{ID: 90, Status: http.StatusNotFound}
ErrAlreadyTaken = Error{ID: 91, Status: http.StatusGone}
ErrWorkflowNodeNotFound = Error{ID: 93, Status: http.StatusNotFound}
Expand All @@ -124,7 +122,7 @@ var (
ErrWorkflowNodeRunJobNotFound = Error{ID: 112, Status: http.StatusNotFound}
ErrBuiltinKeyNotFound = Error{ID: 113, Status: http.StatusInternalServerError}
ErrStepNotFound = Error{ID: 114, Status: http.StatusNotFound}
ErrWorkerModelAlreadyBooked = Error{ID: 115, Status: http.StatusConflict}
ErrWorkerModelAlreadyBooked = Error{ID: 115, Status: http.StatusForbidden}
ErrConditionsNotOk = Error{ID: 116, Status: http.StatusBadRequest}
ErrDownloadInvalidOS = Error{ID: 117, Status: http.StatusNotFound}
ErrDownloadInvalidArch = Error{ID: 118, Status: http.StatusNotFound}
Expand All @@ -147,7 +145,7 @@ var (
ErrJobNotBooked = Error{ID: 135, Status: http.StatusBadRequest}
ErrUserNotFound = Error{ID: 136, Status: http.StatusNotFound}
ErrInvalidNumber = Error{ID: 137, Status: http.StatusBadRequest}
ErrKeyAlreadyExist = Error{ID: 138, Status: http.StatusConflict}
ErrKeyAlreadyExist = Error{ID: 138, Status: http.StatusForbidden}
ErrPipelineNameImport = Error{ID: 139, Status: http.StatusBadRequest}
ErrWorkflowNameImport = Error{ID: 140, Status: http.StatusBadRequest}
ErrIconBadFormat = Error{ID: 141, Status: http.StatusBadRequest}
Expand Down Expand Up @@ -264,8 +262,6 @@ var errorsAmericanEnglish = map[int]string{
ErrGroupNeedWrite.ID: "need at least 1 group with write permission",
ErrNoVariable.ID: "variable not found",
ErrPluginInvalid.ID: "invalid plugin",
ErrConflict.ID: "object conflict",
ErrPipelineAlreadyAttached.ID: "pipeline already attached to this application",
ErrApplicationExist.ID: "application already exists",
ErrBranchNameNotProvided.ID: "git.branch or git.tag parameter must be provided",
ErrInfiniteTriggerLoop.ID: "infinite trigger loop are forbidden",
Expand Down Expand Up @@ -444,8 +440,6 @@ var errorsFrench = map[int]string{
ErrGroupNeedWrite.ID: "il faut au moins 1 groupe avec les droits d'écriture",
ErrNoVariable.ID: "la variable n'existe pas",
ErrPluginInvalid.ID: "plugin non valide",
ErrConflict.ID: "l'objet est en conflit",
ErrPipelineAlreadyAttached.ID: "le pipeline est déjà attaché à cette application",
ErrApplicationExist.ID: "une application du même nom existe déjà",
ErrBranchNameNotProvided.ID: "le paramètre git.branch ou git.tag est obligatoire",
ErrInfiniteTriggerLoop.ID: "création d'une boucle de trigger infinie interdite",
Expand Down

0 comments on commit 6884454

Please sign in to comment.