From 00d584cf1148cba3423f809ae7c5214da394593a Mon Sep 17 00:00:00 2001 From: Guiheux Steven Date: Tue, 2 Jun 2020 16:44:51 +0200 Subject: [PATCH] feat(hatchery): enable websocket (#5199) --- cli/cdsctl/events.go | 45 ++++++++---- engine/api/event/publish_workflow_run.go | 10 +-- engine/hatchery/hatchery_helper_test.go | 69 ++++++++++++++----- engine/hatchery/hatchery_local_test.go | 15 ++-- sdk/cdsclient/client_events.go | 8 --- sdk/cdsclient/client_queue.go | 58 +++++++--------- sdk/cdsclient/interface.go | 1 - .../mock_cdsclient/interface_mock.go | 26 +------ sdk/event.go | 12 ++-- sdk/hatchery/hatchery.go | 10 +-- sdk/hatchery/stats.go | 4 +- sdk/hatchery/types.go | 2 +- tests/03_clictl_admin_services.yml | 2 +- 13 files changed, 134 insertions(+), 128 deletions(-) diff --git a/cli/cdsctl/events.go b/cli/cdsctl/events.go index 834f403aee..e09ac95f55 100644 --- a/cli/cdsctl/events.go +++ b/cli/cdsctl/events.go @@ -2,15 +2,11 @@ package main import ( "context" - "encoding/json" "fmt" - "io/ioutil" - "github.com/spf13/cobra" "github.com/ovh/cds/cli" "github.com/ovh/cds/sdk" - "github.com/ovh/cds/sdk/cdsclient" ) var eventsCmd = cli.Command{ @@ -27,28 +23,51 @@ func events() *cobra.Command { var eventsListenCmd = cli.Command{ Name: "listen", Short: "Listen CDS events", + Flags: []cli.Flag{ + { + Name: "project", + Usage: "project key to listen", + Type: cli.FlagString, + }, + { + Name: "workflow", + Usage: "workflow name to listen", + Type: cli.FlagString, + }, + }, } func eventsListenRun(v cli.Values) error { ctx := context.Background() - chanSSE := make(chan cdsclient.SSEvent) + chanMessageReceived := make(chan sdk.WebsocketEvent) + chanMessageToSend := make(chan sdk.WebsocketFilter) - sdk.GoRoutine(ctx, "EventsListenCmd", func(ctx context.Context) { - client.EventsListen(ctx, chanSSE) + sdk.GoRoutine(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) { + client.WebsocketEventsListen(ctx, chanMessageToSend, chanMessageReceived) }) + var t string + switch { + case v.GetString("workflow") != "": + t = sdk.WebsocketFilterTypeWorkflow + default: + t = sdk.WebsocketFilterTypeProject + } + chanMessageToSend <- sdk.WebsocketFilter{ + ProjectKey: v.GetString("project"), + WorkflowName: v.GetString("workflow"), + Type: t, + } + for { select { case <-ctx.Done(): return ctx.Err() - case evt := <-chanSSE: - var e sdk.Event - content, _ := ioutil.ReadAll(evt.Data) - _ = json.Unmarshal(content, &e) - if e.EventType == "" { + case evt := <-chanMessageReceived: + if evt.Event.EventType == "" { continue } - fmt.Printf("%s: %s %s %s\n", e.EventType, e.ProjectKey, e.WorkflowName, e.Status) + fmt.Printf("%s: %s %s %s\n", evt.Event.EventType, evt.Event.ProjectKey, evt.Event.WorkflowName, evt.Event.Status) } } } diff --git a/engine/api/event/publish_workflow_run.go b/engine/api/event/publish_workflow_run.go index c160df1139..eed7cc6555 100644 --- a/engine/api/event/publish_workflow_run.go +++ b/engine/api/event/publish_workflow_run.go @@ -197,13 +197,9 @@ func PublishWorkflowNodeRun(ctx context.Context, nr sdk.WorkflowNodeRun, w sdk.W // PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) { e := sdk.EventRunWorkflowJob{ - ID: jr.ID, - Status: jr.Status, - Start: jr.Start.Unix(), - Requirements: jr.Job.Action.Requirements, - WorkerName: jr.Job.WorkerName, - BookByName: jr.BookedBy.Name, - Parameters: jr.Parameters, + ID: jr.ID, + Status: jr.Status, + Start: jr.Start.Unix(), } if sdk.StatusIsTerminated(jr.Status) { diff --git a/engine/hatchery/hatchery_helper_test.go b/engine/hatchery/hatchery_helper_test.go index 71cafca48c..1e1d89bb16 100644 --- a/engine/hatchery/hatchery_helper_test.go +++ b/engine/hatchery/hatchery_helper_test.go @@ -9,24 +9,63 @@ import ( "io" "io/ioutil" "net/http" + "net/http/httptest" "net/http/httputil" "testing" "time" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/h2non/gock.v1" + "github.com/ovh/cds/engine/api/authentication" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/h2non/gock.v1" ) func init() { log.Initialize(&log.Conf{Level: "debug"}) } -func InitMock(t *testing.T) { +func InitWebsocketTestServer(t *testing.T) *httptest.Server { + upgrader := websocket.Upgrader{} + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + require.NoError(t, err) + defer c.Close() + + j := sdk.EventRunWorkflowJob{ + ID: 1, + Status: sdk.StatusWaiting, + } + bts, err := json.Marshal(j) + require.NoError(t, err) + jevent := sdk.WebsocketEvent{ + Status: "OK", + Event: sdk.Event{ + EventType: fmt.Sprintf("%T", j), + Status: sdk.StatusWaiting, + Payload: bts, + }, + } + require.NoError(t, c.WriteJSON(jevent)) + for { + mt, message, err := c.ReadMessage() + if err != nil { + require.NoError(t, err) + } + err = c.WriteMessage(mt, message) + if err != nil { + require.NoError(t, err) + } + } + })) + return s +} + +func InitMock(t *testing.T, url string) { privKey, _ := jws.NewRandomRSAKey() privKeyPEM, _ := jws.ExportPrivateKey(privKey) pubKey, _ := jws.ExportPublicKey(privKey) @@ -52,7 +91,7 @@ func InitMock(t *testing.T) { } } - gock.New("http://lolcat.host").Post("/auth/consumer/builtin/signin"). + gock.New(url).Post("/auth/consumer/builtin/signin"). Reply(201). JSON( sdk.AuthConsumerSigninResponse{ @@ -63,32 +102,30 @@ func InitMock(t *testing.T) { }, ).AddHeader("X-Api-Pub-Signing-Key", base64.StdEncoding.EncodeToString(pubKey)) - gock.New("http://lolcat.host").Get("/download/worker/darwin/amd64").Times(1). + gock.New(url).Get("/download/worker/darwin/amd64").Times(1). Reply(200). Body(bytes.NewBuffer([]byte("nop"))). AddHeader("Content-Type", "application/octet-stream") - gock.New("http://lolcat.host").Get("/download/worker/linux/amd64").Times(1). + gock.New(url).Get("/download/worker/linux/amd64").Times(1). Reply(200). Body(bytes.NewBuffer([]byte("nop"))). AddHeader("Content-Type", "application/octet-stream") - gock.New("http://lolcat.host").Post("/services/register"). + gock.New(url).Post("/services/register"). HeaderPresent("Authorization"). Reply(200). JSON(sdk.Service{}) - gock.New("http://lolcat.host").Post("/services/heartbeat"). + gock.New(url).Post("/services/heartbeat"). HeaderPresent("Authorization"). Reply(204) - gock.New("http://lolcat.host").Get("/worker").Times(6). + gock.New(url).Get("/worker").Times(6). Reply(200). JSON([]sdk.Worker{}) - gock.New("http://lolcat.host").Get("/events").EnableNetworking() - - gock.New("http://lolcat.host").Get("/queue/workflows/1/infos").Times(1). + gock.New(url).Get("/queue/workflows/1/infos").Times(1). Reply(200). JSON(sdk.WorkflowNodeJobRun{ ID: 1, @@ -98,12 +135,12 @@ func InitMock(t *testing.T) { }, }) - gock.New("http://lolcat.host").Post("/queue/workflows/1/spawn/infos").Times(2).Reply(200) + gock.New(url).Post("/queue/workflows/1/spawn/infos").Times(2).Reply(200) - gock.New("http://lolcat.host").Post("/queue/workflows/1/book"). + gock.New(url).Post("/queue/workflows/1/book"). Reply(204) - gock.New("http://lolcat.host").Get("/queue/workflows").Times(1). + gock.New(url).Get("/queue/workflows").Times(1). Reply(200). JSON([]sdk.WorkflowRun{}) diff --git a/engine/hatchery/hatchery_local_test.go b/engine/hatchery/hatchery_local_test.go index 483c8e4e0a..68fb0d14b9 100644 --- a/engine/hatchery/hatchery_local_test.go +++ b/engine/hatchery/hatchery_local_test.go @@ -31,20 +31,21 @@ func (r *TestRunner) NewCmd(ctx context.Context, command string, args ...string) } func TestHatcheryLocal(t *testing.T) { - InitMock(t) - defer gock.Off() + s := InitWebsocketTestServer(t) + InitMock(t, s.URL) + defer s.Close() + defer gock.Off() var h = local.New() h.LocalWorkerRunner = &TestRunner{t} - var cfg = local.HatcheryConfiguration{ Basedir: os.TempDir(), } cfg.Name = "lolcat-test-hatchery" cfg.API.HTTP.Insecure = false - cfg.API.HTTP.URL = "http://lolcat.host" + cfg.API.HTTP.URL = s.URL cfg.API.Token = "xxxxxxxx" cfg.API.MaxHeartbeatFailures = 0 cfg.Provision.RegisterFrequency = 1 @@ -61,8 +62,6 @@ func TestHatcheryLocal(t *testing.T) { t.Logf("service config: %+v", srvCfg) srvCfg.Hook = func(client cdsclient.Interface) error { - client.HTTPSSEClient().Transport = newMockSSERoundTripper(t, context.TODO()) - gock.InterceptClient(client.HTTPSSEClient()) gock.InterceptClient(client.HTTPClient()) return nil } @@ -94,8 +93,8 @@ func TestHatcheryLocal(t *testing.T) { if !gock.IsDone() { pending := gock.Pending() for _, m := range pending { - if m.Request().URLStruct.String() != "http://lolcat.host/services/heartbeat" && - !strings.HasPrefix(m.Request().URLStruct.String(), "http://lolcat.host/download/worker") { + if m.Request().URLStruct.String() != s.URL+"/services/heartbeat" && + !strings.HasPrefix(m.Request().URLStruct.String(), s.URL+"/download/worker") { t.Errorf("PENDING %s %s", m.Request().Method, m.Request().URLStruct.String()) } } diff --git a/sdk/cdsclient/client_events.go b/sdk/cdsclient/client_events.go index a46f9f68a1..5b9f3aad03 100644 --- a/sdk/cdsclient/client_events.go +++ b/sdk/cdsclient/client_events.go @@ -8,14 +8,6 @@ import ( "github.com/ovh/cds/sdk" ) -func (c *client) EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent) { - for ctx.Err() == nil { - if err := c.RequestSSEGet(ctx, "/events", chanSSEvt); err != nil { - log.Println("EventsListen", err) - } - time.Sleep(1 * time.Second) - } -} func (c *client) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) { for ctx.Err() == nil { if err := c.RequestWebsocket(ctx, "/ws", chanMsgToSend, chanMsgReceived); err != nil { diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 340710cd7f..66894aa4dd 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -57,15 +57,19 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ jobsTicker := time.NewTicker(delay) // This goroutine call the SSE route - chanSSEvt := make(chan SSEvent) - sdk.GoRoutine(ctx, "RequestSSEGet", func(ctx context.Context) { + chanMessageReceived := make(chan sdk.WebsocketEvent, 10) + chanMessageToSend := make(chan sdk.WebsocketFilter, 10) + sdk.GoRoutine(ctx, "RequestWebsocket", func(ctx context.Context) { for ctx.Err() == nil { - if err := c.RequestSSEGet(ctx, "/events", chanSSEvt); err != nil { + if err := c.RequestWebsocket(ctx, "/ws", chanMessageToSend, chanMessageReceived); err != nil { log.Println("QueuePolling", err) } time.Sleep(1 * time.Second) } }) + chanMessageToSend <- sdk.WebsocketFilter{ + Queue: true, + } for { select { @@ -75,44 +79,32 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ close(jobs) } return ctx.Err() - case evt := <-chanSSEvt: + case wsEvent := <-chanMessageReceived: if jobs == nil { continue } - - content, _ := ioutil.ReadAll(evt.Data) - - var apiEvent sdk.Event - _ = json.Unmarshal(content, &apiEvent) // ignore errors - // filter only EventRunWorkflowJob - if apiEvent.EventType == "sdk.EventRunWorkflowJob" { - var runJob sdk.EventRunWorkflowJob - if err := json.Unmarshal(apiEvent.Payload, &runJob); err != nil { - errs <- fmt.Errorf("unable to unmarshal job event: %v", err) + if wsEvent.Event.EventType == "sdk.EventRunWorkflowJob" && wsEvent.Event.Status == sdk.StatusWaiting { + var jobEvent sdk.EventRunWorkflowJob + if err := json.Unmarshal(wsEvent.Event.Payload, &jobEvent); err != nil { + errs <- fmt.Errorf("unable to unmarshal job %v: %v", wsEvent.Event.Payload, err) + continue + } + job, err := c.QueueJobInfo(ctx, jobEvent.ID) + // Do not log the error if the job does not exist + if sdk.ErrorIs(err, sdk.ErrWorkflowNodeRunJobNotFound) { continue } - if runJob.ID != 0 && runJob.Status == sdk.StatusWaiting { - job, err := c.QueueJobInfo(ctx, runJob.ID) - - // Do not log the error if the job does not exist - if sdk.ErrorIs(err, sdk.ErrWorkflowNodeRunJobNotFound) { - continue - } - - if err != nil { - errs <- fmt.Errorf("unable to get job %v info: %v", runJob.ID, err) - continue - } - - // push the job in the channel - if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" { - job.Header["SSE"] = "true" - jobs <- *job - } + if err != nil { + errs <- fmt.Errorf("unable to get job %v info: %v", job.ID, err) + continue + } + // push the job in the channel + if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" { + job.Header["WS"] = "true" + jobs <- *job } } - case <-jobsTicker.C: if c.config.Verbose { fmt.Println("jobsTicker") diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index 9b54258c3d..2e8f16bbee 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -144,7 +144,6 @@ type EnvironmentVariableClient interface { // EventsClient listen SSE Events from CDS API type EventsClient interface { // Must be run in a go routine - EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) } diff --git a/sdk/cdsclient/mock_cdsclient/interface_mock.go b/sdk/cdsclient/mock_cdsclient/interface_mock.go index ced6b44ec6..f68434a977 100644 --- a/sdk/cdsclient/mock_cdsclient/interface_mock.go +++ b/sdk/cdsclient/mock_cdsclient/interface_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: sdk/cdsclient/interface.go +// Source: interface.go // Package mock_cdsclient is a generated GoMock package. package mock_cdsclient @@ -1709,18 +1709,6 @@ func (m *MockEventsClient) EXPECT() *MockEventsClientMockRecorder { return m.recorder } -// EventsListen mocks base method -func (m *MockEventsClient) EventsListen(ctx context.Context, chanSSEvt chan<- cdsclient.SSEvent) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "EventsListen", ctx, chanSSEvt) -} - -// EventsListen indicates an expected call of EventsListen -func (mr *MockEventsClientMockRecorder) EventsListen(ctx, chanSSEvt interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventsListen", reflect.TypeOf((*MockEventsClient)(nil).EventsListen), ctx, chanSSEvt) -} - // WebsocketEventsListen mocks base method func (m *MockEventsClient) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) { m.ctrl.T.Helper() @@ -5546,18 +5534,6 @@ func (mr *MockInterfaceMockRecorder) EnvironmentKeysDelete(projectKey, envName, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnvironmentKeysDelete", reflect.TypeOf((*MockInterface)(nil).EnvironmentKeysDelete), projectKey, envName, keyEnvName) } -// EventsListen mocks base method -func (m *MockInterface) EventsListen(ctx context.Context, chanSSEvt chan<- cdsclient.SSEvent) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "EventsListen", ctx, chanSSEvt) -} - -// EventsListen indicates an expected call of EventsListen -func (mr *MockInterfaceMockRecorder) EventsListen(ctx, chanSSEvt interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventsListen", reflect.TypeOf((*MockInterface)(nil).EventsListen), ctx, chanSSEvt) -} - // WebsocketEventsListen mocks base method func (m *MockInterface) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) { m.ctrl.T.Helper() diff --git a/sdk/event.go b/sdk/event.go index 282341106d..9d9601cdab 100644 --- a/sdk/event.go +++ b/sdk/event.go @@ -106,14 +106,10 @@ type EventRunWorkflowOutgoingHook struct { // EventRunWorkflowJob contains event data for a workflow job node run type EventRunWorkflowJob struct { - ID int64 `json:"id,omitempty"` - Status string `json:"status,omitempty"` - Start int64 `json:"start,omitempty"` - Done int64 `json:"done,omitempty"` - Requirements []Requirement `json:"requirements,omitempty"` - WorkerName string `json:"worker_name,omitempty"` - BookByName string `json:"book_by_name,omitempty"` - Parameters []Parameter `json:"parameters,omitempty"` + ID int64 `json:"id,omitempty"` + Status string `json:"status,omitempty"` + Start int64 `json:"start,omitempty"` + Done int64 `json:"done,omitempty"` } // EventRunWorkflow contains event data for a workflow run diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index ee06141555..50a292e0af 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -151,10 +151,10 @@ func Create(ctx context.Context, h Interface) error { observability.Tag(observability.TagWorkflowNodeJobRun, j.ID), ) - if _, ok := j.Header["SSE"]; ok { - log.Debug("hatchery> received job from SSE") + if _, ok := j.Header["WS"]; ok { + log.Debug("hatchery> received job from WS") observability.Current(currentCtx, - observability.Tag("from", "sse"), + observability.Tag("from", "ws"), ) } } @@ -178,8 +178,8 @@ func Create(ctx context.Context, h Interface) error { stats.Record(currentCtx, GetMetrics().Jobs.M(1)) - if _, ok := j.Header["SSE"]; ok { - stats.Record(currentCtx, GetMetrics().JobsSSE.M(1)) + if _, ok := j.Header["WS"]; ok { + stats.Record(currentCtx, GetMetrics().JobsWebsocket.M(1)) } //Check if the jobs is concerned by a pending worker creation diff --git a/sdk/hatchery/stats.go b/sdk/hatchery/stats.go index c46e5e329c..475811080c 100644 --- a/sdk/hatchery/stats.go +++ b/sdk/hatchery/stats.go @@ -25,7 +25,7 @@ func initMetrics() error { var err error onceMetrics.Do(func() { metrics.Jobs = stats.Int64("cds/jobs", "number of analyzed jobs", stats.UnitDimensionless) - metrics.JobsSSE = stats.Int64("cds/jobs_sse", "number of analyzed jobs from SSE", stats.UnitDimensionless) + metrics.JobsWebsocket = stats.Int64("cds/jobs_websocket", "number of analyzed jobs from SSE", stats.UnitDimensionless) metrics.SpawnedWorkers = stats.Int64("cds/spawned_workers", "number of spawned workers", stats.UnitDimensionless) metrics.PendingWorkers = stats.Int64("cds/pending_workers", "number of pending workers", stats.UnitDimensionless) metrics.RegisteringWorkers = stats.Int64("cds/registering_workers", "number of registering workers", stats.UnitDimensionless) @@ -37,7 +37,7 @@ func initMetrics() error { tags := []tag.Key{observability.MustNewKey(observability.TagServiceType), observability.MustNewKey(observability.TagServiceName)} err = observability.RegisterView( observability.NewViewCount("cds/hatchery/jobs_count", metrics.Jobs, tags), - observability.NewViewCount("cds/hatchery/jobs_sse_count", metrics.JobsSSE, tags), + observability.NewViewCount("cds/hatchery/jobs_websocket_count", metrics.JobsWebsocket, tags), observability.NewViewCount("cds/hatchery/spawned_worker_count", metrics.SpawnedWorkers, tags), observability.NewViewLast("cds/hatchery/pending_workers", metrics.PendingWorkers, tags), observability.NewViewLast("cds/hatchery/registering_workers", metrics.RegisteringWorkers, tags), diff --git a/sdk/hatchery/types.go b/sdk/hatchery/types.go index 23155a2d1e..54d4fb8394 100644 --- a/sdk/hatchery/types.go +++ b/sdk/hatchery/types.go @@ -114,7 +114,7 @@ type InterfaceWithModels interface { type Metrics struct { Jobs *stats.Int64Measure - JobsSSE *stats.Int64Measure + JobsWebsocket *stats.Int64Measure SpawnedWorkers *stats.Int64Measure PendingWorkers *stats.Int64Measure RegisteringWorkers *stats.Int64Measure diff --git a/tests/03_clictl_admin_services.yml b/tests/03_clictl_admin_services.yml index 2ee702cc5e..f9613682cc 100644 --- a/tests/03_clictl_admin_services.yml +++ b/tests/03_clictl_admin_services.yml @@ -22,4 +22,4 @@ testcases: - script: {{.cdsctl}} -f {{.cdsctl.config}} admin services request --name `{{.cdsctl}} -f {{.cdsctl.config}} admin services list -q|grep hatchery|head -n1` --query /debug/pprof/goroutine\?debug\=2 assertions: - result.code ShouldEqual 0 - - result.systemout ShouldContainSubstring transport.go + - result.systemout ShouldContainSubstring conn.go