From 7bcdada38ae11a858558aa7f9ab24356a9d57139 Mon Sep 17 00:00:00 2001 From: Guiheux Steven Date: Thu, 30 Apr 2020 08:18:08 +0200 Subject: [PATCH] feat(api,ui): websocket server + client for admin (#5128) --- engine/api/api.go | 1 + engine/api/api_routes.go | 12 + engine/api/event/elasticsearch.go | 2 +- engine/api/event/publish_workflow_run.go | 91 +++- engine/api/events.go | 6 +- engine/api/router.go | 2 + engine/api/router_metrics.go | 10 + engine/api/websocket.go | 399 ++++++++++++++++++ engine/api/websocket_test.go | 266 ++++++++++++ engine/api/workflow_event.go | 2 +- engine/ui/types.go | 2 +- go.mod | 1 + sdk/cdsclient/client.go | 26 +- sdk/cdsclient/client_events.go | 10 + sdk/cdsclient/http_websocket.go | 93 ++++ sdk/cdsclient/interface.go | 3 + .../mock_cdsclient/interface_mock.go | 53 +++ sdk/event.go | 33 +- sdk/websocket.go | 30 ++ ui/proxy.conf.json | 1 + ui/src/app/app.component.ts | 19 +- ui/src/app/app.module.ts | 2 + ui/src/app/event.service.ts | 142 +++++++ ui/src/app/model/websocket.model.ts | 21 + 24 files changed, 1169 insertions(+), 58 deletions(-) create mode 100644 engine/api/websocket.go create mode 100644 engine/api/websocket_test.go create mode 100644 sdk/cdsclient/http_websocket.go create mode 100644 sdk/websocket.go create mode 100644 ui/src/app/event.service.ts create mode 100644 ui/src/app/model/websocket.model.ts diff --git a/engine/api/api.go b/engine/api/api.go index a33a3ebf0e..1f2f0d6e29 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -249,6 +249,7 @@ type API struct { StartupTime time.Time Maintenance bool eventsBroker *eventsBroker + websocketBroker *websocketBroker Cache cache.Store Metrics struct { WorkflowRunFailed *stats.Int64Measure diff --git a/engine/api/api_routes.go b/engine/api/api_routes.go index fbdfb5da11..44c0db562d 100644 --- a/engine/api/api_routes.go +++ b/engine/api/api_routes.go @@ -40,6 +40,17 @@ func (api *API) InitRouter() { } api.eventsBroker.Init(r.Background, api.PanicDump()) + api.websocketBroker = &websocketBroker{ + router: api.Router, + cache: api.Cache, + dbFunc: api.mustDB, + clients: make(map[string]*websocketClient), + messages: make(chan sdk.Event), + chanAddClient: make(chan *websocketClient), + chanRemoveClient: make(chan string), + } + api.websocketBroker.Init(r.Background, api.PanicDump()) + // Auth r.Handle("/auth/driver", ScopeNone(), r.GET(api.getAuthDriversHandler, Auth(false))) r.Handle("/auth/me", Scope(sdk.AuthConsumerScopeAction), r.GET(api.getAuthMe)) @@ -405,6 +416,7 @@ func (api *API) InitRouter() { // SSE r.Handle("/events", ScopeNone(), r.GET(api.eventsBroker.ServeHTTP)) + r.Handle("/ws", ScopeNone(), r.GET(api.websocketBroker.ServeHTTP)) // Feature r.Handle("/feature/clean", ScopeNone(), r.POST(api.cleanFeatureHandler, NeedToken("X-Izanami-Token", api.Config.Features.Izanami.Token))) diff --git a/engine/api/event/elasticsearch.go b/engine/api/event/elasticsearch.go index b7a1ee1e1d..7ccd6eb6ea 100644 --- a/engine/api/event/elasticsearch.go +++ b/engine/api/event/elasticsearch.go @@ -37,7 +37,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S } switch e.EventType { - case "sdk.EventJob", "sdk.EventEngine": + case "sdk.EventEngine": continue } e.Payload = nil diff --git a/engine/api/event/publish_workflow_run.go b/engine/api/event/publish_workflow_run.go index ce33ae9187..92d8221172 100644 --- a/engine/api/event/publish_workflow_run.go +++ b/engine/api/event/publish_workflow_run.go @@ -6,15 +6,27 @@ import ( "fmt" "time" - "github.com/go-gorp/gorp" - "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) -func publishRunWorkflow(ctx context.Context, payload interface{}, key, workflowName, appName, pipName, envName string, num int64, sub int64, status string, tags []sdk.WorkflowRunTag, eventIntegrations []sdk.ProjectIntegration) { - eventIntegrationsID := make([]int64, len(eventIntegrations)) - for i, eventIntegration := range eventIntegrations { +type publishWorkflowRunData struct { + projectKey string + workflowName string + applicationName string + pipelineName string + environmentName string + workflowRunNum int64 + workflowRunSubNum int64 + status string + workflowRunTags []sdk.WorkflowRunTag + eventIntegrations []sdk.ProjectIntegration + workflowNodeRunID int64 +} + +func publishRunWorkflow(ctx context.Context, payload interface{}, data publishWorkflowRunData) { + eventIntegrationsID := make([]int64, len(data.eventIntegrations)) + for i, eventIntegration := range data.eventIntegrations { eventIntegrationsID[i] = eventIntegration.ID } @@ -25,18 +37,18 @@ func publishRunWorkflow(ctx context.Context, payload interface{}, key, workflowN CDSName: cdsname, EventType: fmt.Sprintf("%T", payload), Payload: bts, - ProjectKey: key, - ApplicationName: appName, - PipelineName: pipName, - WorkflowName: workflowName, - EnvironmentName: envName, - WorkflowRunNum: num, - WorkflowRunNumSub: sub, - Status: status, - Tags: tags, + ProjectKey: data.projectKey, + ApplicationName: data.applicationName, + PipelineName: data.pipelineName, + WorkflowName: data.workflowName, + EnvironmentName: data.environmentName, + WorkflowRunNum: data.workflowRunNum, + WorkflowRunNumSub: data.workflowRunSubNum, + Status: data.status, + Tags: data.workflowRunTags, EventIntegrationsID: eventIntegrationsID, } - publishEvent(ctx, event) + _ = publishEvent(ctx, event) } // PublishWorkflowRun publish event on a workflow run @@ -51,7 +63,16 @@ func PublishWorkflowRun(ctx context.Context, wr sdk.WorkflowRun, projectKey stri LastModifiedNano: wr.LastModified.UnixNano(), Tags: wr.Tags, } - publishRunWorkflow(ctx, e, projectKey, wr.Workflow.Name, "", "", "", wr.Number, wr.LastSubNumber, wr.Status, wr.Tags, wr.Workflow.EventIntegrations) + data := publishWorkflowRunData{ + projectKey: projectKey, + workflowName: wr.Workflow.Name, + workflowRunNum: wr.Number, + workflowRunSubNum: wr.LastSubNumber, + status: wr.Status, + workflowRunTags: wr.Tags, + eventIntegrations: wr.Workflow.EventIntegrations, + } + publishRunWorkflow(ctx, e, data) } // PublishWorkflowNodeRun publish event on a workflow node run @@ -157,19 +178,45 @@ func PublishWorkflowNodeRun(ctx context.Context, nr sdk.WorkflowNodeRun, w sdk.W if sdk.StatusIsTerminated(nr.Status) { e.Done = nr.Done.Unix() } - publishRunWorkflow(ctx, e, w.ProjectKey, w.Name, appName, pipName, envName, nr.Number, nr.SubNumber, nr.Status, nil, w.EventIntegrations) + data := publishWorkflowRunData{ + projectKey: w.ProjectKey, + workflowName: w.Name, + applicationName: appName, + pipelineName: pipName, + environmentName: envName, + workflowRunNum: nr.Number, + workflowRunSubNum: nr.SubNumber, + status: nr.Status, + eventIntegrations: w.EventIntegrations, + workflowNodeRunID: nr.ID, + } + publishRunWorkflow(ctx, e, data) } // PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun -func PublishWorkflowNodeJobRun(ctx context.Context, db gorp.SqlExecutor, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) { +func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) { e := sdk.EventRunWorkflowJob{ - ID: jr.ID, - Status: jr.Status, - Start: jr.Start.Unix(), + ID: jr.ID, + Status: jr.Status, + Start: jr.Start.Unix(), + Requirements: jr.Job.Action.Requirements, + WorkerName: jr.Job.WorkerName, + BookByName: jr.BookedBy.Name, + Parameters: jr.Parameters, } if sdk.StatusIsTerminated(jr.Status) { e.Done = jr.Done.Unix() } - publishRunWorkflow(ctx, e, pkey, wr.Workflow.Name, "", "", "", 0, 0, jr.Status, nil, wr.Workflow.EventIntegrations) + data := publishWorkflowRunData{ + projectKey: pkey, + workflowName: wr.Workflow.Name, + workflowRunNum: wr.Number, + workflowRunSubNum: wr.LastSubNumber, + status: jr.Status, + workflowRunTags: wr.Tags, + eventIntegrations: wr.Workflow.EventIntegrations, + workflowNodeRunID: jr.WorkflowNodeRunID, + } + publishRunWorkflow(ctx, e, data) } diff --git a/engine/api/events.go b/engine/api/events.go index f85c74a339..95a79e3619 100644 --- a/engine/api/events.go +++ b/engine/api/events.go @@ -94,10 +94,6 @@ func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk continue } - switch e.EventType { - case "sdk.EventJob": - continue - } observability.Record(b.router.Background, SSEEvents, 1) cacheMsgChan <- e } @@ -239,7 +235,7 @@ func (client *eventsBrokerSubscribe) manageEvent(db gorp.SqlExecutor, event sdk. var isHatcheryWithGroups = isHatchery && len(client.consumer.GroupIDs) > 0 switch { - case strings.HasPrefix(event.EventType, "sdk.EventProject") || strings.HasPrefix(event.EventType, "sdk.EventAsCodeEvent"): + case strings.HasPrefix(event.EventType, "sdk.EventProject") || strings.HasPrefix(event.EventType, "sdk.EventAsCodeEvent"): if client.consumer.Maintainer() && !isHatcheryWithGroups { return true, nil } diff --git a/engine/api/router.go b/engine/api/router.go index 536ef17eb3..d67be8af99 100644 --- a/engine/api/router.go +++ b/engine/api/router.go @@ -41,6 +41,8 @@ var ( Hits *stats.Int64Measure SSEClients *stats.Int64Measure SSEEvents *stats.Int64Measure + WebSocketClients *stats.Int64Measure + WebSocketEvents *stats.Int64Measure ServerRequestCount *stats.Int64Measure ServerRequestBytes *stats.Int64Measure ServerResponseBytes *stats.Int64Measure diff --git a/engine/api/router_metrics.go b/engine/api/router_metrics.go index 4cd79298e6..4be1f3773c 100644 --- a/engine/api/router_metrics.go +++ b/engine/api/router_metrics.go @@ -49,6 +49,14 @@ func InitRouterMetrics(s service.NamedService) error { "cds/sse_events", "number of sse events", stats.UnitDimensionless) + WebSocketClients = stats.Int64( + "cds/websocket_clients", + "number of websocket clients", + stats.UnitDimensionless) + WebSocketEvents = stats.Int64( + "cds/websocket_events", + "number of websocket events", + stats.UnitDimensionless) ServerRequestCount = stats.Int64( "cds/http/server/request_count", "Number of HTTP requests started", @@ -122,6 +130,8 @@ func InitRouterMetrics(s service.NamedService) error { observability.NewViewCount("cds/http/router/router_hits", Hits, []tag.Key{tagServiceType, tagServiceName}), observability.NewViewLast("cds/http/router/sse_clients", SSEClients, []tag.Key{tagServiceType, tagServiceName}), observability.NewViewCount("cds/http/router/sse_events", SSEEvents, []tag.Key{tagServiceType, tagServiceName}), + observability.NewViewLast("cds/http/router/websocket_clients", WebSocketClients, []tag.Key{tagServiceType, tagServiceName}), + observability.NewViewCount("cds/http/router/websocket_events", WebSocketEvents, []tag.Key{tagServiceType, tagServiceName}), ServerRequestCountView, ServerRequestBytesView, ServerResponseBytesView, diff --git a/engine/api/websocket.go b/engine/api/websocket.go new file mode 100644 index 0000000000..dda116d2a0 --- /dev/null +++ b/engine/api/websocket.go @@ -0,0 +1,399 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + "github.com/go-gorp/gorp" + "github.com/gorilla/websocket" + "github.com/tevino/abool" + + "github.com/ovh/cds/engine/api/cache" + "github.com/ovh/cds/engine/api/observability" + "github.com/ovh/cds/engine/api/permission" + "github.com/ovh/cds/engine/service" + "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/log" +) + +var upgrader = websocket.Upgrader{} // use default options + +type websocketClient struct { + UUID string + AuthConsumer *sdk.AuthConsumer + isAlive *abool.AtomicBool + con *websocket.Conn + mutex sync.Mutex + filter sdk.WebsocketFilter + updateFilterChan chan sdk.WebsocketFilter +} + +type websocketBroker struct { + clients map[string]*websocketClient + cache cache.Store + dbFunc func() *gorp.DbMap + router *Router + messages chan sdk.Event + chanAddClient chan *websocketClient + chanRemoveClient chan string +} + +//Init the websocketBroker +func (b *websocketBroker) Init(ctx context.Context, panicCallback func(s string) (io.WriteCloser, error)) { + // Start cache Subscription + sdk.GoRoutine(ctx, "websocketBroker.Init.CacheSubscribe", func(ctx context.Context) { + b.cacheSubscribe(ctx, b.messages, b.cache) + }, panicCallback) + + sdk.GoRoutine(ctx, "websocketBroker.Init.Start", func(ctx context.Context) { + b.Start(ctx, panicCallback) + }, panicCallback) +} + +// Start the broker +func (b *websocketBroker) Start(ctx context.Context, panicCallback func(s string) (io.WriteCloser, error)) { + tickerMetrics := time.NewTicker(10 * time.Second) + defer tickerMetrics.Stop() + + for { + select { + case <-tickerMetrics.C: + observability.Record(b.router.Background, WebSocketClients, int64(len(b.clients))) + case <-ctx.Done(): + if b.clients != nil { + for uuid := range b.clients { + delete(b.clients, uuid) + } + observability.Record(b.router.Background, WebSocketClients, 0) + } + if ctx.Err() != nil { + log.Error(ctx, "websocketBroker.Start> Exiting: %v", ctx.Err()) + return + } + + case receivedEvent := <-b.messages: + for i := range b.clients { + c := b.clients[i] + if c == nil { + delete(b.clients, i) + continue + } + + // Send the event to the client websocket within a goroutine + s := "websocket-" + b.clients[i].UUID + sdk.GoRoutine(ctx, s, + func(ctx context.Context) { + if c.isAlive.IsSet() { + log.Debug("send data to %s", c.AuthConsumer.GetUsername()) + if err := c.send(ctx, receivedEvent); err != nil { + b.chanRemoveClient <- c.UUID + } + } + }, panicCallback, + ) + } + + case client := <-b.chanAddClient: + b.clients[client.UUID] = client + + case uuid := <-b.chanRemoveClient: + client, has := b.clients[uuid] + if !has { + continue + } + client.isAlive.UnSet() + delete(b.clients, uuid) + } + } +} + +func (b *websocketBroker) cacheSubscribe(ctx context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) { + if cacheMsgChan == nil { + return + } + + pubSub, err := store.Subscribe("events_pubsub") + if err != nil { + log.Error(ctx, "websocketBroker.cacheSubscribe> unable to subscribe to events_pubsub") + } + tick := time.NewTicker(50 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + if ctx.Err() != nil { + log.Error(ctx, "websocketBroker.cacheSubscribe> Exiting: %v", ctx.Err()) + return + } + case <-tick.C: + msg, err := store.GetMessageFromSubscription(ctx, pubSub) + if err != nil { + log.Warning(ctx, "websocketBroker.cacheSubscribe> Cannot get message %s: %s", msg, err) + continue + } + var e sdk.Event + if err := json.Unmarshal([]byte(msg), &e); err != nil { + // don't print the error as we doesn't care + continue + } + + observability.Record(b.router.Background, WebSocketEvents, 1) + cacheMsgChan <- e + } + } +} + +func (b *websocketBroker) ServeHTTP() service.Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) (err error) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Warning(ctx, "websocket> upgrade: %v", err) + return err + } + defer c.Close() + + client := websocketClient{ + UUID: sdk.UUID(), + AuthConsumer: getAPIConsumer(ctx), + isAlive: abool.NewBool(true), + con: c, + updateFilterChan: make(chan sdk.WebsocketFilter, 10), + } + b.chanAddClient <- &client + + sdk.GoRoutine(ctx, fmt.Sprintf("readUpdateFilterChan-%s-%s", client.AuthConsumer.GetUsername(), client.UUID), func(ctx context.Context) { + client.readUpdateFilterChan(ctx, b.dbFunc()) + }) + + for { + if ctx.Err() != nil { + return ctx.Err() + } + + var msg sdk.WebsocketFilter + _, message, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Warning(ctx, "websocket error: %v", err) + } + log.Debug("%s disconnected", client.AuthConsumer.GetUsername()) + break + } + + if err := json.Unmarshal(message, &msg); err != nil { + log.Warning(ctx, "websocket.readJSON: %v", err) + } + + // Send message to client + client.updateFilterChan <- msg + } + return nil + } +} + +func (c *websocketClient) readUpdateFilterChan(ctx context.Context, db *gorp.DbMap) { + for { + select { + case <-ctx.Done(): + log.Debug("events.Http: context done") + return + case m := <-c.updateFilterChan: + if err := c.updateEventFilter(ctx, db, m); err != nil { + log.Error(ctx, "websocketClient.readUpdateFilterChan: unable to update event filter: %v", err) + msg := sdk.WebsocketEvent{ + Status: "KO", + Error: sdk.Cause(err).Error(), + } + _ = c.con.WriteJSON(msg) + continue + } + } + } +} + +func (c *websocketClient) updateEventFilter(ctx context.Context, db gorp.SqlExecutor, m sdk.WebsocketFilter) error { + c.mutex.Lock() + defer c.mutex.Unlock() + switch m.Type { + case sdk.WebsocketFilterTypeProject: + if m.ProjectKey == "" { + return sdk.ErrWrongRequest + } + b, err := c.hasProjectPermission(ctx, db, m) + if err != nil { + return err + } + if b { + c.filter = sdk.WebsocketFilter{ + ProjectKey: m.ProjectKey, + Type: m.Type, + Operation: m.Operation, + } + } + case sdk.WebsocketFilterTypeApplication: + if m.ProjectKey == "" || m.ApplicationName == "" { + return sdk.ErrWrongRequest + } + b, err := c.hasProjectPermission(ctx, db, m) + if err != nil { + return err + } + if b { + c.filter = sdk.WebsocketFilter{ + ProjectKey: m.ProjectKey, + Type: m.Type, + ApplicationName: m.ApplicationName, + Operation: m.Operation, + } + } + case sdk.WebsocketFilterTypePipeline: + if m.ProjectKey == "" || m.PipelineName == "" { + return sdk.ErrWrongRequest + } + b, err := c.hasProjectPermission(ctx, db, m) + if err != nil { + return err + } + if b { + c.filter = sdk.WebsocketFilter{ + ProjectKey: m.ProjectKey, + Type: m.Type, + PipelineName: m.PipelineName, + Operation: m.Operation, + } + } + case sdk.WebsocketFilterTypeEnvironment: + if m.ProjectKey == "" || m.EnvironmentName == "" { + return sdk.ErrWrongRequest + } + b, err := c.hasProjectPermission(ctx, db, m) + if err != nil { + return err + } + if b { + c.filter = sdk.WebsocketFilter{ + ProjectKey: m.ProjectKey, + Type: m.Type, + EnvironmentName: m.EnvironmentName, + Operation: m.Operation, + } + } + case sdk.WebsocketFilterTypeWorkflow: + if m.ProjectKey == "" || m.WorkflowName == "" { + return sdk.ErrWrongRequest + } + perms, err := permission.LoadWorkflowMaxLevelPermission(ctx, db, m.ProjectKey, []string{m.WorkflowName}, getAPIConsumer(ctx).GetGroupIDs()) + if err != nil { + return err + } + maxLevelPermission := perms.Level(m.WorkflowName) + if maxLevelPermission < sdk.PermissionRead && !isMaintainer(ctx) && !isAdmin(ctx) { + return sdk.WithStack(sdk.ErrForbidden) + } + + c.filter = sdk.WebsocketFilter{ + ProjectKey: m.ProjectKey, + Type: m.Type, + WorkflowName: m.WorkflowName, + WorkflowNodeRunID: m.WorkflowNodeRunID, + WorkflowRunNumber: m.WorkflowRunNumber, + Operation: m.Operation, + } + case sdk.WebsocketFilterTypeQueue: + c.filter = sdk.WebsocketFilter{ + Queue: true, + Type: m.Type, + } + } + + return nil +} + +func (c *websocketClient) hasProjectPermission(ctx context.Context, db gorp.SqlExecutor, m sdk.WebsocketFilter) (bool, error) { + perms, err := permission.LoadProjectMaxLevelPermission(ctx, db, []string{m.ProjectKey}, getAPIConsumer(ctx).GetGroupIDs()) + if err != nil { + return false, err + } + maxLevelPermission := perms.Level(m.ProjectKey) + if maxLevelPermission < sdk.PermissionRead && !isMaintainer(ctx) && !isAdmin(ctx) { + return false, sdk.WithStack(sdk.ErrForbidden) + } + return true, nil +} + +// Send an event to a client +func (c *websocketClient) send(ctx context.Context, event sdk.Event) (err error) { + c.mutex.Lock() + defer c.mutex.Unlock() + defer func() { + if r := recover(); r != nil { + err = sdk.WithStack(fmt.Errorf("websocketClient.Send recovered %v", r)) + } + }() + + if c == nil || c.con == nil || !c.isAlive.IsSet() { + return sdk.WithStack(fmt.Errorf("client deconnected")) + } + + sendEvent := false + switch { + // Event on Job + case event.EventType == fmt.Sprintf("%T", sdk.EventRunWorkflowJob{}) && c.filter.Queue && c.filter.Type == sdk.WebsocketFilterTypeQueue: + sendEvent = true + // Event on Operation + case event.EventType == fmt.Sprintf("%T", sdk.Operation{}) && c.filter.Operation == event.OperationUUID && c.filter.ProjectKey == event.ProjectKey: + sendEvent = true + // Event on project + case strings.HasPrefix(event.EventType, "sdk.EventProject") && event.ProjectKey == c.filter.ProjectKey && c.filter.Type == sdk.WebsocketFilterTypeProject: + sendEvent = true + // Event on application + case strings.HasPrefix(event.EventType, "sdk.EventApplication") && event.ProjectKey == c.filter.ProjectKey && event.ApplicationName == c.filter.ApplicationName && c.filter.Type == sdk.WebsocketFilterTypeApplication: + sendEvent = true + // Event on pipeline + case strings.HasPrefix(event.EventType, "sdk.EventPipeline") && event.ProjectKey == c.filter.ProjectKey && event.PipelineName == c.filter.PipelineName && c.filter.Type == sdk.WebsocketFilterTypePipeline: + sendEvent = true + // Event on environment + case strings.HasPrefix(event.EventType, "sdk.EventEnvironment") && event.ProjectKey == c.filter.ProjectKey && event.EnvironmentName == c.filter.EnvironmentName && c.filter.Type == sdk.WebsocketFilterTypeEnvironment: + sendEvent = true + // Event on workflow + case strings.HasPrefix(event.EventType, "sdk.EventWorkflow") && event.ProjectKey == c.filter.ProjectKey && event.WorkflowName == c.filter.WorkflowName && c.filter.Type == sdk.WebsocketFilterTypeWorkflow: + sendEvent = true + // Event on runworkflow* + case strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") && c.filter.Type == sdk.WebsocketFilterTypeWorkflow: + if event.ProjectKey != c.filter.ProjectKey || event.WorkflowName != c.filter.WorkflowName { + sendEvent = false + } + if c.filter.WorkflowRunNumber != 0 && event.WorkflowRunNum != c.filter.WorkflowRunNumber { + sendEvent = false + } + // WORKFLOW NODE RUN EVENT + if c.filter.WorkflowNodeRunID != 0 && event.WorkflowNodeRunID != c.filter.WorkflowNodeRunID { + sendEvent = false + } + sendEvent = true + default: + sendEvent = false + } + + if !sendEvent { + return nil + } + + msg := sdk.WebsocketEvent{ + Status: "OK", + Event: event, + } + if err := c.con.WriteJSON(msg); err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + return sdk.WithStack(err) + } + log.Error(ctx, "websocketClient.Send > unable to write json: %v", err) + } + return nil +} diff --git a/engine/api/websocket_test.go b/engine/api/websocket_test.go new file mode 100644 index 0000000000..afc6f977aa --- /dev/null +++ b/engine/api/websocket_test.go @@ -0,0 +1,266 @@ +package api + +import ( + "context" + "github.com/ovh/cds/engine/api/authentication" + "github.com/ovh/cds/engine/api/authentication/builtin" + "github.com/ovh/cds/engine/api/test/assets" + "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/cdsclient" + "github.com/stretchr/testify/require" + "net/url" + "strings" + "testing" + "time" +) + +func Test_websocketWrongFilters(t *testing.T) { + api, tsURL, tsClose := newTestServer(t) + defer tsClose() + + u, _ := assets.InsertAdminUser(t, api.mustDB()) + localConsumer, err := authentication.LoadConsumerByTypeAndUserID(context.TODO(), api.mustDB(), sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + require.NoError(t, err) + + _, jws, err := builtin.NewConsumer(context.TODO(), api.mustDB(), sdk.RandomString(10), sdk.RandomString(10), localConsumer, u.GetGroupIDs(), + sdk.NewAuthConsumerScopeDetails(sdk.AuthConsumerScopeProject)) + + chanMessageReceived := make(chan sdk.WebsocketEvent) + chanMessageToSend := make(chan sdk.WebsocketFilter) + + client := cdsclient.New(cdsclient.Config{ + Host: tsURL, + User: u.Username, + InsecureSkipVerifyTLS: true, + BuitinConsumerAuthenticationToken: jws, + }) + go client.WebsocketEventsListen(context.TODO(), chanMessageToSend, chanMessageReceived) + + // Subscribe to project without project key + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeProject, + ProjectKey: "", + } + response := <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to application without application name + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeApplication, + ProjectKey: "Key", + ApplicationName: "", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to application without project key + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeApplication, + ProjectKey: "", + ApplicationName: "App1", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to pipeline without pipeline name + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeApplication, + ProjectKey: "Key", + PipelineName: "", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to pipeline without project key + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypePipeline, + ProjectKey: "", + PipelineName: "PipName", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to environment without environment name + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeEnvironment, + ProjectKey: "Key", + EnvironmentName: "", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to environment without project key + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeEnvironment, + ProjectKey: "", + EnvironmentName: "EnvNmae", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to workflow without workflow name + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeWorkflow, + ProjectKey: "Key", + WorkflowName: "", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) + + // Subscribe to workflow without project key + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeWorkflow, + ProjectKey: "", + WorkflowName: "WorkflowName", + } + response = <-chanMessageReceived + require.Equal(t, "KO", response.Status) + require.Equal(t, "wrong request", response.Error) +} + +func Test_websocketGetWorkflowEvent(t *testing.T) { + api, tsURL, tsClose := newTestServer(t) + defer tsClose() + + u, _ := assets.InsertAdminUser(t, api.mustDB()) + localConsumer, err := authentication.LoadConsumerByTypeAndUserID(context.TODO(), api.mustDB(), sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + require.NoError(t, err) + + key := sdk.RandomString(10) + proj := assets.InsertTestProject(t, api.mustDB(), api.Cache, key, key) + + w := sdk.Workflow{ + Name: "workflow1", + ProjectID: proj.ID, + ProjectKey: proj.Key, + WorkflowData: sdk.WorkflowData{ + Node: sdk.Node{ + Name: "root", + Type: sdk.NodeTypeFork, + }, + }, + } + require.NoError(t, workflow.Insert(context.TODO(), api.mustDB(), api.Cache, *proj, &w)) + + _, jws, err := builtin.NewConsumer(context.TODO(), api.mustDB(), sdk.RandomString(10), sdk.RandomString(10), localConsumer, u.GetGroupIDs(), + sdk.NewAuthConsumerScopeDetails(sdk.AuthConsumerScopeProject)) + + chanMessageReceived := make(chan sdk.WebsocketEvent) + chanMessageToSend := make(chan sdk.WebsocketFilter) + + client := cdsclient.New(cdsclient.Config{ + Host: tsURL, + User: u.Username, + InsecureSkipVerifyTLS: true, + BuitinConsumerAuthenticationToken: jws, + }) + go client.WebsocketEventsListen(context.TODO(), chanMessageToSend, chanMessageReceived) + + chanMessageToSend <- sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeWorkflow, + ProjectKey: key, + WorkflowName: w.Name, + } + // Waiting websocket to update filter + time.Sleep(1 * time.Second) + + api.websocketBroker.messages <- sdk.Event{ProjectKey: proj.Key, WorkflowName: w.Name, EventType: "sdk.EventWorkflow"} + response := <-chanMessageReceived + require.Equal(t, "OK", response.Status) + require.Equal(t, response.Event.EventType, "sdk.EventWorkflow") + require.Equal(t, response.Event.ProjectKey, proj.Key) + require.Equal(t, response.Event.WorkflowName, w.Name) +} + +func Test_websocketDeconnection(t *testing.T) { + api, tsURL, tsClose := newTestServer(t) + defer tsClose() + + u, _ := assets.InsertAdminUser(t, api.mustDB()) + localConsumer, err := authentication.LoadConsumerByTypeAndUserID(context.TODO(), api.mustDB(), sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + require.NoError(t, err) + + key := sdk.RandomString(10) + proj := assets.InsertTestProject(t, api.mustDB(), api.Cache, key, key) + + w := sdk.Workflow{ + Name: "workflow1", + ProjectID: proj.ID, + ProjectKey: proj.Key, + WorkflowData: sdk.WorkflowData{ + Node: sdk.Node{ + Name: "root", + Type: sdk.NodeTypeFork, + }, + }, + } + require.NoError(t, workflow.Insert(context.TODO(), api.mustDB(), api.Cache, *proj, &w)) + + _, jws, err := builtin.NewConsumer(context.TODO(), api.mustDB(), sdk.RandomString(10), sdk.RandomString(10), localConsumer, u.GetGroupIDs(), + sdk.NewAuthConsumerScopeDetails(sdk.AuthConsumerScopeProject)) + + // Open websocket + client := cdsclient.New(cdsclient.Config{ + Host: tsURL, + User: u.Username, + InsecureSkipVerifyTLS: true, + BuitinConsumerAuthenticationToken: jws, + }) + resp, err := client.AuthConsumerSignin(sdk.ConsumerBuiltin, sdk.AuthConsumerSigninRequest{"token": jws}) + require.NoError(t, err) + token := resp.Token + + uHost, err := url.Parse(tsURL) + require.NoError(t, err) + urlWebsocket := url.URL{ + Scheme: strings.Replace(uHost.Scheme, "http", "ws", -1), + Host: uHost.Host, + Path: "/ws", + } + headers := make(map[string][]string) + date := sdk.FormatDateRFC5322(time.Now()) + headers["Date"] = []string{date} + headers["X-CDS-RemoteTime"] = []string{date} + auth := "Bearer " + token + headers["Authorization"] = []string{auth} + con, _, err := client.HTTPWebsocketClient().Dial(urlWebsocket.String(), headers) + require.NoError(t, err) + defer con.Close() // nolint + + // Waiting the websocket add the client + time.Sleep(1 * time.Second) + + // Send filter + err = con.WriteJSON(sdk.WebsocketFilter{ + Type: sdk.WebsocketFilterTypeWorkflow, + ProjectKey: key, + WorkflowName: w.Name, + }) + require.NoError(t, err) + + // Waiting websocket to update filter + time.Sleep(1 * time.Second) + + // Send message to client + go func() { + for i := 0; i < 100; i++ { + api.websocketBroker.messages <- sdk.Event{ProjectKey: proj.Key, WorkflowName: w.Name, EventType: "sdk.EventWorkflow"} + time.Sleep(200 * time.Millisecond) + } + }() + // Kill client + con.Close() + + time.Sleep(1 * time.Second) + + require.Equal(t, len(api.websocketBroker.clients), 0) +} diff --git a/engine/api/workflow_event.go b/engine/api/workflow_event.go index b2b9041796..41f6f75417 100644 --- a/engine/api/workflow_event.go +++ b/engine/api/workflow_event.go @@ -76,6 +76,6 @@ func WorkflowSendEvent(ctx context.Context, db gorp.SqlExecutor, store cache.Sto log.Warning(ctx, "workflowSendEvent> Cannot load workflow run %d: %s", noderun.WorkflowRunID, errWR) continue } - event.PublishWorkflowNodeJobRun(ctx, db, proj.Key, *wr, jobrun) + event.PublishWorkflowNodeJobRun(ctx, proj.Key, *wr, jobrun) } } diff --git a/engine/ui/types.go b/engine/ui/types.go index 9cfc853d4b..78e4a56f72 100644 --- a/engine/ui/types.go +++ b/engine/ui/types.go @@ -16,7 +16,7 @@ type Service struct { // Configuration is the ui configuration structure type Configuration struct { Name string `toml:"name" comment:"Name of this CDS UI Service\n Enter a name to enable this service" json:"name"` - Staticdir string `toml:"staticdir" default:"./ui_static_files" comment:"This directory must contains index.html file and other ui files (css, js...) from ui.tar.gz artifact." json:"staticdir"` + Staticdir string `toml:"staticdir" default:"./ui_static_files" comment:"This directory must contain the dist directory." json:"staticdir"` BaseURL string `toml:"baseURL" default:"/" comment:"Base URL. If you expose CDS UI with https://your-domain.com/ui, enter the value '/ui'" json:"baseURL"` SentryURL string `toml:"sentryURL" default:"" comment:"Sentry URL. Optional" json:"sentryURL"` HTTP struct { diff --git a/go.mod b/go.mod index 6ce8c1120c..e8598c3694 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/gorhill/cronexpr v0.0.0-20161205141322-d520615e531a github.com/gorilla/handlers v0.0.0-20160816184729-a5775781a543 github.com/gorilla/mux v1.6.2 + github.com/gorilla/websocket v1.4.0 github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc // indirect github.com/grpc-ecosystem/grpc-gateway v1.9.6 // indirect github.com/hashicorp/consul v1.3.0 // indirect diff --git a/sdk/cdsclient/client.go b/sdk/cdsclient/client.go index dcc84e7911..7a2438d74f 100644 --- a/sdk/cdsclient/client.go +++ b/sdk/cdsclient/client.go @@ -11,16 +11,27 @@ import ( "sync" "time" + "github.com/gorilla/websocket" + "github.com/ovh/cds/sdk" ) var _ Interface = new(client) type client struct { - httpClient *http.Client - httpSSEClient *http.Client - config *Config - name string + httpClient *http.Client + httpSSEClient *http.Client + httpWebsocketClient *websocket.Dialer + config *Config + name string +} + +func NewWebsocketDialer(insecureSkipVerifyTLS bool) *websocket.Dialer { + return &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerifyTLS}, + } } // NewHTTPClient returns a new HTTP Client. @@ -58,6 +69,7 @@ func New(c Config) Interface { cli.config.Mutex = new(sync.Mutex) cli.httpClient = NewHTTPClient(time.Second*60, c.InsecureSkipVerifyTLS) cli.httpSSEClient = NewHTTPClient(0, c.InsecureSkipVerifyTLS) + cli.httpWebsocketClient = NewWebsocketDialer(c.InsecureSkipVerifyTLS) cli.init() return cli } @@ -78,6 +90,7 @@ func NewWorker(endpoint string, name string, c *http.Client) WorkerInterface { cli.httpClient = c } cli.httpSSEClient = NewHTTPClient(0, false) + cli.httpWebsocketClient = NewWebsocketDialer(false) cli.name = name cli.init() @@ -112,6 +125,7 @@ func NewProviderClient(cfg ProviderConfig) ProviderClient { cli.config.Mutex = new(sync.Mutex) cli.httpClient = NewHTTPClient(time.Duration(cfg.RequestSecondsTimeout)*time.Second, conf.InsecureSkipVerifyTLS) cli.httpSSEClient = NewHTTPClient(0, conf.InsecureSkipVerifyTLS) + cli.httpWebsocketClient = NewWebsocketDialer(conf.InsecureSkipVerifyTLS) cli.init() return cli } @@ -134,6 +148,7 @@ func NewServiceClient(cfg ServiceConfig) (Interface, []byte, error) { cli.config.Mutex = new(sync.Mutex) cli.httpClient = NewHTTPClient(time.Duration(cfg.RequestSecondsTimeout)*time.Second, conf.InsecureSkipVerifyTLS) cli.httpSSEClient = NewHTTPClient(0, conf.InsecureSkipVerifyTLS) + cli.httpWebsocketClient = NewWebsocketDialer(conf.InsecureSkipVerifyTLS) cli.config.Verbose = cfg.Verbose cli.init() @@ -181,3 +196,6 @@ func (c *client) HTTPClient() *http.Client { func (c *client) HTTPSSEClient() *http.Client { return c.httpSSEClient } +func (c *client) HTTPWebsocketClient() *websocket.Dialer { + return c.httpWebsocketClient +} diff --git a/sdk/cdsclient/client_events.go b/sdk/cdsclient/client_events.go index 312a103970..a46f9f68a1 100644 --- a/sdk/cdsclient/client_events.go +++ b/sdk/cdsclient/client_events.go @@ -4,6 +4,8 @@ import ( "context" "log" "time" + + "github.com/ovh/cds/sdk" ) func (c *client) EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent) { @@ -14,3 +16,11 @@ func (c *client) EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent) { time.Sleep(1 * time.Second) } } +func (c *client) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) { + for ctx.Err() == nil { + if err := c.RequestWebsocket(ctx, "/ws", chanMsgToSend, chanMsgReceived); err != nil { + log.Printf("websocket error: %v\n", err) + } + time.Sleep(1 * time.Second) + } +} diff --git a/sdk/cdsclient/http_websocket.go b/sdk/cdsclient/http_websocket.go new file mode 100644 index 0000000000..09fd50f2c9 --- /dev/null +++ b/sdk/cdsclient/http_websocket.go @@ -0,0 +1,93 @@ +package cdsclient + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "runtime/pprof" + "strings" + "time" + + "github.com/gorilla/websocket" + + "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/log" +) + +func (c *client) RequestWebsocket(ctx context.Context, path string, msgToSend <-chan sdk.WebsocketFilter, msgReceived chan<- sdk.WebsocketEvent) error { + wsContext, wsContextCancel := context.WithCancel(ctx) + + // Checks that current session_token is still valid + // If not, challenge a new one against the authenticationToken + if !c.config.HasValidSessionToken() && c.config.BuitinConsumerAuthenticationToken != "" { + resp, err := c.AuthConsumerSignin(sdk.ConsumerBuiltin, sdk.AuthConsumerSigninRequest{"token": c.config.BuitinConsumerAuthenticationToken}) + if err != nil { + return err + } + c.config.SessionToken = resp.Token + } + + labels := pprof.Labels("path", path, "method", "GET") + wsContext = pprof.WithLabels(wsContext, labels) + pprof.SetGoroutineLabels(wsContext) + + uHost, err := url.Parse(c.config.Host) + if err != nil { + return sdk.WrapError(err, "wrong Host configuration") + } + urlWebsocket := url.URL{ + Scheme: strings.Replace(uHost.Scheme, "http", "ws", -1), + Host: uHost.Host, + Path: "/ws", + } + + headers := make(map[string][]string) + date := sdk.FormatDateRFC5322(time.Now()) + headers["Date"] = []string{date} + headers["X-CDS-RemoteTime"] = []string{date} + auth := "Bearer " + c.config.SessionToken + headers["Authorization"] = []string{auth} + con, _, err := c.httpWebsocketClient.Dial(urlWebsocket.String(), headers) + if err != nil { + return sdk.WithStack(err) + } + defer con.Close() // nolint + + // Message to send + sdk.GoRoutine(wsContext, fmt.Sprintf("RequestWebsocket-%s-%s", c.config.User, sdk.UUID()), func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + log.Warning(wsContext, "Leaving....") + return + case m := <-msgToSend: + if err := con.WriteJSON(m); err != nil { + log.Error(wsContext, "ws: unable to send message: %v", err) + } + } + } + }) + + for { + if ctx.Err() != nil { + return ctx.Err() + } + _, message, err := con.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Warning(ctx, "websocket error: %v", err) + wsContextCancel() + return err + } + log.Error(ctx, "ws: unable to read message: %v", err) + continue + } + var wsEvent sdk.WebsocketEvent + if err := json.Unmarshal(message, &wsEvent); err != nil { + log.Error(ctx, "ws: unable to unmarshal message: %s : %v", string(message), err) + continue + } + msgReceived <- wsEvent + } +} diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index fb9763fefe..b733b8e300 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -8,6 +8,7 @@ import ( "net/url" "time" + "github.com/gorilla/websocket" "github.com/sguiheux/go-coverage" "github.com/ovh/cds/sdk" @@ -144,6 +145,7 @@ type EnvironmentVariableClient interface { type EventsClient interface { // Must be run in a go routine EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent) + WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) } // DownloadClient exposes download related functions @@ -401,6 +403,7 @@ type Raw interface { Request(ctx context.Context, method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error) HTTPClient() *http.Client HTTPSSEClient() *http.Client + HTTPWebsocketClient() *websocket.Dialer } // GRPCPluginsClient exposes plugins API diff --git a/sdk/cdsclient/mock_cdsclient/interface_mock.go b/sdk/cdsclient/mock_cdsclient/interface_mock.go index 34aa4aaba2..5703aa5339 100644 --- a/sdk/cdsclient/mock_cdsclient/interface_mock.go +++ b/sdk/cdsclient/mock_cdsclient/interface_mock.go @@ -8,6 +8,7 @@ import ( tar "archive/tar" context "context" gomock "github.com/golang/mock/gomock" + websocket "github.com/gorilla/websocket" sdk "github.com/ovh/cds/sdk" cdsclient "github.com/ovh/cds/sdk/cdsclient" venom "github.com/ovh/venom" @@ -1720,6 +1721,18 @@ func (mr *MockEventsClientMockRecorder) EventsListen(ctx, chanSSEvt interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventsListen", reflect.TypeOf((*MockEventsClient)(nil).EventsListen), ctx, chanSSEvt) } +// WebsocketEventsListen mocks base method +func (m *MockEventsClient) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WebsocketEventsListen", ctx, chanMsgToSend, chanMsgReceived) +} + +// WebsocketEventsListen indicates an expected call of WebsocketEventsListen +func (mr *MockEventsClientMockRecorder) WebsocketEventsListen(ctx, chanMsgToSend, chanMsgReceived interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WebsocketEventsListen", reflect.TypeOf((*MockEventsClient)(nil).WebsocketEventsListen), ctx, chanMsgToSend, chanMsgReceived) +} + // MockDownloadClient is a mock of DownloadClient interface type MockDownloadClient struct { ctrl *gomock.Controller @@ -4349,6 +4362,20 @@ func (mr *MockInterfaceMockRecorder) HTTPSSEClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HTTPSSEClient", reflect.TypeOf((*MockInterface)(nil).HTTPSSEClient)) } +// HTTPWebsocketClient mocks base method +func (m *MockInterface) HTTPWebsocketClient() *websocket.Dialer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HTTPWebsocketClient") + ret0, _ := ret[0].(*websocket.Dialer) + return ret0 +} + +// HTTPWebsocketClient indicates an expected call of HTTPWebsocketClient +func (mr *MockInterfaceMockRecorder) HTTPWebsocketClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HTTPWebsocketClient", reflect.TypeOf((*MockInterface)(nil).HTTPWebsocketClient)) +} + // AuthDriverList mocks base method func (m *MockInterface) AuthDriverList() (sdk.AuthDriverResponse, error) { m.ctrl.T.Helper() @@ -5511,6 +5538,18 @@ func (mr *MockInterfaceMockRecorder) EventsListen(ctx, chanSSEvt interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventsListen", reflect.TypeOf((*MockInterface)(nil).EventsListen), ctx, chanSSEvt) } +// WebsocketEventsListen mocks base method +func (m *MockInterface) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WebsocketEventsListen", ctx, chanMsgToSend, chanMsgReceived) +} + +// WebsocketEventsListen indicates an expected call of WebsocketEventsListen +func (mr *MockInterfaceMockRecorder) WebsocketEventsListen(ctx, chanMsgToSend, chanMsgReceived interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WebsocketEventsListen", reflect.TypeOf((*MockInterface)(nil).WebsocketEventsListen), ctx, chanMsgToSend, chanMsgReceived) +} + // PipelineExport mocks base method func (m *MockInterface) PipelineExport(projectKey, name string, mods ...cdsclient.RequestModifier) ([]byte, error) { m.ctrl.T.Helper() @@ -8796,6 +8835,20 @@ func (mr *MockRawMockRecorder) HTTPSSEClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HTTPSSEClient", reflect.TypeOf((*MockRaw)(nil).HTTPSSEClient)) } +// HTTPWebsocketClient mocks base method +func (m *MockRaw) HTTPWebsocketClient() *websocket.Dialer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HTTPWebsocketClient") + ret0, _ := ret[0].(*websocket.Dialer) + return ret0 +} + +// HTTPWebsocketClient indicates an expected call of HTTPWebsocketClient +func (mr *MockRawMockRecorder) HTTPWebsocketClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HTTPWebsocketClient", reflect.TypeOf((*MockRaw)(nil).HTTPWebsocketClient)) +} + // MockGRPCPluginsClient is a mock of GRPCPluginsClient interface type MockGRPCPluginsClient struct { ctrl *gomock.Controller diff --git a/sdk/event.go b/sdk/event.go index ba2029fd25..282341106d 100644 --- a/sdk/event.go +++ b/sdk/event.go @@ -25,6 +25,8 @@ type Event struct { WorkflowName string `json:"workflow_name,omitempty"` WorkflowRunNum int64 `json:"workflow_run_num,omitempty"` WorkflowRunNumSub int64 `json:"workflow_run_num_sub,omitempty"` + WorkflowNodeRunID int64 `json:"workflow_node_run_id,omitempty"` + OperationUUID string `json:"operation_uuid,omitempty"` Status string `json:"status,omitempty"` Tags []WorkflowRunTag `json:"tag,omitempty"` EventIntegrationsID []int64 `json:"event_integrations_id"` @@ -104,10 +106,14 @@ type EventRunWorkflowOutgoingHook struct { // EventRunWorkflowJob contains event data for a workflow job node run type EventRunWorkflowJob struct { - ID int64 `json:"id,omitempty"` - Status string `json:"status,omitempty"` - Start int64 `json:"start,omitempty"` - Done int64 `json:"done,omitempty"` + ID int64 `json:"id,omitempty"` + Status string `json:"status,omitempty"` + Start int64 `json:"start,omitempty"` + Done int64 `json:"done,omitempty"` + Requirements []Requirement `json:"requirements,omitempty"` + WorkerName string `json:"worker_name,omitempty"` + BookByName string `json:"book_by_name,omitempty"` + Parameters []Parameter `json:"parameters,omitempty"` } // EventRunWorkflow contains event data for a workflow run @@ -122,25 +128,6 @@ type EventRunWorkflow struct { Tags []WorkflowRunTag `json:"tags"` } -// EventJob contains event data for a job -type EventJob struct { - Version int64 `json:"version,omitempty"` - JobName string `json:"jobName,omitempty"` - JobID int64 `json:"jobID,omitempty"` - Status string `json:"status,omitempty"` - Queued int64 `json:"queued,omitempty"` - Start int64 `json:"start,omitempty"` - Done int64 `json:"done,omitempty"` - ModelName string `json:"modelName,omitempty"` - PipelineName string `json:"pipelineName,omitempty"` - PipelineType string `json:"type,omitempty"` - ProjectKey string `json:"projectKey,omitempty"` - ApplicationName string `json:"applicationName,omitempty"` - EnvironmentName string `json:"environmentName,omitempty"` - BranchName string `json:"branchName,omitempty"` - Hash string `json:"hash,omitempty"` -} - // EventNotif contains event data for a job type EventNotif struct { Recipients []string `json:"recipients"` diff --git a/sdk/websocket.go b/sdk/websocket.go new file mode 100644 index 0000000000..a27b6f4029 --- /dev/null +++ b/sdk/websocket.go @@ -0,0 +1,30 @@ +package sdk + +const ( + WebsocketFilterTypeProject = "project" + WebsocketFilterTypeApplication = "application" + WebsocketFilterTypePipeline = "pipeline" + WebsocketFilterTypeEnvironment = "environment" + WebsocketFilterTypeWorkflow = "workflow" + WebsocketFilterTypeQueue = "queue" +) + +type WebsocketFilter struct { + Type string `json:"type"` + ProjectKey string `json:"project_key"` + ApplicationName string `json:"application_name"` + PipelineName string `json:"pipeline_name"` + EnvironmentName string `json:"environment_name"` + WorkflowName string `json:"workflow_name"` + WorkflowRunNumber int64 `json:"workflow_run_num"` + WorkflowNodeRunID int64 `json:"workflow_node_run_id"` + Favorites bool `json:"favorites"` + Queue bool `json:"queue"` + Operation string `json:"operation"` +} + +type WebsocketEvent struct { + Status string `json:"status"` + Error string `json:"error"` + Event Event `json:"event"` +} diff --git a/ui/proxy.conf.json b/ui/proxy.conf.json index e5173c24ca..70cae2557e 100644 --- a/ui/proxy.conf.json +++ b/ui/proxy.conf.json @@ -2,6 +2,7 @@ "/cdsapi": { "target": "http://localhost:8081", "secure": false, + "ws": true, "pathRewrite": { "^/cdsapi": "" } diff --git a/ui/src/app/app.component.ts b/ui/src/app/app.component.ts index 7248650544..028f973e03 100644 --- a/ui/src/app/app.component.ts +++ b/ui/src/app/app.component.ts @@ -6,10 +6,12 @@ import { Title } from '@angular/platform-browser'; import { ActivatedRoute, NavigationEnd, NavigationStart, ResolveEnd, ResolveStart, Router } from '@angular/router'; import { TranslateService } from '@ngx-translate/core'; import { Store } from '@ngxs/store'; +import { EventService } from 'app/event.service'; import { WorkflowNodeRun } from 'app/model/workflow.run.model'; import { GetCDSStatus } from 'app/store/cds.action'; import { CDSState } from 'app/store/cds.state'; import { Observable } from 'rxjs'; +import { WebSocketSubject } from 'rxjs/internal-compatibility'; import { bufferTime, filter, map, mergeMap } from 'rxjs/operators'; import { Subscription } from 'rxjs/Subscription'; import * as format from 'string-format-obj'; @@ -57,10 +59,12 @@ export class AppComponent implements OnInit { maintenance: boolean; cdsstateSub: Subscription; user: AuthentifiedUser; + previousURL: string @ViewChild('gamification') eltGamification: ElementRef; gameInit: boolean; + websocket: WebSocketSubject; constructor( _translate: TranslateService, @@ -72,7 +76,8 @@ export class AppComponent implements OnInit { private _notification: NotificationService, private _appService: AppService, private _toastService: ToastService, - private _store: Store + private _store: Store, + private _eventService: EventService ) { this.zone = new NgZone({ enableLongStackTrace: false }); this.toasterConfig = this._toastService.getConfig(); @@ -120,6 +125,7 @@ export class AppComponent implements OnInit { this.user = user; this.isConnected = true; this.startSSE(); + this._eventService.startWebsocket(this.user); } }); this.startVersionWorker(); @@ -137,6 +143,14 @@ export class AppComponent implements OnInit { this._routerNavEndSubscription = this._router.events .pipe(filter((event) => event instanceof NavigationEnd)) + .pipe(map((e: NavigationEnd) => { + if ((!this.previousURL || this.previousURL.split('?')[0] !== e.url.split('?')[0])) { + this.previousURL = e.url; + this._eventService.manageWebsocketFilterByUrl(e.url); + return; + } + + })) .pipe(map(() => this._activatedRoute)) .pipe(map((route) => { let params = {}; @@ -183,6 +197,9 @@ export class AppComponent implements OnInit { } startSSE(): void { + if (this.user.isAdmin() && localStorage.getItem('WS-EVENT')) { + return + } if (this.sseWorker) { this.stopWorker(this.sseWorker, null); } diff --git a/ui/src/app/app.module.ts b/ui/src/app/app.module.ts index e0fd6eb171..d17aed46c2 100644 --- a/ui/src/app/app.module.ts +++ b/ui/src/app/app.module.ts @@ -5,6 +5,7 @@ import { BrowserAnimationsModule } from '@angular/platform-browser/animations'; import { TranslateLoader, TranslateModule } from '@ngx-translate/core'; import { TranslateHttpLoader } from '@ngx-translate/http-loader'; import { ToasterModule } from 'angular2-toaster/angular2-toaster'; +import { EventService } from 'app/event.service'; import { NgxsStoreModule } from 'app/store/store.module'; import * as Raven from 'raven-js'; import { AppComponent } from './app.component'; @@ -41,6 +42,7 @@ let ngModule: NgModule = { ], providers: [ AppService, + EventService, { provide: LOCALE_ID, useValue: navigator.language.match(/fr/) ? 'fr' : 'en' } ], bootstrap: [AppComponent] diff --git a/ui/src/app/event.service.ts b/ui/src/app/event.service.ts new file mode 100644 index 0000000000..803e8634e1 --- /dev/null +++ b/ui/src/app/event.service.ts @@ -0,0 +1,142 @@ +import { Injectable } from '@angular/core'; +import { Router } from '@angular/router'; +import { AppService } from 'app/app.service'; +import { AuthentifiedUser } from 'app/model/user.model'; +import { WebSocketEvent, WebSocketMessage } from 'app/model/websocket.model'; +import { ToastService } from 'app/shared/toast/ToastService'; +import { WebSocketSubject } from 'rxjs/internal-compatibility'; +import { delay, retryWhen } from 'rxjs/operators'; +import { webSocket } from 'rxjs/webSocket'; + +@Injectable() +export class EventService { + + websocket: WebSocketSubject; + currentFilter: WebSocketMessage; + private connected: boolean; + + constructor( + private _router: Router, + private _appService: AppService, + private _toastService: ToastService + ) {} + + startWebsocket(user: AuthentifiedUser) { + if (!user.isAdmin()) { + return + } + if (!localStorage.getItem('WS-EVENT')) { + return + } + console.log('Starting websocket'); + const protocol = window.location.protocol.replace('http', 'ws'); + const host = window.location.host; + const href = this._router['location']._baseHref; + + this.websocket = webSocket({ + url: `${protocol}//${host}${href}/cdsapi/ws`, + openObserver: { + next: value => { + if (value.type === 'open') { + this.connected = true; + if (this.currentFilter) { + this.websocket.next(this.currentFilter); + } + } + } + } + }); + + this.websocket + .pipe(retryWhen(errors => errors.pipe(delay(2000)))) + .subscribe((message: WebSocketEvent) => { + if (message.status === 'OK') { + this._appService.manageEvent(message.event); + } else { + this._toastService.error('', message.error); + } + }, (err) => { + console.error('Error: ', err) + }, () => { + console.warn('Websocket Completed'); + }); + } + + addOperationFilter(uuid: string) { + this.currentFilter.operation = uuid; + this.websocket.next(this.currentFilter); + } + + updateFilter(f: WebSocketMessage): void { + this.currentFilter = f; + if (this.connected) { + this.websocket.next(this.currentFilter); + } + } + + manageWebsocketFilterByUrl(url: string) { + let msg = new WebSocketMessage(); + let urlSplitted = url.substr(1, url.length - 1).split('/'); + switch (urlSplitted[0]) { + case 'home': + msg.favorites = true; + break; + case 'project': + switch (urlSplitted.length) { + case 1: // project creation + break; + case 2: // project view + msg.project_key = urlSplitted[1].split('?')[0]; + msg.type = 'project'; + break; + default: // App/pipeline/env/workflow view + msg.project_key = urlSplitted[1].split('?')[0]; + this.manageWebsocketFilterProjectPath(urlSplitted, msg); + } + break; + case 'settings': + if (urlSplitted.length === 2 && urlSplitted[1] === 'queue') { + msg.queue = true; + } + break; + } + this.updateFilter(msg); + } + + manageWebsocketFilterProjectPath(urlSplitted: Array, msg: WebSocketMessage) { + switch (urlSplitted[2]) { + case 'pipeline': + if (urlSplitted.length >= 4) { + msg.pipeline_name = urlSplitted[3].split('?')[0]; + msg.type = 'pipeline'; + } + break; + case 'application': + if (urlSplitted.length >= 4) { + msg.application_name = urlSplitted[3].split('?')[0]; + msg.type = 'application'; + } + break; + case 'environment': + if (urlSplitted.length >= 4) { + msg.environment_name = urlSplitted[3].split('?')[0]; + msg.type = 'environment'; + } + break; + case 'workflow': + if (urlSplitted.length >= 4) { + msg.workflow_name = urlSplitted[3].split('?')[0]; + msg.type = 'workflow'; + } + if (urlSplitted.length >= 6) { + msg.workflow_run_num = Number(urlSplitted[5].split('?')[0]); + msg.type = 'workflow'; + } + if (urlSplitted.length >= 8) { + msg.workflow_node_run_id = Number(urlSplitted[7].split('?')[0]); + msg.type = 'workflow'; + } + break; + } + } +} diff --git a/ui/src/app/model/websocket.model.ts b/ui/src/app/model/websocket.model.ts new file mode 100644 index 0000000000..2f751c6d62 --- /dev/null +++ b/ui/src/app/model/websocket.model.ts @@ -0,0 +1,21 @@ +import { Event } from 'app/model/event.model'; + +export class WebSocketMessage { + project_key: string; + application_name: string; + pipeline_name: string; + environment_name: string; + workflow_name: string; + workflow_run_num: number; + workflow_node_run_id: number; + favorites: boolean; + queue: boolean; + operation: string; + type: string; +} + +export class WebSocketEvent { + status: string; + error: string; + event: Event; +}