Skip to content

Commit

Permalink
fix(api, vcs, repositories): repository web hook eventFilter (#5266)
Browse files Browse the repository at this point in the history
* fix(api, vcs, repositories): repository web hook eventFilter

Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin authored Jun 29, 2020
1 parent fd95ff9 commit 8e83900
Show file tree
Hide file tree
Showing 36 changed files with 549 additions and 186 deletions.
2 changes: 1 addition & 1 deletion cli/cdsctl/workflow_transform_as_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func workflowTransformAsCodeRun(v cli.Values) (interface{}, error) {
}
switch ope.Status {
case sdk.OperationStatusError:
return nil, fmt.Errorf("cannot perform operation: %s", ope.Error)
return nil, fmt.Errorf("cannot perform operation: %v", ope.Error)
}
return response, nil
}
2 changes: 1 addition & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (a *API) Serve(ctx context.Context) error {
a.serviceAPIHeartbeat(ctx)
}, a.PanicDump())
sdk.GoRoutine(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
authentication.SessionCleaner(ctx, a.mustDB)
authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second)
}, a.PanicDump())

isFreshInstall, errF := version.IsFreshInstall(a.mustDB())
Expand Down
11 changes: 6 additions & 5 deletions engine/api/ascode/pull_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ forLoop:
case <-tick.C:
ope, err := operation.GetRepositoryOperation(ctx, db, ed.OperationUUID)
if err != nil {
globalErr = sdk.NewErrorFrom(err, "unable to get repository operation %s", ed.OperationUUID)
globalErr = sdk.WrapError(err, "unable to get repository operation %s", ed.OperationUUID)
break forLoop
}

if ope.Status == sdk.OperationStatusError {
globalErr = sdk.NewErrorFrom(sdk.ErrUnknownError, "repository operation in error: %s", ope.Error)
globalOperation.Error = ope.Error
globalErr = ope.Error.ToError()
break forLoop
}
if ope.Status == sdk.OperationStatusDone {
Expand All @@ -76,7 +76,6 @@ forLoop:
}
}
if globalErr != nil {
httpErr := sdk.ExtractHTTPError(globalErr, "")
isErrWithStack := sdk.IsErrorWithStack(globalErr)
fields := logrus.Fields{}
if isErrWithStack {
Expand All @@ -85,7 +84,9 @@ forLoop:
log.ErrorWithFields(ctx, fields, "%s", globalErr)

globalOperation.Status = sdk.OperationStatusError
globalOperation.Error = httpErr.Error()
if globalOperation.Error == nil {
globalOperation.Error = sdk.ToOperationError(globalErr)
}
}

_ = store.SetWithTTL(cache.Key(operation.CacheOperationKey, globalOperation.UUID), globalOperation, 300)
Expand Down
4 changes: 2 additions & 2 deletions engine/api/authentication/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func CheckSessionJWT(jwtToken string) (*jwt.Token, error) {
}

// SessionCleaner must be run as a goroutine
func SessionCleaner(ctx context.Context, dbFunc func() *gorp.DbMap) {
func SessionCleaner(ctx context.Context, dbFunc func() *gorp.DbMap, tickerDuration time.Duration) {
log.Info(ctx, "Initializing session cleaner...")
db := dbFunc()
tick := time.NewTicker(10 * time.Second)
tick := time.NewTicker(tickerDuration)
tickCorruped := time.NewTicker(12 * time.Hour)
defer tick.Stop()
defer tickCorruped.Stop()
Expand Down
4 changes: 2 additions & 2 deletions engine/api/authentication/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func Test_CheckSessionJWT(t *testing.T) {
}

func Test_SessionCleaner(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
db, _ := test.SetupPG(t, bootstrap.InitiliazeDB)

authentication.SessionCleaner(ctx, func() *gorp.DbMap { return db })
authentication.SessionCleaner(ctx, func() *gorp.DbMap { return db }, 1*time.Second)
}
3 changes: 3 additions & 0 deletions engine/api/services/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service,
}
return headers, code, nil
}
if lastCode < 409 {
break
}
}

log.Error(ctx, "unable to call service: maximum attempt exceed : %+v", lastErr)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func CanBeRun(workflowRun *sdk.WorkflowRun, workflowNodeRun *sdk.WorkflowNodeRun
}
ancestorsID := node.Ancestors(workflowRun.Workflow.WorkflowData)

