Skip to content

Commit

Permalink
fix(api): remove all pipeline parameters before inserting new ones (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Apr 10, 2020
1 parent 788e7f6 commit dd502d8
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 138 deletions.
2 changes: 1 addition & 1 deletion engine/api/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (api *API) postPipelineRollbackHandler() service.Handler {
}
}(&msgList)

if err := pipeline.ImportUpdate(ctx, tx, *proj, audit.Pipeline, msgChan, u); err != nil {
if err := pipeline.ImportUpdate(ctx, tx, *proj, audit.Pipeline, msgChan); err != nil {
return sdk.WrapError(err, "cannot import pipeline")
}

Expand Down
174 changes: 41 additions & 133 deletions engine/api/pipeline/pipeline_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

//ImportUpdate import and update the pipeline in the project
func ImportUpdate(ctx context.Context, db gorp.SqlExecutor, proj sdk.Project, pip *sdk.Pipeline, msgChan chan<- sdk.Message, u sdk.Identifiable) error {
func ImportUpdate(ctx context.Context, db gorp.SqlExecutor, proj sdk.Project, pip *sdk.Pipeline, msgChan chan<- sdk.Message) error {
t := time.Now()
log.Debug("ImportUpdate> Begin")
defer log.Debug("ImportUpdate> End (%d ns)", time.Since(t).Nanoseconds())
Expand All @@ -40,155 +40,63 @@ func ImportUpdate(ctx context.Context, db gorp.SqlExecutor, proj sdk.Project, pi

rx := sdk.NamePatternSpaceRegex
pip.ID = oldPipeline.ID

// Delete old stages
for _, s := range oldPipeline.Stages {
if err := DeleteStageByID(ctx, db, &s); err != nil {
return sdk.WrapError(err, "unable to delete stage %d", s.ID)
}
}

// delete old parameters
if err := DeleteAllParameterFromPipeline(db, oldPipeline.ID); err != nil {
return sdk.WrapError(err, "unable to delete pipeline parameters for pipeline %d", oldPipeline.ID)
}

// Insert new stages
for i := range pip.Stages {
s := &pip.Stages[i]
// stage name mandatory if there are many stages
if len(pip.Stages) > 1 && !rx.MatchString(s.Name) {
return sdk.NewError(sdk.ErrInvalidName, fmt.Errorf("Invalid stage name '%s'. It should match %s", s.Name, sdk.NamePatternSpace))
}
var stageFound bool
var oldStage *sdk.Stage
for _, os := range oldPipeline.Stages {
if s.Name == os.Name {
oldStage = &os
stageFound = true
break
}
}
if !stageFound {
//Insert stage
log.Debug("Inserting stage %s", s.Name)
s.PipelineID = pip.ID
if err := InsertStage(db, s); err != nil {
return sdk.WrapError(err, "Unable to insert stage %s in %s", s.Name, pip.Name)
}
//Insert stage's Jobs
for x := range s.Jobs {
jobAction := &s.Jobs[x]
if errs := CheckJob(ctx, db, jobAction); errs != nil {
log.Debug("CheckJob > %s", errs)
return errs
}
if err := action.CheckChildrenForGroupIDs(ctx, db, &jobAction.Action, groupIDs); err != nil {
return err
}
jobAction.PipelineStageID = s.ID
jobAction.Action.Type = sdk.JoinedAction
log.Debug("Creating job %s on stage %s on pipeline %s", jobAction.Action.Name, s.Name, pip.Name)
if err := InsertJob(db, jobAction, s.ID, pip); err != nil {
return sdk.WrapError(err, "Unable to insert job %s in %s", jobAction.Action.Name, pip.Name)
}
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineJobAdded, jobAction.Action.Name, s.Name)
}
}
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageAdded, s.Name)
}
} else {
//Update
log.Debug("> Updating stage %s", oldStage.Name)
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageUpdating, oldStage.Name)
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageDeletingOldJobs, oldStage.Name)
for x := range s.Jobs {
jobAction := &s.Jobs[x]
//Check the job
if errs := CheckJob(ctx, db, jobAction); errs != nil {
log.Debug(">> CheckJob > %s", errs)
return errs
}
if err := action.CheckChildrenForGroupIDs(ctx, db, &jobAction.Action, groupIDs); err != nil {
return err
}
}
// Delete all existing jobs in existing stage
for _, oj := range oldStage.Jobs {
if err := DeleteJob(db, oj); err != nil {
return sdk.WrapError(err, "unable to delete job %s in %s", oj.Action.Name, pip.Name)
}
msgChan <- sdk.NewMessage(sdk.MsgPipelineJobDeleted, oj.Action.Name, s.Name)
}
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageInsertingNewJobs, oldStage.Name)
// then insert job from yml into existing stage
for x := range s.Jobs {
j := &s.Jobs[x]
//Insert the job
j.PipelineStageID = oldStage.ID
j.Action.Type = sdk.JoinedAction
log.Debug(">> Creating job %s on stage %s on pipeline %s stageID: %d", j.Action.Name, s.Name, pip.Name, oldStage.ID)
if err := InsertJob(db, j, oldStage.ID, pip); err != nil {
return sdk.WrapError(err, "Unable to insert job %s in %s", j.Action.Name, pip.Name)
}
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineJobAdded, j.Action.Name, s.Name)
}
}

if oldStage.BuildOrder != s.BuildOrder {
s.ID = oldStage.ID
if err := updateStageOrder(db, s.ID, s.BuildOrder); err != nil {
return sdk.WrapError(err, "Unable to update stage %s", s.Name)
}
}

