Skip to content

Commit

Permalink
refactor(ui): remove shared worker for long polling (#5257)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Jul 7, 2020
1 parent 4747e92 commit c6150a2
Show file tree
Hide file tree
Showing 82 changed files with 1,699 additions and 1,861 deletions.
47 changes: 37 additions & 10 deletions cli/cdsctl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"

"github.com/spf13/cobra"

"github.com/ovh/cds/cli"
Expand All @@ -23,6 +24,11 @@ func events() *cobra.Command {
var eventsListenCmd = cli.Command{
Name: "listen",
Short: "Listen CDS events",
Example: ` cdsctl events listen --queue
cdsctl events listen --global
cdsctl events listen --project MYPROJ
cdsctl events listen --project MYPROJ --workflow my-workflow
`,
Flags: []cli.Flag{
{
Name: "project",
Expand All @@ -34,29 +40,50 @@ var eventsListenCmd = cli.Command{
Usage: "workflow name to listen",
Type: cli.FlagString,
},
{
Name: "queue",
Usage: "listen job queue events",
Type: cli.FlagBool,
},
{
Name: "global",
Usage: "listen global events",
Type: cli.FlagBool,
},
},
}

func eventsListenRun(v cli.Values) error {
ctx := context.Background()
chanMessageReceived := make(chan sdk.WebsocketEvent)
chanMessageToSend := make(chan sdk.WebsocketFilter)
chanMessageToSend := make(chan []sdk.WebsocketFilter)

sdk.GoRoutine(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
client.WebsocketEventsListen(ctx, chanMessageToSend, chanMessageReceived)
})

var t string
switch {
case v.GetString("workflow") != "":
t = sdk.WebsocketFilterTypeWorkflow
case v.GetString("project") != "" && v.GetString("workflow") != "":
chanMessageToSend <- []sdk.WebsocketFilter{{
Type: sdk.WebsocketFilterTypeWorkflow,
ProjectKey: v.GetString("project"),
WorkflowName: v.GetString("workflow"),
}}
case v.GetString("project") != "":
chanMessageToSend <- []sdk.WebsocketFilter{{
Type: sdk.WebsocketFilterTypeProject,
ProjectKey: v.GetString("project"),
}}
case v.GetBool("queue"):
chanMessageToSend <- []sdk.WebsocketFilter{{
Type: sdk.WebsocketFilterTypeQueue,
}}
case v.GetBool("global"):
chanMessageToSend <- []sdk.WebsocketFilter{{
Type: sdk.WebsocketFilterTypeGlobal,
}}
default:
t = sdk.WebsocketFilterTypeProject
}
chanMessageToSend <- sdk.WebsocketFilter{
ProjectKey: v.GetString("project"),
WorkflowName: v.GetString("workflow"),
Type: t,
return fmt.Errorf("invalid given parameters")
}

for {
Expand Down
2 changes: 0 additions & 2 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (api *API) InitRouter() {

// Import As Code
r.Handle("/import/{permProjectKey}", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postImportAsCodeHandler))
r.Handle("/import/{permProjectKey}/{uuid}", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getImportAsCodeHandler))
r.Handle("/import/{permProjectKey}/{uuid}/perform", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postPerformImportAsCodeHandler))

// Bookmarks
Expand Down Expand Up @@ -228,7 +227,6 @@ func (api *API) InitRouter() {
r.Handle("/project/{key}/workflows/{permWorkflowName}/icon", Scope(sdk.AuthConsumerScopeProject), r.PUT(api.putWorkflowIconHandler), r.DELETE(api.deleteWorkflowIconHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/ascode", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowAsCodeHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/ascode/events/resync", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowAsCodeEventsResyncHandler, EnableTracing()))
r.Handle("/project/{key}/workflows/{permWorkflowName}/ascode/{uuid}", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getWorkflowAsCodeHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/label", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowLabelHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/label/{labelID}", Scope(sdk.AuthConsumerScopeProject), r.DELETE(api.deleteWorkflowLabelHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/rollback/{auditID}", Scope(sdk.AuthConsumerScopeProject), r.POST(api.postWorkflowRollbackHandler))
Expand Down
10 changes: 5 additions & 5 deletions engine/api/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,13 +511,13 @@ func (api *API) updateAsCodeApplicationHandler() service.Handler {
Name: appDB.Name,
OperationUUID: ope.UUID,
}
asCodeEvent := ascode.UpdateAsCodeResult(ctx, api.mustDB(), api.Cache, *proj, wkHolder.ID, *rootApp, ed, u)
if asCodeEvent != nil {
event.PublishAsCodeEvent(ctx, proj.Key, *asCodeEvent, u)
}
ascode.UpdateAsCodeResult(ctx, api.mustDB(), api.Cache, *proj, *wkHolder, *rootApp, ed, u)
}, api.PanicDump())

return service.WriteJSON(w, ope, http.StatusOK)
return service.WriteJSON(w, sdk.Operation{
UUID: ope.UUID,
Status: ope.Status,
}, http.StatusOK)
}
}

Expand Down
4 changes: 0 additions & 4 deletions engine/api/application_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,6 @@ func Test_postApplicationDeploymentStrategyConfigHandler_InsertTwoDifferentInteg

}