if ancestorsID == nil || len(ancestorsID) == 0 {
if len(ancestorsID) == 0 {
return true
}
for _, ancestorID := range ancestorsID {
Expand Down
10 changes: 5 additions & 5 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,14 +956,14 @@ func stopWorkflowNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, stor
njr, errNRJ := LoadAndLockNodeJobRunWait(ctx, tx, store, njrID)
if errNRJ != nil {
chanErr <- sdk.WrapError(errNRJ, "StopWorkflowNodeRun> Cannot load node job run id")
tx.Rollback()
_ = tx.Rollback()
wg.Done()
return report
}

if err := AddSpawnInfosNodeJobRun(tx, njr.WorkflowNodeRunID, njr.ID, []sdk.SpawnInfo{stopInfos}); err != nil {
chanErr <- sdk.WrapError(err, "Cannot save spawn info job %d", njr.ID)
tx.Rollback()
_ = tx.Rollback()
wg.Done()
return report
}
Expand All @@ -973,14 +973,14 @@ func stopWorkflowNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, stor
report.Merge(ctx, r)
if err != nil {
chanErr <- sdk.WrapError(err, "cannot update node job run")
tx.Rollback()
_ = tx.Rollback()
wg.Done()
return report
}

if err := tx.Commit(); err != nil {
chanErr <- sdk.WithStack(err)
tx.Rollback()
_ = tx.Rollback()
wg.Done()
return report
}
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func getVCSInfos(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
// Check repository value
if vcsInfos.Repository == "" {
vcsInfos.Repository = applicationRepositoryFullname
} else if strings.ToLower(vcsInfos.Repository) != strings.ToLower(applicationRepositoryFullname) {
} else if !strings.EqualFold(vcsInfos.Repository, applicationRepositoryFullname) {
//The input repository is not the same as the application, we have to check if it is a fork
forks, err := client.ListForks(ctx, applicationRepositoryFullname)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions engine/api/workflow/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,22 @@ func hookRegistration(ctx context.Context, db gorp.SqlExecutor, store cache.Stor
previousHook, has := oldHooks[h.UUID]
// If previous hook is the same, we do nothing
if has && h.Equals(*previousHook) {
// If this a repowebhook with an empty eventFilter, let's keep the old one because vcs won't be called to get the default eventFilter
eventFilter, has := h.GetConfigValue(sdk.HookConfigEventFilter)
if previousHook.IsRepositoryWebHook() && h.IsRepositoryWebHook() &&
(!has || eventFilter == "") {
h.Config[sdk.HookConfigEventFilter] = previousHook.Config[sdk.HookConfigEventFilter]
}
continue
}

}
// initialize a UUID is there no uuid
if h.UUID == "" {
h.UUID = sdk.UUID()
}

if h.HookModelName == sdk.RepositoryWebHookModelName || h.HookModelName == sdk.GitPollerModelName || h.HookModelName == sdk.GerritHookModelName {
if h.IsRepositoryWebHook() || h.HookModelName == sdk.GitPollerModelName || h.HookModelName == sdk.GerritHookModelName {
if wf.WorkflowData.Node.Context.ApplicationID == 0 || wf.Applications[wf.WorkflowData.Node.Context.ApplicationID].RepositoryFullname == "" || wf.Applications[wf.WorkflowData.Node.Context.ApplicationID].VCSServer == "" {
return sdk.NewErrorFrom(sdk.ErrForbidden, "cannot create a git poller or repository webhook on an application without a repository")
}
Expand All @@ -157,6 +164,7 @@ func hookRegistration(ctx context.Context, db gorp.SqlExecutor, store cache.Stor
return err
}
hookToUpdate[h.UUID] = *h
log.Debug("workflow.hookrRegistration> following hook must be updated: %+v", h)
}

if len(hookToUpdate) > 0 {
Expand All @@ -179,7 +187,10 @@ func hookRegistration(ctx context.Context, db gorp.SqlExecutor, store cache.Stor
continue
}
v, ok := h.Config[sdk.HookConfigWebHookID]
if h.HookModelName == sdk.RepositoryWebHookModelName && h.Config["vcsServer"].Value != "" {
if h.IsRepositoryWebHook() {
log.Debug("workflow.hookRegistration> managing vcs configuration: %+v", h)
}
if h.IsRepositoryWebHook() && h.Config["vcsServer"].Value != "" {
if !ok || v.Value == "" {
if err := createVCSConfiguration(ctx, db, store, proj, h); err != nil {
return sdk.WithStack(err)
Expand Down
2 changes: 0 additions & 2 deletions engine/api/workflow/process_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func processAllJoins(ctx context.Context, db gorp.SqlExecutor, store cache.Store
//now checks if all sources have been completed
var ok = true

nodeRunIDs := []int64{}
sourcesParams := map[string]string{}
for _, nodeRun := range sources {
if nodeRun == nil {
Expand All @@ -136,7 +135,6 @@ func processAllJoins(ctx context.Context, db gorp.SqlExecutor, store cache.Store
}
}

nodeRunIDs = append(nodeRunIDs, nodeRun.ID)
//Merge build parameters from all sources
sourcesParams = sdk.ParametersMapMerge(sourcesParams, sdk.ParametersToMap(nodeRun.BuildParameters))
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func pollRepositoryOperation(c context.Context, db gorp.SqlExecutor, store cache
opeTrusted := *ope
opeTrusted.RepositoryStrategy.SSHKeyContent = sdk.PasswordPlaceholder
opeTrusted.RepositoryStrategy.Password = sdk.PasswordPlaceholder
return nil, sdk.WrapError(fmt.Errorf("%s", ope.Error), "operation in error: %+v", opeTrusted)
return nil, sdk.WrapError(fmt.Errorf("%+v", ope.Error), "operation in error: %+v", opeTrusted)
case sdk.OperationStatusDone:
return ope, nil
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/resync_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Resync(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sd
wr.Workflow.HookModels = wf.HookModels
wr.Workflow.OutGoingHookModels = wf.OutGoingHookModels

return UpdateWorkflowRun(nil, db, wr)
return UpdateWorkflowRun(ctx, db, wr)
}

//ResyncWorkflowRunStatus resync the status of workflow if you stop a node run when workflow run is building
Expand Down
35 changes: 26 additions & 9 deletions engine/api/workflow/workflow_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func ParseAndImport(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
if oldW != nil {
for i := range oldW.WorkflowData.Node.Hooks {
h := &oldW.WorkflowData.Node.Hooks[i]
if h.HookModelName == sdk.RepositoryWebHookModel.Name {
if h.IsRepositoryWebHook() {
oldRepoWebHook = h
break
}
Expand All @@ -111,24 +111,36 @@ func ParseAndImport(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
// Get current webhook
for i := range w.WorkflowData.Node.Hooks {
h := &w.WorkflowData.Node.Hooks[i]
if h.HookModelName == sdk.RepositoryWebHookModel.Name {
if h.IsRepositoryWebHook() {
h.UUID = oldRepoWebHook.UUID
h.Config = oldRepoWebHook.Config.Clone()
h.Config[sdk.HookConfigWorkflow] = sdk.WorkflowNodeHookConfigValue{Value: w.Name}
h.Config.MergeWith(
oldRepoWebHook.Config.Filter(
func(k string, v sdk.WorkflowNodeHookConfigValue) bool {
return !v.Configurable
},
),
)
// get only non cofigurable stuff
currentRepoWebHook = h
log.Debug("workflow.ParseAndImport> keeping the old repository web hook: %+v (%+v)", h, oldRepoWebHook)
break
}
}

// If not found
// If not found, take the default config
if currentRepoWebHook == nil {
h := sdk.NodeHook{
UUID: oldRepoWebHook.UUID,
HookModelName: oldRepoWebHook.HookModelName,
Config: oldRepoWebHook.Config.Clone(),
HookModelID: oldRepoWebHook.HookModelID,
Config: sdk.RepositoryWebHookModel.DefaultConfig.Clone(),
HookModelID: sdk.RepositoryWebHookModel.ID,
}
oldNonConfigurableConfig := oldRepoWebHook.Config.Filter(func(k string, v sdk.WorkflowNodeHookConfigValue) bool {
return !v.Configurable
})
for k, v := range oldNonConfigurableConfig {
h.Config[k] = v
}
h.Config[sdk.HookConfigWorkflow] = sdk.WorkflowNodeHookConfigValue{Value: w.Name}
w.WorkflowData.Node.Hooks = append(w.WorkflowData.Node.Hooks, h)
}
}
Expand All @@ -144,13 +156,18 @@ func ParseAndImport(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
Config: sdk.RepositoryWebHookModel.DefaultConfig.Clone(),
}

// If the new workflow already contains a repowebhook (comparing refs), we dont have to add a new one
// If the new workflow already contains a repowebhook, we dont have to add a new one
var hasARepoWebHook bool
for _, h := range w.WorkflowData.Node.Hooks {
if h.Ref() == newRepoWebHook.Ref() {
hasARepoWebHook = true
break
}
if h.HookModelName == newRepoWebHook.HookModelName &&
h.ConfigValueContainsEventsDefault() {
hasARepoWebHook = true
break
}
}
if !hasARepoWebHook {
w.WorkflowData.Node.Hooks = append(w.WorkflowData.Node.Hooks, newRepoWebHook)
Expand Down
35 changes: 27 additions & 8 deletions engine/api/workflow_ascode_rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ version: v1.0`),
func(ctx context.Context, method, path string, in interface{}, out interface{}, _ interface{}) (http.Header, int, error) {
vcsHooks := in.(*sdk.VCSHook)
vcsHooks.ID = sdk.UUID()
require.Len(t, vcsHooks.Events, 0, "events list should be empty, default value is set by vcs")

vcsHooks.Events = []string{
"push",
}
*(out.(*sdk.VCSHook)) = *vcsHooks
return nil, 200, nil
},
Expand Down Expand Up @@ -296,6 +301,20 @@ version: v1.0`),
assert.NoError(t, err)
assert.NotNil(t, wk)

require.Len(t, wk.WorkflowData.GetHooks(), 1)

for _, h := range wk.WorkflowData.GetHooks() {
log.Debug("--> %T %+v", h, h)
require.Equal(t, "RepositoryWebHook", h.HookModelName)
require.Equal(t, "push", h.Config["eventFilter"].Value)
require.Equal(t, "Github", h.Config["hookIcon"].Value)
require.Equal(t, "POST", h.Config["method"].Value)
require.Equal(t, proj.Key, h.Config["project"].Value)
require.Equal(t, "fsamin/go-repo", h.Config["repoFullName"].Value)
require.Equal(t, "github", h.Config["vcsServer"].Value)
require.Equal(t, wk.Name, h.Config["workflow"].Value)
}

// Then we will trigger a run of the workflow wich should trigger an as-code operation with a renamed workflow
vars := map[string]string{
"key": proj.Key,
Expand Down Expand Up @@ -343,13 +362,13 @@ version: v1.0`),

for _, h := range wk.WorkflowData.GetHooks() {
log.Debug("--> %T %+v", h, h)
assert.Equal(t, "RepositoryWebHook", h.HookModelName)
assert.Equal(t, "push", h.Config["eventFilter"].Value)
assert.Equal(t, "Github", h.Config["hookIcon"].Value)
assert.Equal(t, "POST", h.Config["method"].Value)
assert.Equal(t, proj.Key, h.Config["project"].Value)
assert.Equal(t, "fsamin/go-repo", h.Config["repoFullName"].Value)
assert.Equal(t, "github", h.Config["vcsServer"].Value)
assert.Equal(t, wk.Name, h.Config["workflow"].Value)
require.Equal(t, "RepositoryWebHook", h.HookModelName)
require.Equal(t, "push", h.Config["eventFilter"].Value)
require.Equal(t, "Github", h.Config["hookIcon"].Value)
require.Equal(t, "POST", h.Config["method"].Value)
require.Equal(t, proj.Key, h.Config["project"].Value)
require.Equal(t, "fsamin/go-repo", h.Config["repoFullName"].Value)
require.Equal(t, "github", h.Config["vcsServer"].Value)
require.Equal(t, wk.Name, h.Config["workflow"].Value)
}
}
4 changes: 2 additions & 2 deletions engine/hooks/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
s, cancel := setupTestHookService(t)
defer cancel()

ctx, cancel := context.WithTimeout(context.TODO(), 65*time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

// Get the mock
m := s.Client.(*mock_cdsclient.MockInterface)

// Mock the sync of tasks
// It will remove all the tascks from the database
// It will remove all the tasks from the database
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()
require.NoError(t, s.synchronizeTasks(ctx))
Expand Down
6 changes: 3 additions & 3 deletions engine/hooks/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func setupTestHookService(t *testing.T) (Service, func()) {
ctrl := gomock.NewController(t)
s.Client = mock_cdsclient.NewMockInterface(ctrl)

cancel := func() {
t.Cleanup(func() {
store.Client.Close()
store.Client = nil
ctrl.Finish()
}
})

return s, cancel
return s, func() {}
}
Loading

0 comments on commit 8e83900

Please sign in to comment.