Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hooks): sync outgoing tasks #5854

Merged
merged 3 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions engine/api/api_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ func isCDN(ctx context.Context) bool {
return c.Service != nil && c.Service.Type == sdk.TypeCDN
}

func isHooks(ctx context.Context) bool {
c := getAPIConsumer(ctx)
if c == nil {
return false
}
return c.Service != nil && c.Service.Type == sdk.TypeHooks
}

func isMFA(ctx context.Context) bool {
s := getAuthSession(ctx)
if s == nil {
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (api *API) InitRouter() {

r.Handle("/workflow/search", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getSearchWorkflowHandler))
r.Handle("/workflow/hook", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHooksHandler))
r.Handle("/workflow/hook/executions", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHookExecutionsHandler))
r.Handle("/workflow/hook/model/{model}", ScopeNone(), r.GET(api.getWorkflowHookModelHandler), r.POST(api.postWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)), r.PUT(api.putWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)))

// SSE
Expand Down
15 changes: 14 additions & 1 deletion engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const withLightNodeRunTestsField string = ", json_build_object('ko', workflow_no
func LoadNodeRunIDsWithLogs(db gorp.SqlExecutor, wIDs []int64, status []string) ([]sdk.WorkflowNodeRunIdentifiers, error) {
query := `
WITH noderun as (
SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status
SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status
FROM workflow_node_run_job_logs
JOIN workflow_node_run ON workflow_node_run.id = workflow_node_run_id
WHERE workflow_node_run.workflow_id = ANY($1)
Expand Down Expand Up @@ -866,3 +866,16 @@ func RunExist(db gorp.SqlExecutor, projectKey string, workflowID int64, hash str
count, err := db.SelectInt(query, projectKey, workflowID, hash)
return count != 0, err
}

func LoadNodeRunDistinctExecutionIDs(db gorp.SqlExecutor) ([]string, error) {
query := `
SELECT DISTINCT execution_id
FROM workflow_node_run
WHERE execution_id <> '' AND execution_id IS NOT NULL;
`
var executionIDs []string
if _, err := db.Select(&executionIDs, query); err != nil {
return nil, sdk.WithStack(err)
}
return executionIDs, nil
}
18 changes: 16 additions & 2 deletions engine/api/workflow_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (

func (api *API) getWorkflowHooksHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// This handler can only be called by a service managed by an admin
if isService := isService(ctx); !isService && !isAdmin(ctx) {
if !isHooks(ctx) {
return sdk.WithStack(sdk.ErrForbidden)
}

Expand All @@ -31,6 +30,21 @@ func (api *API) getWorkflowHooksHandler() service.Handler {
}
}

func (api *API) getWorkflowHookExecutionsHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if !isHooks(ctx) {
return sdk.WithStack(sdk.ErrForbidden)
}

executionIDs, err := workflow.LoadNodeRunDistinctExecutionIDs(api.mustDB())
if err != nil {
return err
}

return service.WriteJSON(w, executionIDs, http.StatusOK)
}
}

func (api *API) getWorkflowOutgoingHookModelsHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
m, err := workflow.LoadOutgoingHookModels(api.mustDB())
Expand Down
33 changes: 23 additions & 10 deletions engine/hooks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,41 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {

log.Info(ctx, "Hooks> Synchronizing tasks from CDS API (%s)", s.Cfg.API.HTTP.URL)

//Get all hooks from CDS, and synchronize the tasks in cache
// Get all hooks from CDS, and synchronize the tasks in cache
hooks, err := s.Client.WorkflowAllHooksList()
if err != nil {
return sdk.WrapError(err, "Unable to get hooks")
return sdk.WrapError(err, "unable to get hooks")
}
mHookIDs := make(map[string]struct{}, len(hooks))
for i := range hooks {
mHookIDs[hooks[i].UUID] = struct{}{}
}

// Get all node run execution ids from CDS, and synchronize the outgoing tasks in cache
executionIDs, err := s.Client.WorkflowAllHooksExecutions()
if err != nil {
return sdk.WrapError(err, "unable to get hook execution ids")
}
mExecutionIDs := make(map[string]struct{}, len(executionIDs))
for i := range executionIDs {
mExecutionIDs[executionIDs[i]] = struct{}{}
}

allOldTasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
return sdk.WrapError(err, "Unable to get allOldTasks")
}

//Delete all old task which are not referenced in CDS API anymore
// Delete all old task which are not referenced in CDS API anymore
for i := range allOldTasks {
t := &allOldTasks[i]
var found bool
for _, h := range hooks {
if h.UUID == t.UUID {
found = true
log.Debug(ctx, "Hook> Synchronizing %s task %s", h.HookModelName, t.UUID)
break
}
if t.Type == TypeOutgoingWebHook || t.Type == TypeOutgoingWorkflow {
_, found = mExecutionIDs[t.UUID]
} else {
_, found = mHookIDs[t.UUID]
}
if !found && t.Type != TypeOutgoingWebHook && t.Type != TypeOutgoingWorkflow {
if !found {
if err := s.deleteTask(ctx, t); err != nil {
log.Error(ctx, "Hook> Error on task %s delete on synchronization: %v", t.UUID, err)
} else {
Expand All @@ -100,6 +112,7 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {
}
}

// Create or update hook tasks from CDS API data
for _, h := range hooks {
confProj := h.Config[sdk.HookConfigProject]
confWorkflow := h.Config[sdk.HookConfigWorkflow]
Expand Down
55 changes: 55 additions & 0 deletions engine/hooks/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hooks
import (
"context"
"net/http"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -88,6 +89,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
// Mock the sync of tasks
// It will remove all the tasks from the database
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()
require.NoError(t, s.synchronizeTasks(ctx))

Expand Down Expand Up @@ -164,6 +166,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
// Now we will triggered another hooks sync
// The mock must return one hook
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{*h}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
require.NoError(t, s.synchronizeTasks(context.Background()))

// We must be able to find the task
Expand All @@ -177,3 +180,55 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
assert.Equal(t, "DONE", execs[0].Status)
assert.Equal(t, "SCHEDULED", execs[1].Status)
}

func Test_synchronizeTasks(t *testing.T) {
log.Factory = log.NewTestingWrapper(t)
s, cancel := setupTestHookService(t)
defer cancel()

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()

// Get the mock
m := s.Client.(*mock_cdsclient.MockInterface)

m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()

m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
require.NoError(t, s.synchronizeTasks(ctx))

tasks, err := s.Dao.FindAllTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 0)

require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: "1",
Type: TypeScheduler,
}))
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: sdk.UUID(),
Type: TypeScheduler,
}))
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: "2",
Type: TypeOutgoingWorkflow,
}))
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: sdk.UUID(),
Type: TypeOutgoingWorkflow,
}))

m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{{UUID: "1"}}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{"2"}, nil)
require.NoError(t, s.synchronizeTasks(ctx))

tasks, err = s.Dao.FindAllTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 2)
sort.Slice(tasks, func(i, j int) bool { return tasks[i].UUID < tasks[j].UUID })
require.Equal(t, "1", tasks[0].UUID)
require.Equal(t, TypeScheduler, tasks[0].Type)
require.Equal(t, "2", tasks[1].UUID)
require.Equal(t, TypeOutgoingWorkflow, tasks[1].Type)
}
9 changes: 9 additions & 0 deletions sdk/cdsclient/client_workflow_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ func (c *client) WorkflowAllHooksList() ([]sdk.NodeHook, error) {
}
return w, nil
}

func (c *client) WorkflowAllHooksExecutions() ([]string, error) {
url := fmt.Sprintf("/workflow/hook/executions")
var res []string
if _, err := c.GetJSON(context.Background(), url, &res); err != nil {
return nil, err
}
return res, nil
}
1 change: 1 addition & 0 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ type WorkflowClient interface {
WorkflowLogDownload(ctx context.Context, link sdk.CDNLogLink) ([]byte, error)
WorkflowNodeRunRelease(projectKey string, workflowName string, runNumber int64, nodeRunID int64, release sdk.WorkflowNodeRunRelease) error
WorkflowAllHooksList() ([]sdk.NodeHook, error)
WorkflowAllHooksExecutions() ([]string, error)
WorkflowCachePush(projectKey, integrationName, ref string, tarContent io.Reader, size int) error
WorkflowCachePull(projectKey, integrationName, ref string) (io.Reader, error)
WorkflowTransformAsCode(projectKey, workflowName, branch, message string) (*sdk.Operation, error)
Expand Down
30 changes: 30 additions & 0 deletions sdk/cdsclient/mock_cdsclient/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.