func Test_deleteApplicationDeploymentStrategyConfigHandler(t *testing.T) {
//see Test_postApplicationDeploymentStrategyConfigHandler
}

func Test_postApplicationDeploymentStrategyConfigHandlerAsProvider(t *testing.T) {
api, tsURL := newTestServer(t)

Expand Down
76 changes: 38 additions & 38 deletions engine/api/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package api
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-gorp/gorp"
"github.com/golang/mock/gomock"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/pipeline"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/services/mock_services"
"github.com/ovh/cds/engine/api/workflow"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,9 +76,18 @@ func Test_postApplicationMetadataHandler_AsProvider(t *testing.T) {
}

func TestUpdateAsCodeApplicationHandler(t *testing.T) {
api, db, _ := newTestAPI(t)
api, tsURL := newTestServer(t)
db := api.mustDB()
require.NoError(t, event.Initialize(context.Background(), db, api.Cache))

u, pass := assets.InsertAdminUser(t, db)
u, jwt := assets.InsertAdminUser(t, db)

client := cdsclient.New(cdsclient.Config{
Host: tsURL,
User: u.Username,
InsecureSkipVerifyTLS: true,
SessionToken: jwt,
})

UUID := sdk.UUID()

Expand Down Expand Up @@ -153,6 +165,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
DoAndReturn(func(ctx context.Context, method, path string, _ interface{}, in interface{}, out interface{}) (int, error) {
ope := new(sdk.Operation)
ope.UUID = UUID
ope.Status = sdk.OperationStatusPending
*(out.(*sdk.Operation)) = *ope
return 200, nil
}).Times(1)
Expand Down Expand Up @@ -242,11 +255,20 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
wk.FromRepository = "myrepofrom"
require.NoError(t, workflow.Insert(context.Background(), db, api.Cache, *proj, wk))

chanMessageReceived := make(chan sdk.WebsocketEvent)
chanMessageToSend := make(chan []sdk.WebsocketFilter)
go client.WebsocketEventsListen(context.TODO(), chanMessageToSend, chanMessageReceived)
chanMessageToSend <- []sdk.WebsocketFilter{{
Type: sdk.WebsocketFilterTypeAscodeEvent,
ProjectKey: proj.Key,
WorkflowName: wk.Name,
}}

uri := api.Router.GetRoute("PUT", api.updateAsCodeApplicationHandler, map[string]string{
"permProjectKey": proj.Key,
"applicationName": app.Name,
})
req := assets.NewJWTAuthentifiedRequest(t, pass, "PUT", uri, app)
req := assets.NewJWTAuthentifiedRequest(t, jwt, "PUT", uri, app)
q := req.URL.Query()
q.Set("branch", "master")
q.Set("message", "my message")
Expand All @@ -260,36 +282,14 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
test.NoError(t, json.Unmarshal(wr.Body.Bytes(), myOpe))
assert.NotEmpty(t, myOpe.UUID)

cpt := 0
for {
if cpt >= 10 {
t.Fail()
return
}

// Get operation
uriGET := api.Router.GetRoute("GET", api.getWorkflowAsCodeHandler, map[string]string{
"key": proj.Key,
"permWorkflowName": wk.Name,
"uuid": myOpe.UUID,
})
reqGET, err := http.NewRequest("GET", uriGET, nil)
test.NoError(t, err)
assets.AuthentifyRequest(t, reqGET, u, pass)
wrGet := httptest.NewRecorder()
api.Router.Mux.ServeHTTP(wrGet, reqGET)
assert.Equal(t, 200, wrGet.Code)
myOpeGet := new(sdk.Operation)
err = json.Unmarshal(wrGet.Body.Bytes(), myOpeGet)
assert.NoError(t, err)

if myOpeGet.Status < sdk.OperationStatusDone {
cpt++
time.Sleep(1 * time.Second)
continue
}
test.NoError(t, json.Unmarshal(wrGet.Body.Bytes(), myOpeGet))
assert.Equal(t, "myURL", myOpeGet.Setup.Push.PRLink)
break
timeout := time.NewTimer(5 * time.Second)
select {
case <-timeout.C:
t.Fatal("test timeout")
case evt := <-chanMessageReceived:
require.Equal(t, fmt.Sprintf("%T", sdk.EventAsCodeEvent{}), evt.Event.EventType)
var ae sdk.EventAsCodeEvent
require.NoError(t, json.Unmarshal(evt.Event.Payload, &ae))
require.Equal(t, "myURL", ae.Event.PullRequestURL)
}
}
61 changes: 37 additions & 24 deletions engine/api/ascode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/gorilla/mux"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api/ascode"
"github.com/ovh/cds/engine/api/event"
Expand Down Expand Up @@ -76,28 +77,37 @@ func (api *API) postImportAsCodeHandler() service.Handler {
if err := operation.PostRepositoryOperation(ctx, api.mustDB(), *p, ope, nil); err != nil {
return sdk.WrapError(err, "cannot create repository operation")
}
ope.RepositoryStrategy.SSHKeyContent = sdk.PasswordPlaceholder
ope.RepositoryStrategy.Password = sdk.PasswordPlaceholder

return service.WriteJSON(w, ope, http.StatusCreated)
}
}
u := getAPIConsumer(ctx)

// getImportAsCodeHandler
// @title Get import workflow as code operation details
// @description This route helps you to know if a "import as code" is over, and the details of the performed operation
// @requestBody None
// @responseBody {"uuid":"ee3946ac-3a77-46b1-af78-77868fde75ec","url":"https://github.com/fsamin/go-repo.git","strategy":{"connection_type":"","ssh_key":"","user":"","password":"","branch":"","default_branch":"","pgp_key":""},"setup":{"checkout":{}},"load_files":{"pattern":".cds/**/*.yml","results":{"w-go-repo.yml":"bmFtZTogdy1nby1yZXBvCgkJCQkJdmVyc2lvbjogdjEuMAoJCQkJCXBpcGVsaW5lOiBidWlsZAoJCQkJCWFwcGxpY2F0aW9uOiBnby1yZXBvCgkJCQkJcGlwZWxpbmVfaG9va3M6CgkJCQkJLSB0eXBlOiBSZXBvc2l0b3J5V2ViSG9vawoJCQkJCQ=="}},"status":2}
func (api *API) getImportAsCodeHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
ope, err := operation.GetRepositoryOperation(ctx, api.mustDB(), vars["uuid"])
if err != nil {
return sdk.WrapError(err, "cannot get repository operation status")
}
ope.RepositoryStrategy.SSHKeyContent = sdk.PasswordPlaceholder
ope.RepositoryStrategy.Password = sdk.PasswordPlaceholder
return service.WriteJSON(w, ope, http.StatusOK)
sdk.GoRoutine(context.Background(), fmt.Sprintf("postImportAsCodeHandler-%s", ope.UUID), func(ctx context.Context) {
globalOperation := sdk.Operation{
UUID: ope.UUID,
}

ope, err := operation.Poll(ctx, api.mustDB(), ope.UUID)
if err != nil {
isErrWithStack := sdk.IsErrorWithStack(err)
fields := logrus.Fields{}
if isErrWithStack {
fields["stack_trace"] = fmt.Sprintf("%+v", err)
}
log.ErrorWithFields(ctx, fields, "%s", err)

globalOperation.Status = sdk.OperationStatusError
globalOperation.Error = sdk.ToOperationError(err)
} else {
globalOperation.Status = sdk.OperationStatusDone
globalOperation.LoadFiles = ope.LoadFiles
}

event.PublishOperation(ctx, p.Key, globalOperation, u)
}, api.PanicDump())

return service.WriteJSON(w, sdk.Operation{
UUID: ope.UUID,
Status: ope.Status,
}, http.StatusCreated)
}
}

Expand All @@ -112,17 +122,20 @@ func (api *API) postPerformImportAsCodeHandler() service.Handler {
key := vars[permProjectKey]
uuid := vars["uuid"]

//Load project
proj, errp := project.Load(ctx, api.mustDB(), key,
if uuid == "" {
return sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given operation uuid")
}

proj, err := project.Load(ctx, api.mustDB(), key,
project.LoadOptions.WithGroups,
project.LoadOptions.WithApplications,
project.LoadOptions.WithEnvironments,
project.LoadOptions.WithPipelines,
project.LoadOptions.WithFeatures(api.Cache),
project.LoadOptions.WithClearIntegrations,
)
if errp != nil {
return sdk.WrapError(errp, "postPerformImportAsCodeHandler> Cannot load project %s", key)
if err != nil {
return sdk.WrapError(err, "cannot load project %s", key)
}

ope, err := operation.GetRepositoryOperation(ctx, api.mustDB(), uuid)
Expand Down
Loading

0 comments on commit c6150a2

Please sign in to comment.