Skip to content

Commit

Permalink
fix(api): fix return condition ko AND fix stop migration on locked er…
Browse files Browse the repository at this point in the history
…ror (#4043)
  • Loading branch information
sguiheux authored Mar 18, 2019
1 parent 34fba71 commit 53f4565
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 47 deletions.
3 changes: 3 additions & 0 deletions engine/api/database/gorpmapping/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (
// ViolateUniqueKeyPGCode is the pg code when duplicating unique key
ViolateUniqueKeyPGCode = "23505"

// RowLockedPGCode is the pg code when trying to access to a locked row
RowLockedPGCode = "55P03"

// StringDataRightTruncation is raisedalue is too long for varchar.
StringDataRightTruncation = "22001"
)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/migrate/workflow_run_old_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func WorkflowRunOldModel(ctx context.Context, DBFunc func() *gorp.DbMap) error {

log.Info("migrate>WorkflowRunOldModel> %d run to migrate", len(ids))
for _, id := range ids {
if err := migrateRun(ctx, db, id); err != nil {
if err := migrateRun(ctx, db, id); err != nil && !sdk.ErrorIs(err, sdk.ErrLocked) {
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/fsamin/go-dump"
"github.com/go-gorp/gorp"
"github.com/lib/pq"
"go.opencensus.io/stats"

"github.com/ovh/cds/engine/api/database/gorpmapping"
Expand Down Expand Up @@ -463,6 +464,9 @@ func loadRun(db gorp.SqlExecutor, loadOpts LoadRunOptions, query string, args ..
if err == sql.ErrNoRows {
return nil, sdk.ErrWorkflowNotFound
}
if errPG, ok := err.(*pq.Error); ok && errPG.Code == gorpmapping.RowLockedPGCode {
return nil, sdk.ErrLocked
}
return nil, sdk.WrapError(err, "Unable to load workflow run. query:%s args:%v", query, args)
}
wr := sdk.WorkflowRun(*runDB)
Expand Down
62 changes: 27 additions & 35 deletions engine/api/workflow/run_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
)

//RunFromHook is the entry point to trigger a workflow from a hook
func runFromHook(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, e *sdk.WorkflowNodeRunHookEvent, asCodeMsg []sdk.Message) (*sdk.WorkflowRun, *ProcessorReport, error) {
func runFromHook(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, e *sdk.WorkflowNodeRunHookEvent, asCodeMsg []sdk.Message) (*ProcessorReport, error) {
var end func()
ctx, end = observability.Span(ctx, "workflow.RunFromHook")
defer end()
Expand All @@ -36,14 +36,14 @@ func runFromHook(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p
hooks := wr.Workflow.GetHooks()
h, ok := hooks[e.WorkflowNodeHookUUID]
if !ok {
return nil, report, sdk.WithStack(sdk.ErrNoHook)
return report, sdk.WithStack(sdk.ErrNoHook)
}

//If the hook is on the root, it will trigger a new workflow run
//Else if will trigger a new subnumber of the last workflow run
if h.WorkflowNodeID == wr.Workflow.Root.ID {
if err := IsValid(ctx, store, db, &wr.Workflow, p, nil); err != nil {
return nil, nil, sdk.WrapError(err, "Unable to valid workflow")
return nil, sdk.WrapError(err, "Unable to valid workflow")
}

// Add add code spawn info
Expand All @@ -54,33 +54,32 @@ func runFromHook(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p
//Process it
r1, hasRun, errWR := processWorkflowDataRun(ctx, db, store, p, wr, e, nil, nil)
if errWR != nil {
return nil, nil, sdk.WrapError(errWR, "RunFromHook> Unable to process workflow run")
return nil, sdk.WrapError(errWR, "RunFromHook> Unable to process workflow run")
}
if !hasRun {
wr.Status = sdk.StatusNeverBuilt.String()
wr.LastExecution = time.Now()
report.Add(wr)
return wr, report, UpdateWorkflowRun(ctx, db, wr)
return report, sdk.WithStack(sdk.ErrConditionsNotOk)
}
_, _ = report.Merge(r1, nil)
report.Merge(r1, nil) // nolint
}
return wr, report, nil
return report, nil
}

//ManualRunFromNode is the entry point to trigger manually a piece of an existing run workflow
func manualRunFromNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, e *sdk.WorkflowNodeRunManual, nodeID int64) (*sdk.WorkflowRun, *ProcessorReport, error) {
func manualRunFromNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, e *sdk.WorkflowNodeRunManual, nodeID int64) (*ProcessorReport, error) {
report := new(ProcessorReport)

r1, condOk, err := processWorkflowDataRun(ctx, db, store, p, wr, nil, e, &nodeID)
if err != nil {
return nil, report, sdk.WrapError(err, "Unable to process workflow run")
return report, sdk.WrapError(err, "Unable to process workflow run")
}
_, _ = report.Merge(r1, nil)

if !condOk {
return nil, report, sdk.WrapError(sdk.ErrConditionsNotOk, "ManualRunFromNode> Conditions aren't ok")
return report, sdk.WithStack(sdk.ErrConditionsNotOk)
}
return wr, report, nil
return report, nil
}

func StartWorkflowRun(ctx context.Context, db *gorp.DbMap, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, opts *sdk.WorkflowRunPostHandlerOption, u *sdk.User, asCodeInfos []sdk.Message) (*ProcessorReport, error) {
Expand All @@ -106,12 +105,11 @@ func StartWorkflowRun(ctx context.Context, db *gorp.DbMap, store cache.Store, p

if opts.Hook != nil {
// Run from HOOK
_, r1, err := runFromHook(ctx, tx, store, p, wr, opts.Hook, asCodeInfos)
r1, err := runFromHook(ctx, tx, store, p, wr, opts.Hook, asCodeInfos)
if err != nil {
return nil, sdk.WrapError(err, "Unable to run workflow from hook")
return nil, err
}

_, _ = report.Merge(r1, nil)
report.Merge(r1, nil) // nolint

} else {
// Manual RUN
Expand All @@ -133,23 +131,24 @@ func StartWorkflowRun(ctx context.Context, db *gorp.DbMap, store cache.Store, p
}

// Continue the current workflow run
_, r1, errmr := manualRunFromNode(ctx, tx, store, p, wr, opts.Manual, fromNode.ID)
r1, errmr := manualRunFromNode(ctx, tx, store, p, wr, opts.Manual, fromNode.ID)
if errmr != nil {
return nil, sdk.WrapError(errmr, "Unable to run workflow")
return report, errmr
}
_, _ = report.Merge(r1, nil)
report.Merge(r1, nil) // nolint

} else {
// MANUAL RUN FROM ROOT NODE
if !permission.AccessToWorkflowNode(&wr.Workflow, &wr.Workflow.WorkflowData.Node, u, permission.PermissionReadExecute) {
return nil, sdk.WrapError(sdk.ErrNoPermExecution, "not enough right on node %d", wr.Workflow.WorkflowData.Node.ID)
}
// Start new workflow
_, r1, errmr := manualRun(ctx, tx, store, p, wr, opts.Manual)
r1, errmr := manualRun(ctx, tx, store, p, wr, opts.Manual)
if errmr != nil {
return nil, sdk.WrapError(errmr, "Unable to run workflow")
return nil, errmr
}
_, _ = report.Merge(r1, nil)
report.Merge(r1, nil) // nolint

}
}

Expand All @@ -161,33 +160,26 @@ func StartWorkflowRun(ctx context.Context, db *gorp.DbMap, store cache.Store, p
}

//ManualRun is the entry point to trigger a workflow manually
func manualRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, e *sdk.WorkflowNodeRunManual) (*sdk.WorkflowRun, *ProcessorReport, error) {
func manualRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, wr *sdk.WorkflowRun, e *sdk.WorkflowNodeRunManual) (*ProcessorReport, error) {
report := new(ProcessorReport)
ctx, end := observability.Span(ctx, "workflow.ManualRun", observability.Tag(observability.TagWorkflowRun, wr.Number))
defer end()

if err := IsValid(ctx, store, db, &wr.Workflow, p, &e.User); err != nil {
return nil, nil, sdk.WrapError(err, "Unable to valid workflow")
return nil, sdk.WrapError(err, "Unable to valid workflow")
}

if err := UpdateWorkflowRun(ctx, db, wr); err != nil {
return nil, nil, err
return nil, err
}

r1, hasRun, errWR := processWorkflowDataRun(ctx, db, store, p, wr, nil, e, nil)
if errWR != nil {
return wr, report, sdk.WrapError(errWR, "ManualRun")
return report, errWR
}
_, _ = report.Merge(r1, nil)
if !hasRun {
wr.Status = sdk.StatusNeverBuilt.String()
report.Add(wr)
return wr, report, UpdateWorkflowRun(ctx, db, wr)
}

wrUpdated, errReload := LoadRunByID(db, wr.ID, LoadRunOptions{})
if errReload == nil {
return wrUpdated, report, nil
return report, sdk.ErrConditionsNotOk
}
return wr, report, nil
return report, nil
}
22 changes: 17 additions & 5 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,12 +960,24 @@ func (api *API) initWorkflowRun(ctx context.Context, db *gorp.DbMap, cache cache
func failInitWorkflowRun(ctx context.Context, db *gorp.DbMap, wfRun *sdk.WorkflowRun, err error) *workflow.ProcessorReport {
report := new(workflow.ProcessorReport)
log.Error("unable to start workflow: %v", err)
wfRun.Status = sdk.StatusFail.String()
info := sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{err.Error()},

var info sdk.SpawnMsg
if sdk.ErrorIs(err, sdk.ErrConditionsNotOk) {
info = sdk.SpawnMsg{
ID: sdk.MsgWorkflowConditionError.ID,
}
if len(wfRun.WorkflowNodeRuns) == 0 {
wfRun.Status = sdk.StatusNeverBuilt.String()
}
} else {
wfRun.Status = sdk.StatusFail.String()
info = sdk.SpawnMsg{
ID: sdk.MsgWorkflowError.ID,
Args: []interface{}{err.Error()},
}
}
workflow.AddWorkflowRunInfo(wfRun, true, info)

workflow.AddWorkflowRunInfo(wfRun, !sdk.ErrorIs(err, sdk.ErrConditionsNotOk), info)
if errU := workflow.UpdateWorkflowRun(ctx, db, wfRun); errU != nil {
log.Error("unable to fail workflow run %v", errU)
}
Expand Down
3 changes: 3 additions & 0 deletions sdk/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ var (
ErrGroupNotFoundInWorkflow = Error{ID: 161, Status: http.StatusBadRequest}
ErrWorkflowPermInsufficient = Error{ID: 162, Status: http.StatusBadRequest}
ErrApplicationUsedByWorkflow = Error{ID: 163, Status: http.StatusBadRequest}
ErrLocked = Error{ID: 164, Status: http.StatusConflict}
)

var errorsAmericanEnglish = map[int]string{
Expand Down Expand Up @@ -338,6 +339,7 @@ var errorsAmericanEnglish = map[int]string{
ErrGroupNotFoundInWorkflow.ID: "Cannot add this permission group on your workflow node because this group is not already your workflow's permissions",
ErrWorkflowPermInsufficient.ID: "Cannot add this permission group on your workflow because you can't have less rights than rights in your project when you are in RWX",
ErrApplicationUsedByWorkflow.ID: "Application still used by a workflow",
ErrLocked.ID: "Resource locked",
}

var errorsFrench = map[int]string{
Expand Down Expand Up @@ -498,6 +500,7 @@ var errorsFrench = map[int]string{
ErrGroupNotFoundInWorkflow.ID: "Impossible d'ajouter ce groupe dans vos permissions de noeud du workflow car ce groupe n'est pas présent dans les permissions de votre workflow",
ErrWorkflowPermInsufficient.ID: "Impossible d'ajouter ce groupe dans vos permissions du workflow car ce groupe a des droits inférieurs (< RWX) à celui du workflow",
ErrApplicationUsedByWorkflow.ID: "L'application est utilisée par un workflow",
ErrLocked.ID: "La ressource est verrouillée",
}

var errorsLanguages = []map[int]string{
Expand Down
16 changes: 10 additions & 6 deletions sdk/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ var (
MsgPipelineJobUpdated = &Message{"MsgPipelineJobUpdated", trad{FR: "Le job %s du stage %s a été mis à jour", EN: "Job %s in stage %s updated"}, nil}
MsgPipelineJobAdded = &Message{"MsgPipelineJobAdded", trad{FR: "Le job %s du stage %s a été ajouté", EN: "Job %s in stage %s added"}, nil}
MsgPipelineJobDeleted = &Message{"MsgPipelineJobDeleted", trad{FR: "Le job %s du stage %s a été supprimé", EN: "Job %s in stage %s deleted"}, nil}
MsgSpawnInfoDeprecatedModel = &Message{"MsgSpawnInfoDeprecatedModel", trad{FR: "Attention vous utilisez un worker model (%s) déprécié", EN: "Pay attention you are using a deprecated worker model (%s)"}, nil}
MsgSpawnInfoHatcheryStarts = &Message{"MsgSpawnInfoHatcheryStarts", trad{FR: "La Hatchery %s (%s) a démarré le lancement du worker avec le modèle %s", EN: "Hatchery %s (%s) starts spawn worker with model %s"}, nil}
MsgSpawnInfoHatcheryErrorSpawn = &Message{"MsgSpawnInfoHatcheryErrorSpawn", trad{FR: "Une erreur est survenue lorsque la Hatchery %s (%s) a démarré un worker avec le modèle %s après %s, err:%s", EN: "Error while Hatchery %s (%s) spawn worker with model %s after %s, err:%s"}, nil}
MsgSpawnInfoHatcheryStartsSuccessfully = &Message{"MsgSpawnInfoHatcheryStartsSuccessfully", trad{FR: "La Hatchery %s (%s) a démarré le worker %s avec succès en %s", EN: "Hatchery %s (%s) spawn worker %s successfully in %s"}, nil}
MsgSpawnInfoHatcheryStartDockerPull = &Message{"MsgSpawnInfoHatcheryStartDockerPull", trad{FR: "La Hatchery %s (%s) a démarré le docker pull de l'image %s...", EN: "Hatchery %s (%s) starts docker pull %s..."}, nil}
MsgSpawnInfoHatcheryEndDockerPull = &Message{"MsgSpawnInfoHatcheryEndDockerPull", trad{FR: "La Hatchery %s (%s) a terminé le docker pull de l'image %s", EN: "Hatchery %s (%s) docker pull %s done"}, nil}
MsgSpawnInfoHatcheryEndDockerPullErr = &Message{"MsgSpawnInfoHatcheryEndDockerPullErr", trad{FR: "⚠ La Hatchery %s (%s) a terminé le docker pull de l'image %s en erreur: %s", EN: "⚠ Hatchery %s (%s) - docker pull %s done with error: %v"}, nil}
MsgSpawnInfoHatcheryErrorSpawn = &Message{"MsgSpawnInfoHatcheryErrorSpawn", trad{FR: "Une erreur est survenue lorsque la Hatchery %s (%s) a démarré un worker avec le modèle %s après %s, err:%s", EN: "Error while Hatchery %s (%s) spawn worker with model %s after %s, err:%s"}, nil}
MsgSpawnInfoHatcheryStartsSuccessfully = &Message{"MsgSpawnInfoHatcheryStartsSuccessfully", trad{FR: "La Hatchery %s (%s) a démarré le worker %s avec succès en %s", EN: "Hatchery %s (%s) spawn worker %s successfully in %s"}, nil}
MsgSpawnInfoDeprecatedModel = &Message{"MsgSpawnInfoDeprecatedModel", trad{FR: "Attention vous utilisez un worker model (%s) déprécié", EN: "Pay attention you are using a deprecated worker model (%s)"}, nil}
MsgSpawnInfoWorkerEnd = &Message{"MsgSpawnInfoWorkerEnd", trad{FR: "✓ Le worker %s a terminé et a passé %s à travailler sur les étapes", EN: "✓ Worker %s finished working on this job and took %s to work on the steps"}, nil}
MsgSpawnInfoJobInQueue = &Message{"MsgSpawnInfoJobInQueue", trad{FR: "✓ Le job a été mis en file d'attente", EN: "✓ Job has been queued"}, nil}
MsgSpawnInfoJobTaken = &Message{"MsgSpawnInfoJobTaken", trad{FR: "Le job %s a été pris par le worker %s", EN: "Job %s was taken by worker %s"}, nil}
Expand All @@ -75,6 +75,7 @@ var (
MsgSpawnInfoJobError = &Message{"MsgSpawnInfoJobError", trad{FR: "⚠ Impossible de lancer ce job : %s", EN: "⚠ Unable to run this job: %s"}, nil}
MsgWorkflowStarting = &Message{"MsgWorkflowStarting", trad{FR: "Le workflow %s#%s a été démarré", EN: "Workflow %s#%s has been started"}, nil}
MsgWorkflowError = &Message{"MsgWorkflowError", trad{FR: "⚠ Une erreur est survenue: %v", EN: "⚠ An error has occured: %v"}, nil}
MsgWorkflowConditionError = &Message{"MsgWorkflowConditionError", trad{FR: "Les conditions de lancement ne sont pas respectées.", EN: "Run conditions aren't ok."}, nil}
MsgWorkflowNodeStop = &Message{"MsgWorkflowNodeStop", trad{FR: "Le pipeline a été arrété par %s", EN: "The pipeline has been stopped by %s"}, nil}
MsgWorkflowNodeMutex = &Message{"MsgWorkflowNodeMutex", trad{FR: "Le pipeline %s est mis en attente tant qu'il est en cours sur un autre run", EN: "The pipeline %s is waiting while it's running on another run"}, nil}
MsgWorkflowNodeMutexRelease = &Message{"MsgWorkflowNodeMutexRelease", trad{FR: "Lancement du pipeline %s", EN: "Triggering pipeline %s"}, nil}
Expand Down Expand Up @@ -133,6 +134,7 @@ var Messages = map[string]*Message{
MsgSpawnInfoHatcheryStartDockerPull.ID: MsgSpawnInfoHatcheryStartDockerPull,
MsgSpawnInfoHatcheryEndDockerPull.ID: MsgSpawnInfoHatcheryEndDockerPull,
MsgSpawnInfoHatcheryEndDockerPullErr.ID: MsgSpawnInfoHatcheryEndDockerPullErr,
MsgSpawnInfoDeprecatedModel.ID: MsgSpawnInfoDeprecatedModel,
MsgSpawnInfoWorkerEnd.ID: MsgSpawnInfoWorkerEnd,
MsgSpawnInfoJobInQueue.ID: MsgSpawnInfoJobInQueue,
MsgSpawnInfoJobTaken.ID: MsgSpawnInfoJobTaken,
Expand All @@ -142,14 +144,16 @@ var Messages = map[string]*Message{
MsgSpawnInfoJobError.ID: MsgSpawnInfoJobError,
MsgWorkflowStarting.ID: MsgWorkflowStarting,
MsgWorkflowError.ID: MsgWorkflowError,
MsgWorkflowConditionError.ID: MsgWorkflowConditionError,
MsgWorkflowNodeStop.ID: MsgWorkflowNodeStop,
MsgWorkflowImportedUpdated.ID: MsgWorkflowImportedUpdated,
MsgWorkflowImportedInserted.ID: MsgWorkflowImportedInserted,
MsgWorkflowNodeMutex.ID: MsgWorkflowNodeMutex,
MsgWorkflowNodeMutexRelease.ID: MsgWorkflowNodeMutexRelease,
MsgWorkflowImportedUpdated.ID: MsgWorkflowImportedUpdated,
MsgWorkflowImportedInserted.ID: MsgWorkflowImportedInserted,
MsgSpawnInfoHatcheryCannotStartJob.ID: MsgSpawnInfoHatcheryCannotStartJob,
MsgWorkflowRunBranchDeleted.ID: MsgWorkflowRunBranchDeleted,
MsgSpawnInfoDeprecatedModel.ID: MsgSpawnInfoDeprecatedModel,
MsgWorkflowTemplateImportedInserted.ID: MsgWorkflowTemplateImportedInserted,
MsgWorkflowTemplateImportedUpdated.ID: MsgWorkflowTemplateImportedUpdated,
}

//Message represent a struc format translated messages
Expand Down

0 comments on commit 53f4565

Please sign in to comment.