//Update stage
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageUpdated, s.Name)
}
//Insert stage
log.Debug("Inserting stage %s", s.Name)
s.PipelineID = pip.ID
if err := InsertStage(db, s); err != nil {
return sdk.WrapError(err, "Unable to insert stage %s in %s", s.Name, pip.Name)
}
}

//Check if we have to delete stages
for _, os := range oldPipeline.Stages {
var stageFound bool
var currentStage sdk.Stage
for _, s := range pip.Stages {
if s.Name == os.Name {
stageFound = true
currentStage = s
currentStage.ID = os.ID
break
//Insert stage's Jobs
for x := range s.Jobs {
jobAction := &s.Jobs[x]
if errs := CheckJob(ctx, db, jobAction); errs != nil {
log.Debug("CheckJob > %s", errs)
return errs
}
}
if !stageFound {
for x := range os.Jobs {
j := os.Jobs[x]
if err := DeleteJob(db, j); err != nil {
return sdk.WrapError(err, "unable to delete job %s in %s", j.Action.Name, pip.Name)
}
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineJobDeleted, j.Action.Name, os.Name)
}
if err := action.CheckChildrenForGroupIDs(ctx, db, &jobAction.Action, groupIDs); err != nil {
return err
}
if err := DeleteStageByID(ctx, db, &os); err != nil {
return sdk.WrapError(err, "unable to delete stage %d", os.ID)
jobAction.PipelineStageID = s.ID
jobAction.Action.Type = sdk.JoinedAction
log.Debug("Creating job %s on stage %s on pipeline %s", jobAction.Action.Name, s.Name, pip.Name)
if err := InsertJob(db, jobAction, s.ID, pip); err != nil {
return sdk.WrapError(err, "Unable to insert job %s in %s", jobAction.Action.Name, pip.Name)
}
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageDeleted, os.Name)
}
} else {
// Update stage
if err := UpdateStage(db, &currentStage); err != nil {
return sdk.WrapError(err, "cannot update stage %s (id=%d) for conditions, build_order and name", currentStage.Name, currentStage.ID)
msgChan <- sdk.NewMessage(sdk.MsgPipelineJobAdded, jobAction.Action.Name, s.Name)
}
}
if msgChan != nil {
msgChan <- sdk.NewMessage(sdk.MsgPipelineStageAdded, s.Name)
}

}

// Insert new parameters
for _, param := range pip.Parameter {
found := false
for _, oldParam := range oldPipeline.Parameter {
if param.Name == oldParam.Name {
found = true
if err := UpdateParameterInPipeline(db, pip.ID, oldParam.Name, param); err != nil {
return sdk.WrapError(err, "cannot update parameter %s", param.Name)
}
break
}
}
if !found {
if err := InsertParameterInPipeline(db, pip.ID, &param); err != nil {
return sdk.WrapError(err, "cannot insert parameter %s", param.Name)
}
if err := InsertParameterInPipeline(db, pip.ID, &param); err != nil {
return sdk.WrapError(err, "cannot insert parameter %s", param.Name)
}
}

Expand Down
6 changes: 3 additions & 3 deletions engine/api/pipeline/pipeline_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func testImportUpdate(t *testing.T, db gorp.SqlExecutor, store cache.Store, tt t
proj, err := project.Load(db, store, tt.args.pip.ProjectKey, nil)
test.NoError(t, err)

if err := pipeline.ImportUpdate(context.TODO(), db, *proj, tt.args.pip, msgChan, tt.args.u); (err != nil) != tt.wantErr {
if err := pipeline.ImportUpdate(context.TODO(), db, *proj, tt.args.pip, msgChan); (err != nil) != tt.wantErr {
t.Errorf("%q. ImportUpdate() error = %v, wantErr %v", tt.name, err, tt.wantErr)
}

Expand Down Expand Up @@ -493,6 +493,7 @@ func TestImportUpdate(t *testing.T) {

test.NoError(t, pipeline.InsertStage(db, &args.pip.Stages[0]))
test.NoError(t, pipeline.InsertStage(db, &args.pip.Stages[1]))
test.NoError(t, pipeline.InsertParameterInPipeline(db, args.pip.ID, &sdk.Parameter{Name: "oldparam", Type: string(sdk.ParameterTypeString), Value: "foo"}))

args.pip.Stages[1].Jobs = args.pip.Stages[1].Jobs[1:]

Expand Down Expand Up @@ -646,8 +647,7 @@ func TestImportUpdate(t *testing.T) {
},
asserts: func(t *testing.T, pip sdk.Pipeline) {
t.Logf("Asserts on %+v", pip)
assert.Equal(t, 1, len(pip.Stages))
assert.Equal(t, 1, pip.Stages[0].BuildOrder)
assert.Equal(t, 0, len(pip.Stages))
},
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/pipeline/pipeline_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func ParseAndImport(ctx context.Context, db gorp.SqlExecutor, cache cache.Store,
if exist && !opts.Force {
return pip, nil, sdk.ErrPipelineAlreadyExists
} else if exist {
globalError = ImportUpdate(ctx, db, proj, pip, msgChan, u)
globalError = ImportUpdate(ctx, db, proj, pip, msgChan)
} else {
globalError = Import(ctx, db, cache, proj, pip, msgChan, u)
}
Expand Down

0 comments on commit dd502d8

Please sign in to comment.