diff --git a/engine/api/api_helper.go b/engine/api/api_helper.go index 79d6203351..e0c48995ab 100644 --- a/engine/api/api_helper.go +++ b/engine/api/api_helper.go @@ -98,6 +98,14 @@ func isCDN(ctx context.Context) bool { return c.Service != nil && c.Service.Type == sdk.TypeCDN } +func isHooks(ctx context.Context) bool { + c := getAPIConsumer(ctx) + if c == nil { + return false + } + return c.Service != nil && c.Service.Type == sdk.TypeHooks +} + func isMFA(ctx context.Context) bool { s := getAuthSession(ctx) if s == nil { diff --git a/engine/api/api_routes.go b/engine/api/api_routes.go index fe5a0ff7c5..b4537f97d3 100644 --- a/engine/api/api_routes.go +++ b/engine/api/api_routes.go @@ -419,6 +419,7 @@ func (api *API) InitRouter() { r.Handle("/workflow/search", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getSearchWorkflowHandler)) r.Handle("/workflow/hook", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHooksHandler)) + r.Handle("/workflow/hook/executions", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHookExecutionsHandler)) r.Handle("/workflow/hook/model/{model}", ScopeNone(), r.GET(api.getWorkflowHookModelHandler), r.POST(api.postWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)), r.PUT(api.putWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware))) // SSE diff --git a/engine/api/workflow/dao_node_run.go b/engine/api/workflow/dao_node_run.go index 80f9365d23..046410d00c 100644 --- a/engine/api/workflow/dao_node_run.go +++ b/engine/api/workflow/dao_node_run.go @@ -61,7 +61,7 @@ const withLightNodeRunTestsField string = ", json_build_object('ko', workflow_no func LoadNodeRunIDsWithLogs(db gorp.SqlExecutor, wIDs []int64, status []string) ([]sdk.WorkflowNodeRunIdentifiers, error) { query := ` WITH noderun as ( - SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status + SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status FROM workflow_node_run_job_logs JOIN workflow_node_run ON workflow_node_run.id = workflow_node_run_id WHERE workflow_node_run.workflow_id = ANY($1) @@ -866,3 +866,16 @@ func RunExist(db gorp.SqlExecutor, projectKey string, workflowID int64, hash str count, err := db.SelectInt(query, projectKey, workflowID, hash) return count != 0, err } + +func LoadNodeRunDistinctExecutionIDs(db gorp.SqlExecutor) ([]string, error) { + query := ` + SELECT DISTINCT execution_id + FROM workflow_node_run + WHERE execution_id <> '' AND execution_id IS NOT NULL; + ` + var executionIDs []string + if _, err := db.Select(&executionIDs, query); err != nil { + return nil, sdk.WithStack(err) + } + return executionIDs, nil +} diff --git a/engine/api/workflow_hook.go b/engine/api/workflow_hook.go index 307addd708..70a4945437 100644 --- a/engine/api/workflow_hook.go +++ b/engine/api/workflow_hook.go @@ -17,8 +17,7 @@ import ( func (api *API) getWorkflowHooksHandler() service.Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { - // This handler can only be called by a service managed by an admin - if isService := isService(ctx); !isService && !isAdmin(ctx) { + if !isHooks(ctx) { return sdk.WithStack(sdk.ErrForbidden) } @@ -31,6 +30,21 @@ func (api *API) getWorkflowHooksHandler() service.Handler { } } +func (api *API) getWorkflowHookExecutionsHandler() service.Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + if !isHooks(ctx) { + return sdk.WithStack(sdk.ErrForbidden) + } + + executionIDs, err := workflow.LoadNodeRunDistinctExecutionIDs(api.mustDB()) + if err != nil { + return err + } + + return service.WriteJSON(w, executionIDs, http.StatusOK) + } +} + func (api *API) getWorkflowOutgoingHookModelsHandler() service.Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { m, err := workflow.LoadOutgoingHookModels(api.mustDB()) diff --git a/engine/hooks/tasks.go b/engine/hooks/tasks.go index 95ff8534cf..ff0b46e60b 100644 --- a/engine/hooks/tasks.go +++ b/engine/hooks/tasks.go @@ -69,10 +69,24 @@ func (s *Service) synchronizeTasks(ctx context.Context) error { log.Info(ctx, "Hooks> Synchronizing tasks from CDS API (%s)", s.Cfg.API.HTTP.URL) - //Get all hooks from CDS, and synchronize the tasks in cache + // Get all hooks from CDS, and synchronize the tasks in cache hooks, err := s.Client.WorkflowAllHooksList() if err != nil { - return sdk.WrapError(err, "Unable to get hooks") + return sdk.WrapError(err, "unable to get hooks") + } + mHookIDs := make(map[string]struct{}, len(hooks)) + for i := range hooks { + mHookIDs[hooks[i].UUID] = struct{}{} + } + + // Get all node run execution ids from CDS, and synchronize the outgoing tasks in cache + executionIDs, err := s.Client.WorkflowAllHooksExecutions() + if err != nil { + return sdk.WrapError(err, "unable to get hook execution ids") + } + mExecutionIDs := make(map[string]struct{}, len(executionIDs)) + for i := range executionIDs { + mExecutionIDs[executionIDs[i]] = struct{}{} } allOldTasks, err := s.Dao.FindAllTasks(ctx) @@ -80,18 +94,16 @@ func (s *Service) synchronizeTasks(ctx context.Context) error { return sdk.WrapError(err, "Unable to get allOldTasks") } - //Delete all old task which are not referenced in CDS API anymore + // Delete all old task which are not referenced in CDS API anymore for i := range allOldTasks { t := &allOldTasks[i] var found bool - for _, h := range hooks { - if h.UUID == t.UUID { - found = true - log.Debug(ctx, "Hook> Synchronizing %s task %s", h.HookModelName, t.UUID) - break - } + if t.Type == TypeOutgoingWebHook || t.Type == TypeOutgoingWorkflow { + _, found = mExecutionIDs[t.UUID] + } else { + _, found = mHookIDs[t.UUID] } - if !found && t.Type != TypeOutgoingWebHook && t.Type != TypeOutgoingWorkflow { + if !found { if err := s.deleteTask(ctx, t); err != nil { log.Error(ctx, "Hook> Error on task %s delete on synchronization: %v", t.UUID, err) } else { @@ -100,6 +112,7 @@ func (s *Service) synchronizeTasks(ctx context.Context) error { } } + // Create or update hook tasks from CDS API data for _, h := range hooks { confProj := h.Config[sdk.HookConfigProject] confWorkflow := h.Config[sdk.HookConfigWorkflow] diff --git a/engine/hooks/tasks_test.go b/engine/hooks/tasks_test.go index 08db677abf..4ab518fb17 100644 --- a/engine/hooks/tasks_test.go +++ b/engine/hooks/tasks_test.go @@ -3,6 +3,7 @@ package hooks import ( "context" "net/http" + "sort" "testing" "time" @@ -88,6 +89,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) { // Mock the sync of tasks // It will remove all the tasks from the database m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil) + m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil) m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes() require.NoError(t, s.synchronizeTasks(ctx)) @@ -164,6 +166,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) { // Now we will triggered another hooks sync // The mock must return one hook m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{*h}, nil) + m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil) require.NoError(t, s.synchronizeTasks(context.Background())) // We must be able to find the task @@ -177,3 +180,55 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) { assert.Equal(t, "DONE", execs[0].Status) assert.Equal(t, "SCHEDULED", execs[1].Status) } + +func Test_synchronizeTasks(t *testing.T) { + log.Factory = log.NewTestingWrapper(t) + s, cancel := setupTestHookService(t) + defer cancel() + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + // Get the mock + m := s.Client.(*mock_cdsclient.MockInterface) + + m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes() + + m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil) + m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil) + require.NoError(t, s.synchronizeTasks(ctx)) + + tasks, err := s.Dao.FindAllTasks(ctx) + require.NoError(t, err) + require.Len(t, tasks, 0) + + require.NoError(t, s.Dao.SaveTask(&sdk.Task{ + UUID: "1", + Type: TypeScheduler, + })) + require.NoError(t, s.Dao.SaveTask(&sdk.Task{ + UUID: sdk.UUID(), + Type: TypeScheduler, + })) + require.NoError(t, s.Dao.SaveTask(&sdk.Task{ + UUID: "2", + Type: TypeOutgoingWorkflow, + })) + require.NoError(t, s.Dao.SaveTask(&sdk.Task{ + UUID: sdk.UUID(), + Type: TypeOutgoingWorkflow, + })) + + m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{{UUID: "1"}}, nil) + m.EXPECT().WorkflowAllHooksExecutions().Return([]string{"2"}, nil) + require.NoError(t, s.synchronizeTasks(ctx)) + + tasks, err = s.Dao.FindAllTasks(ctx) + require.NoError(t, err) + require.Len(t, tasks, 2) + sort.Slice(tasks, func(i, j int) bool { return tasks[i].UUID < tasks[j].UUID }) + require.Equal(t, "1", tasks[0].UUID) + require.Equal(t, TypeScheduler, tasks[0].Type) + require.Equal(t, "2", tasks[1].UUID) + require.Equal(t, TypeOutgoingWorkflow, tasks[1].Type) +} diff --git a/sdk/cdsclient/client_workflow_hooks.go b/sdk/cdsclient/client_workflow_hooks.go index 704469df5f..806f4b54a6 100644 --- a/sdk/cdsclient/client_workflow_hooks.go +++ b/sdk/cdsclient/client_workflow_hooks.go @@ -15,3 +15,12 @@ func (c *client) WorkflowAllHooksList() ([]sdk.NodeHook, error) { } return w, nil } + +func (c *client) WorkflowAllHooksExecutions() ([]string, error) { + url := fmt.Sprintf("/workflow/hook/executions") + var res []string + if _, err := c.GetJSON(context.Background(), url, &res); err != nil { + return nil, err + } + return res, nil +} diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index 90a0c6d807..fdf10d7b95 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -356,6 +356,7 @@ type WorkflowClient interface { WorkflowLogDownload(ctx context.Context, link sdk.CDNLogLink) ([]byte, error) WorkflowNodeRunRelease(projectKey string, workflowName string, runNumber int64, nodeRunID int64, release sdk.WorkflowNodeRunRelease) error WorkflowAllHooksList() ([]sdk.NodeHook, error) + WorkflowAllHooksExecutions() ([]string, error) WorkflowCachePush(projectKey, integrationName, ref string, tarContent io.Reader, size int) error WorkflowCachePull(projectKey, integrationName, ref string) (io.Reader, error) WorkflowTransformAsCode(projectKey, workflowName, branch, message string) (*sdk.Operation, error) diff --git a/sdk/cdsclient/mock_cdsclient/interface_mock.go b/sdk/cdsclient/mock_cdsclient/interface_mock.go index 4f4cdd29b7..4543642cbf 100644 --- a/sdk/cdsclient/mock_cdsclient/interface_mock.go +++ b/sdk/cdsclient/mock_cdsclient/interface_mock.go @@ -3934,6 +3934,21 @@ func (mr *MockWorkflowClientMockRecorder) WorkflowAccess(ctx, projectKey, workfl return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkflowAccess", reflect.TypeOf((*MockWorkflowClient)(nil).WorkflowAccess), ctx, projectKey, workflowID, sessionID, itemType) } +// WorkflowAllHooksExecutions mocks base method. +func (m *MockWorkflowClient) WorkflowAllHooksExecutions() ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkflowAllHooksExecutions") + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WorkflowAllHooksExecutions indicates an expected call of WorkflowAllHooksExecutions. +func (mr *MockWorkflowClientMockRecorder) WorkflowAllHooksExecutions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkflowAllHooksExecutions", reflect.TypeOf((*MockWorkflowClient)(nil).WorkflowAllHooksExecutions)) +} + // WorkflowAllHooksList mocks base method. func (m *MockWorkflowClient) WorkflowAllHooksList() ([]sdk.NodeHook, error) { m.ctrl.T.Helper() @@ -8155,6 +8170,21 @@ func (mr *MockInterfaceMockRecorder) WorkflowAccess(ctx, projectKey, workflowID, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkflowAccess", reflect.TypeOf((*MockInterface)(nil).WorkflowAccess), ctx, projectKey, workflowID, sessionID, itemType) } +// WorkflowAllHooksExecutions mocks base method. +func (m *MockInterface) WorkflowAllHooksExecutions() ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkflowAllHooksExecutions") + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WorkflowAllHooksExecutions indicates an expected call of WorkflowAllHooksExecutions. +func (mr *MockInterfaceMockRecorder) WorkflowAllHooksExecutions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkflowAllHooksExecutions", reflect.TypeOf((*MockInterface)(nil).WorkflowAllHooksExecutions)) +} + // WorkflowAllHooksList mocks base method. func (m *MockInterface) WorkflowAllHooksList() ([]sdk.NodeHook, error) { m.ctrl.T.Helper()