Skip to content

Commit

Permalink
feat(api,ui): websocket server + client for admin (#5128)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Apr 30, 2020
1 parent 1e25af6 commit 7bcdada
Show file tree
Hide file tree
Showing 24 changed files with 1,169 additions and 58 deletions.
1 change: 1 addition & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ type API struct {
StartupTime time.Time
Maintenance bool
eventsBroker *eventsBroker
websocketBroker *websocketBroker
Cache cache.Store
Metrics struct {
WorkflowRunFailed *stats.Int64Measure
Expand Down
12 changes: 12 additions & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ func (api *API) InitRouter() {
}
api.eventsBroker.Init(r.Background, api.PanicDump())

api.websocketBroker = &websocketBroker{
router: api.Router,
cache: api.Cache,
dbFunc: api.mustDB,
clients: make(map[string]*websocketClient),
messages: make(chan sdk.Event),
chanAddClient: make(chan *websocketClient),
chanRemoveClient: make(chan string),
}
api.websocketBroker.Init(r.Background, api.PanicDump())

// Auth
r.Handle("/auth/driver", ScopeNone(), r.GET(api.getAuthDriversHandler, Auth(false)))
r.Handle("/auth/me", Scope(sdk.AuthConsumerScopeAction), r.GET(api.getAuthMe))
Expand Down Expand Up @@ -405,6 +416,7 @@ func (api *API) InitRouter() {

// SSE
r.Handle("/events", ScopeNone(), r.GET(api.eventsBroker.ServeHTTP))
r.Handle("/ws", ScopeNone(), r.GET(api.websocketBroker.ServeHTTP))

// Feature
r.Handle("/feature/clean", ScopeNone(), r.POST(api.cleanFeatureHandler, NeedToken("X-Izanami-Token", api.Config.Features.Izanami.Token)))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/event/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S
}

switch e.EventType {
case "sdk.EventJob", "sdk.EventEngine":
case "sdk.EventEngine":
continue
}
e.Payload = nil
Expand Down
91 changes: 69 additions & 22 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@ import (
"fmt"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

func publishRunWorkflow(ctx context.Context, payload interface{}, key, workflowName, appName, pipName, envName string, num int64, sub int64, status string, tags []sdk.WorkflowRunTag, eventIntegrations []sdk.ProjectIntegration) {
eventIntegrationsID := make([]int64, len(eventIntegrations))
for i, eventIntegration := range eventIntegrations {
type publishWorkflowRunData struct {
projectKey string
workflowName string
applicationName string
pipelineName string
environmentName string
workflowRunNum int64
workflowRunSubNum int64
status string
workflowRunTags []sdk.WorkflowRunTag
eventIntegrations []sdk.ProjectIntegration
workflowNodeRunID int64
}

func publishRunWorkflow(ctx context.Context, payload interface{}, data publishWorkflowRunData) {
eventIntegrationsID := make([]int64, len(data.eventIntegrations))
for i, eventIntegration := range data.eventIntegrations {
eventIntegrationsID[i] = eventIntegration.ID
}

Expand All @@ -25,18 +37,18 @@ func publishRunWorkflow(ctx context.Context, payload interface{}, key, workflowN
CDSName: cdsname,
EventType: fmt.Sprintf("%T", payload),
Payload: bts,
ProjectKey: key,
ApplicationName: appName,
PipelineName: pipName,
WorkflowName: workflowName,
EnvironmentName: envName,
WorkflowRunNum: num,
WorkflowRunNumSub: sub,
Status: status,
Tags: tags,
ProjectKey: data.projectKey,
ApplicationName: data.applicationName,
PipelineName: data.pipelineName,
WorkflowName: data.workflowName,
EnvironmentName: data.environmentName,
WorkflowRunNum: data.workflowRunNum,
WorkflowRunNumSub: data.workflowRunSubNum,
Status: data.status,
Tags: data.workflowRunTags,
EventIntegrationsID: eventIntegrationsID,
}
publishEvent(ctx, event)
_ = publishEvent(ctx, event)
}

// PublishWorkflowRun publish event on a workflow run
Expand All @@ -51,7 +63,16 @@ func PublishWorkflowRun(ctx context.Context, wr sdk.WorkflowRun, projectKey stri
LastModifiedNano: wr.LastModified.UnixNano(),
Tags: wr.Tags,
}
publishRunWorkflow(ctx, e, projectKey, wr.Workflow.Name, "", "", "", wr.Number, wr.LastSubNumber, wr.Status, wr.Tags, wr.Workflow.EventIntegrations)
data := publishWorkflowRunData{
projectKey: projectKey,
workflowName: wr.Workflow.Name,
workflowRunNum: wr.Number,
workflowRunSubNum: wr.LastSubNumber,
status: wr.Status,
workflowRunTags: wr.Tags,
eventIntegrations: wr.Workflow.EventIntegrations,
}
publishRunWorkflow(ctx, e, data)
}

// PublishWorkflowNodeRun publish event on a workflow node run
Expand Down Expand Up @@ -157,19 +178,45 @@ func PublishWorkflowNodeRun(ctx context.Context, nr sdk.WorkflowNodeRun, w sdk.W
if sdk.StatusIsTerminated(nr.Status) {
e.Done = nr.Done.Unix()
}
publishRunWorkflow(ctx, e, w.ProjectKey, w.Name, appName, pipName, envName, nr.Number, nr.SubNumber, nr.Status, nil, w.EventIntegrations)
data := publishWorkflowRunData{
projectKey: w.ProjectKey,
workflowName: w.Name,
applicationName: appName,
pipelineName: pipName,
environmentName: envName,
workflowRunNum: nr.Number,
workflowRunSubNum: nr.SubNumber,
status: nr.Status,
eventIntegrations: w.EventIntegrations,
workflowNodeRunID: nr.ID,
}
publishRunWorkflow(ctx, e, data)
}

// PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun
func PublishWorkflowNodeJobRun(ctx context.Context, db gorp.SqlExecutor, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) {
func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) {
e := sdk.EventRunWorkflowJob{
ID: jr.ID,
Status: jr.Status,
Start: jr.Start.Unix(),
ID: jr.ID,
Status: jr.Status,
Start: jr.Start.Unix(),
Requirements: jr.Job.Action.Requirements,
WorkerName: jr.Job.WorkerName,
BookByName: jr.BookedBy.Name,
Parameters: jr.Parameters,
}

if sdk.StatusIsTerminated(jr.Status) {
e.Done = jr.Done.Unix()
}
publishRunWorkflow(ctx, e, pkey, wr.Workflow.Name, "", "", "", 0, 0, jr.Status, nil, wr.Workflow.EventIntegrations)
data := publishWorkflowRunData{
projectKey: pkey,
workflowName: wr.Workflow.Name,
workflowRunNum: wr.Number,
workflowRunSubNum: wr.LastSubNumber,
status: jr.Status,
workflowRunTags: wr.Tags,
eventIntegrations: wr.Workflow.EventIntegrations,
workflowNodeRunID: jr.WorkflowNodeRunID,
}
publishRunWorkflow(ctx, e, data)
}
6 changes: 1 addition & 5 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk
continue
}

switch e.EventType {
case "sdk.EventJob":
continue
}
observability.Record(b.router.Background, SSEEvents, 1)
cacheMsgChan <- e
}
Expand Down Expand Up @@ -239,7 +235,7 @@ func (client *eventsBrokerSubscribe) manageEvent(db gorp.SqlExecutor, event sdk.
var isHatcheryWithGroups = isHatchery && len(client.consumer.GroupIDs) > 0

switch {
case strings.HasPrefix(event.EventType, "sdk.EventProject") || strings.HasPrefix(event.EventType, "sdk.EventAsCodeEvent"):
case strings.HasPrefix(event.EventType, "sdk.EventProject") || strings.HasPrefix(event.EventType, "sdk.EventAsCodeEvent"):
if client.consumer.Maintainer() && !isHatcheryWithGroups {
return true, nil
}
Expand Down
2 changes: 2 additions & 0 deletions engine/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var (
Hits *stats.Int64Measure
SSEClients *stats.Int64Measure
SSEEvents *stats.Int64Measure
WebSocketClients *stats.Int64Measure
WebSocketEvents *stats.Int64Measure
ServerRequestCount *stats.Int64Measure
ServerRequestBytes *stats.Int64Measure
ServerResponseBytes *stats.Int64Measure
Expand Down
10 changes: 10 additions & 0 deletions engine/api/router_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func InitRouterMetrics(s service.NamedService) error {
"cds/sse_events",
"number of sse events",
stats.UnitDimensionless)
WebSocketClients = stats.Int64(
"cds/websocket_clients",
"number of websocket clients",
stats.UnitDimensionless)
WebSocketEvents = stats.Int64(
"cds/websocket_events",
"number of websocket events",
stats.UnitDimensionless)
ServerRequestCount = stats.Int64(
"cds/http/server/request_count",
"Number of HTTP requests started",
Expand Down Expand Up @@ -122,6 +130,8 @@ func InitRouterMetrics(s service.NamedService) error {
observability.NewViewCount("cds/http/router/router_hits", Hits, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewLast("cds/http/router/sse_clients", SSEClients, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewCount("cds/http/router/sse_events", SSEEvents, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewLast("cds/http/router/websocket_clients", WebSocketClients, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewCount("cds/http/router/websocket_events", WebSocketEvents, []tag.Key{tagServiceType, tagServiceName}),
ServerRequestCountView,
ServerRequestBytesView,
ServerResponseBytesView,
Expand Down
Loading

0 comments on commit 7bcdada

Please sign in to comment.