Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api,ui): websocket server + client for admin #5128

Merged
merged 13 commits into from
Apr 30, 2020
1 change: 1 addition & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ type API struct {
StartupTime time.Time
Maintenance bool
eventsBroker *eventsBroker
websocketBroker *websocketBroker
Cache cache.Store
Metrics struct {
WorkflowRunFailed *stats.Int64Measure
Expand Down
12 changes: 12 additions & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ func (api *API) InitRouter() {
}
api.eventsBroker.Init(r.Background, api.PanicDump())

api.websocketBroker = &websocketBroker{
router: api.Router,
cache: api.Cache,
dbFunc: api.mustDB,
clients: make(map[string]*websocketClient),
messages: make(chan sdk.Event),
chanAddClient: make(chan *websocketClient),
chanRemoveClient: make(chan string),
}
api.websocketBroker.Init(r.Background, api.PanicDump())

// Auth
r.Handle("/auth/driver", ScopeNone(), r.GET(api.getAuthDriversHandler, Auth(false)))
r.Handle("/auth/me", Scope(sdk.AuthConsumerScopeAction), r.GET(api.getAuthMe))
Expand Down Expand Up @@ -405,6 +416,7 @@ func (api *API) InitRouter() {

// SSE
r.Handle("/events", ScopeNone(), r.GET(api.eventsBroker.ServeHTTP))
r.Handle("/ws", ScopeNone(), r.GET(api.websocketBroker.ServeHTTP))
sguiheux marked this conversation as resolved.
Show resolved Hide resolved

// Feature
r.Handle("/feature/clean", ScopeNone(), r.POST(api.cleanFeatureHandler, NeedToken("X-Izanami-Token", api.Config.Features.Izanami.Token)))
Expand Down
2 changes: 1 addition & 1 deletion engine/api/event/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S
}

switch e.EventType {
case "sdk.EventJob", "sdk.EventEngine":
case "sdk.EventEngine":
continue
}
e.Payload = nil
Expand Down
91 changes: 69 additions & 22 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@ import (
"fmt"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

func publishRunWorkflow(ctx context.Context, payload interface{}, key, workflowName, appName, pipName, envName string, num int64, sub int64, status string, tags []sdk.WorkflowRunTag, eventIntegrations []sdk.ProjectIntegration) {
eventIntegrationsID := make([]int64, len(eventIntegrations))
for i, eventIntegration := range eventIntegrations {
type publishWorkflowRunData struct {
projectKey string
workflowName string
applicationName string
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
pipelineName string
environmentName string
workflowRunNum int64
workflowRunSubNum int64
status string
workflowRunTags []sdk.WorkflowRunTag
eventIntegrations []sdk.ProjectIntegration
workflowNodeRunID int64
}

func publishRunWorkflow(ctx context.Context, payload interface{}, data publishWorkflowRunData) {
eventIntegrationsID := make([]int64, len(data.eventIntegrations))
for i, eventIntegration := range data.eventIntegrations {
eventIntegrationsID[i] = eventIntegration.ID
}

Expand All @@ -25,18 +37,18 @@ func publishRunWorkflow(ctx context.Context, payload interface{}, key, workflowN
CDSName: cdsname,
EventType: fmt.Sprintf("%T", payload),
Payload: bts,
ProjectKey: key,
ApplicationName: appName,
PipelineName: pipName,
WorkflowName: workflowName,
EnvironmentName: envName,
WorkflowRunNum: num,
WorkflowRunNumSub: sub,
Status: status,
Tags: tags,
ProjectKey: data.projectKey,
ApplicationName: data.applicationName,
PipelineName: data.pipelineName,
WorkflowName: data.workflowName,
EnvironmentName: data.environmentName,
WorkflowRunNum: data.workflowRunNum,
WorkflowRunNumSub: data.workflowRunSubNum,
Status: data.status,
Tags: data.workflowRunTags,
EventIntegrationsID: eventIntegrationsID,
}
publishEvent(ctx, event)
_ = publishEvent(ctx, event)
}

// PublishWorkflowRun publish event on a workflow run
Expand All @@ -51,7 +63,16 @@ func PublishWorkflowRun(ctx context.Context, wr sdk.WorkflowRun, projectKey stri
LastModifiedNano: wr.LastModified.UnixNano(),
Tags: wr.Tags,
}
publishRunWorkflow(ctx, e, projectKey, wr.Workflow.Name, "", "", "", wr.Number, wr.LastSubNumber, wr.Status, wr.Tags, wr.Workflow.EventIntegrations)
data := publishWorkflowRunData{
projectKey: projectKey,
workflowName: wr.Workflow.Name,
workflowRunNum: wr.Number,
workflowRunSubNum: wr.LastSubNumber,
status: wr.Status,
workflowRunTags: wr.Tags,
eventIntegrations: wr.Workflow.EventIntegrations,
}
publishRunWorkflow(ctx, e, data)
}

// PublishWorkflowNodeRun publish event on a workflow node run
Expand Down Expand Up @@ -157,19 +178,45 @@ func PublishWorkflowNodeRun(ctx context.Context, nr sdk.WorkflowNodeRun, w sdk.W
if sdk.StatusIsTerminated(nr.Status) {
e.Done = nr.Done.Unix()
}
publishRunWorkflow(ctx, e, w.ProjectKey, w.Name, appName, pipName, envName, nr.Number, nr.SubNumber, nr.Status, nil, w.EventIntegrations)
data := publishWorkflowRunData{
projectKey: w.ProjectKey,
workflowName: w.Name,
applicationName: appName,
pipelineName: pipName,
environmentName: envName,
workflowRunNum: nr.Number,
workflowRunSubNum: nr.SubNumber,
status: nr.Status,
eventIntegrations: w.EventIntegrations,
workflowNodeRunID: nr.ID,
}
publishRunWorkflow(ctx, e, data)
}

// PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun
func PublishWorkflowNodeJobRun(ctx context.Context, db gorp.SqlExecutor, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) {
func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) {
e := sdk.EventRunWorkflowJob{
ID: jr.ID,
Status: jr.Status,
Start: jr.Start.Unix(),
ID: jr.ID,
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
Status: jr.Status,
Start: jr.Start.Unix(),
Requirements: jr.Job.Action.Requirements,
WorkerName: jr.Job.WorkerName,
BookByName: jr.BookedBy.Name,
Parameters: jr.Parameters,
}

if sdk.StatusIsTerminated(jr.Status) {
e.Done = jr.Done.Unix()
}
publishRunWorkflow(ctx, e, pkey, wr.Workflow.Name, "", "", "", 0, 0, jr.Status, nil, wr.Workflow.EventIntegrations)
data := publishWorkflowRunData{
projectKey: pkey,
workflowName: wr.Workflow.Name,
workflowRunNum: wr.Number,
workflowRunSubNum: wr.LastSubNumber,
status: jr.Status,
workflowRunTags: wr.Tags,
eventIntegrations: wr.Workflow.EventIntegrations,
workflowNodeRunID: jr.WorkflowNodeRunID,
}
publishRunWorkflow(ctx, e, data)
}
6 changes: 1 addition & 5 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk
continue
}

switch e.EventType {
case "sdk.EventJob":
continue
}
observability.Record(b.router.Background, SSEEvents, 1)
cacheMsgChan <- e
}
Expand Down Expand Up @@ -239,7 +235,7 @@ func (client *eventsBrokerSubscribe) manageEvent(db gorp.SqlExecutor, event sdk.
var isHatcheryWithGroups = isHatchery && len(client.consumer.GroupIDs) > 0

switch {
case strings.HasPrefix(event.EventType, "sdk.EventProject") || strings.HasPrefix(event.EventType, "sdk.EventAsCodeEvent"):
case strings.HasPrefix(event.EventType, "sdk.EventProject") || strings.HasPrefix(event.EventType, "sdk.EventAsCodeEvent"):
if client.consumer.Maintainer() && !isHatcheryWithGroups {
return true, nil
}
Expand Down
2 changes: 2 additions & 0 deletions engine/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var (
Hits *stats.Int64Measure
SSEClients *stats.Int64Measure
SSEEvents *stats.Int64Measure
WebSocketClients *stats.Int64Measure
WebSocketEvents *stats.Int64Measure
ServerRequestCount *stats.Int64Measure
ServerRequestBytes *stats.Int64Measure
ServerResponseBytes *stats.Int64Measure
Expand Down
10 changes: 10 additions & 0 deletions engine/api/router_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func InitRouterMetrics(s service.NamedService) error {
"cds/sse_events",
"number of sse events",
stats.UnitDimensionless)
WebSocketClients = stats.Int64(
"cds/websocket_clients",
"number of websocket clients",
stats.UnitDimensionless)
WebSocketEvents = stats.Int64(
"cds/websocket_events",
"number of websocket events",
stats.UnitDimensionless)
ServerRequestCount = stats.Int64(
"cds/http/server/request_count",
"Number of HTTP requests started",
Expand Down Expand Up @@ -122,6 +130,8 @@ func InitRouterMetrics(s service.NamedService) error {
observability.NewViewCount("cds/http/router/router_hits", Hits, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewLast("cds/http/router/sse_clients", SSEClients, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewCount("cds/http/router/sse_events", SSEEvents, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewLast("cds/http/router/websocket_clients", WebSocketClients, []tag.Key{tagServiceType, tagServiceName}),
observability.NewViewCount("cds/http/router/websocket_events", WebSocketEvents, []tag.Key{tagServiceType, tagServiceName}),
ServerRequestCountView,
ServerRequestBytesView,
ServerResponseBytesView,
Expand Down
Loading