Skip to content

Commit

Permalink
feat(hatchery): enable websocket (#5199)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Jun 2, 2020
1 parent f841412 commit 00d584c
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 128 deletions.
45 changes: 32 additions & 13 deletions cli/cdsctl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ package main

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"

"github.com/spf13/cobra"

"github.com/ovh/cds/cli"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
)

var eventsCmd = cli.Command{
Expand All @@ -27,28 +23,51 @@ func events() *cobra.Command {
var eventsListenCmd = cli.Command{
Name: "listen",
Short: "Listen CDS events",
Flags: []cli.Flag{
{
Name: "project",
Usage: "project key to listen",
Type: cli.FlagString,
},
{
Name: "workflow",
Usage: "workflow name to listen",
Type: cli.FlagString,
},
},
}

func eventsListenRun(v cli.Values) error {
ctx := context.Background()
chanSSE := make(chan cdsclient.SSEvent)
chanMessageReceived := make(chan sdk.WebsocketEvent)
chanMessageToSend := make(chan sdk.WebsocketFilter)

sdk.GoRoutine(ctx, "EventsListenCmd", func(ctx context.Context) {
client.EventsListen(ctx, chanSSE)
sdk.GoRoutine(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
client.WebsocketEventsListen(ctx, chanMessageToSend, chanMessageReceived)
})

var t string
switch {
case v.GetString("workflow") != "":
t = sdk.WebsocketFilterTypeWorkflow
default:
t = sdk.WebsocketFilterTypeProject
}
chanMessageToSend <- sdk.WebsocketFilter{
ProjectKey: v.GetString("project"),
WorkflowName: v.GetString("workflow"),
Type: t,
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case evt := <-chanSSE:
var e sdk.Event
content, _ := ioutil.ReadAll(evt.Data)
_ = json.Unmarshal(content, &e)
if e.EventType == "" {
case evt := <-chanMessageReceived:
if evt.Event.EventType == "" {
continue
}
fmt.Printf("%s: %s %s %s\n", e.EventType, e.ProjectKey, e.WorkflowName, e.Status)
fmt.Printf("%s: %s %s %s\n", evt.Event.EventType, evt.Event.ProjectKey, evt.Event.WorkflowName, evt.Event.Status)
}
}
}
10 changes: 3 additions & 7 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,9 @@ func PublishWorkflowNodeRun(ctx context.Context, nr sdk.WorkflowNodeRun, w sdk.W
// PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun
func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.WorkflowRun, jr sdk.WorkflowNodeJobRun) {
e := sdk.EventRunWorkflowJob{
ID: jr.ID,
Status: jr.Status,
Start: jr.Start.Unix(),
Requirements: jr.Job.Action.Requirements,
WorkerName: jr.Job.WorkerName,
BookByName: jr.BookedBy.Name,
Parameters: jr.Parameters,
ID: jr.ID,
Status: jr.Status,
Start: jr.Start.Unix(),
}

if sdk.StatusIsTerminated(jr.Status) {
Expand Down
69 changes: 53 additions & 16 deletions engine/hatchery/hatchery_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,63 @@ import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/http/httputil"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/h2non/gock.v1"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/jws"
"github.com/ovh/cds/sdk/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/h2non/gock.v1"
)

func init() {
log.Initialize(&log.Conf{Level: "debug"})
}

func InitMock(t *testing.T) {
func InitWebsocketTestServer(t *testing.T) *httptest.Server {
upgrader := websocket.Upgrader{}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
require.NoError(t, err)
defer c.Close()

j := sdk.EventRunWorkflowJob{
ID: 1,
Status: sdk.StatusWaiting,
}
bts, err := json.Marshal(j)
require.NoError(t, err)
jevent := sdk.WebsocketEvent{
Status: "OK",
Event: sdk.Event{
EventType: fmt.Sprintf("%T", j),
Status: sdk.StatusWaiting,
Payload: bts,
},
}
require.NoError(t, c.WriteJSON(jevent))
for {
mt, message, err := c.ReadMessage()
if err != nil {
require.NoError(t, err)
}
err = c.WriteMessage(mt, message)
if err != nil {
require.NoError(t, err)
}
}
}))
return s
}

func InitMock(t *testing.T, url string) {
privKey, _ := jws.NewRandomRSAKey()
privKeyPEM, _ := jws.ExportPrivateKey(privKey)
pubKey, _ := jws.ExportPublicKey(privKey)
Expand All @@ -52,7 +91,7 @@ func InitMock(t *testing.T) {
}
}

gock.New("http://lolcat.host").Post("/auth/consumer/builtin/signin").
gock.New(url).Post("/auth/consumer/builtin/signin").
Reply(201).
JSON(
sdk.AuthConsumerSigninResponse{
Expand All @@ -63,32 +102,30 @@ func InitMock(t *testing.T) {
},
).AddHeader("X-Api-Pub-Signing-Key", base64.StdEncoding.EncodeToString(pubKey))

gock.New("http://lolcat.host").Get("/download/worker/darwin/amd64").Times(1).
gock.New(url).Get("/download/worker/darwin/amd64").Times(1).
Reply(200).
Body(bytes.NewBuffer([]byte("nop"))).
AddHeader("Content-Type", "application/octet-stream")

gock.New("http://lolcat.host").Get("/download/worker/linux/amd64").Times(1).
gock.New(url).Get("/download/worker/linux/amd64").Times(1).
Reply(200).
Body(bytes.NewBuffer([]byte("nop"))).
AddHeader("Content-Type", "application/octet-stream")

gock.New("http://lolcat.host").Post("/services/register").
gock.New(url).Post("/services/register").
HeaderPresent("Authorization").
Reply(200).
JSON(sdk.Service{})

gock.New("http://lolcat.host").Post("/services/heartbeat").
gock.New(url).Post("/services/heartbeat").
HeaderPresent("Authorization").
Reply(204)

gock.New("http://lolcat.host").Get("/worker").Times(6).
gock.New(url).Get("/worker").Times(6).
Reply(200).
JSON([]sdk.Worker{})

gock.New("http://lolcat.host").Get("/events").EnableNetworking()

gock.New("http://lolcat.host").Get("/queue/workflows/1/infos").Times(1).
gock.New(url).Get("/queue/workflows/1/infos").Times(1).
Reply(200).
JSON(sdk.WorkflowNodeJobRun{
ID: 1,
Expand All @@ -98,12 +135,12 @@ func InitMock(t *testing.T) {
},
})

gock.New("http://lolcat.host").Post("/queue/workflows/1/spawn/infos").Times(2).Reply(200)
gock.New(url).Post("/queue/workflows/1/spawn/infos").Times(2).Reply(200)

gock.New("http://lolcat.host").Post("/queue/workflows/1/book").
gock.New(url).Post("/queue/workflows/1/book").
Reply(204)

gock.New("http://lolcat.host").Get("/queue/workflows").Times(1).
gock.New(url).Get("/queue/workflows").Times(1).
Reply(200).
JSON([]sdk.WorkflowRun{})

Expand Down
15 changes: 7 additions & 8 deletions engine/hatchery/hatchery_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ func (r *TestRunner) NewCmd(ctx context.Context, command string, args ...string)
}

func TestHatcheryLocal(t *testing.T) {
InitMock(t)
defer gock.Off()
s := InitWebsocketTestServer(t)
InitMock(t, s.URL)
defer s.Close()

defer gock.Off()
var h = local.New()

h.LocalWorkerRunner = &TestRunner{t}

var cfg = local.HatcheryConfiguration{
Basedir: os.TempDir(),
}

cfg.Name = "lolcat-test-hatchery"
cfg.API.HTTP.Insecure = false
cfg.API.HTTP.URL = "http://lolcat.host"
cfg.API.HTTP.URL = s.URL
cfg.API.Token = "xxxxxxxx"
cfg.API.MaxHeartbeatFailures = 0
cfg.Provision.RegisterFrequency = 1
Expand All @@ -61,8 +62,6 @@ func TestHatcheryLocal(t *testing.T) {
t.Logf("service config: %+v", srvCfg)

srvCfg.Hook = func(client cdsclient.Interface) error {
client.HTTPSSEClient().Transport = newMockSSERoundTripper(t, context.TODO())
gock.InterceptClient(client.HTTPSSEClient())
gock.InterceptClient(client.HTTPClient())
return nil
}
Expand Down Expand Up @@ -94,8 +93,8 @@ func TestHatcheryLocal(t *testing.T) {
if !gock.IsDone() {
pending := gock.Pending()
for _, m := range pending {
if m.Request().URLStruct.String() != "http://lolcat.host/services/heartbeat" &&
!strings.HasPrefix(m.Request().URLStruct.String(), "http://lolcat.host/download/worker") {
if m.Request().URLStruct.String() != s.URL+"/services/heartbeat" &&
!strings.HasPrefix(m.Request().URLStruct.String(), s.URL+"/download/worker") {
t.Errorf("PENDING %s %s", m.Request().Method, m.Request().URLStruct.String())
}
}
Expand Down
8 changes: 0 additions & 8 deletions sdk/cdsclient/client_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ import (
"github.com/ovh/cds/sdk"
)

func (c *client) EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent) {
for ctx.Err() == nil {
if err := c.RequestSSEGet(ctx, "/events", chanSSEvt); err != nil {
log.Println("EventsListen", err)
}
time.Sleep(1 * time.Second)
}
}
func (c *client) WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent) {
for ctx.Err() == nil {
if err := c.RequestWebsocket(ctx, "/ws", chanMsgToSend, chanMsgReceived); err != nil {
Expand Down
58 changes: 25 additions & 33 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ
jobsTicker := time.NewTicker(delay)

// This goroutine call the SSE route
chanSSEvt := make(chan SSEvent)
sdk.GoRoutine(ctx, "RequestSSEGet", func(ctx context.Context) {
chanMessageReceived := make(chan sdk.WebsocketEvent, 10)
chanMessageToSend := make(chan sdk.WebsocketFilter, 10)
sdk.GoRoutine(ctx, "RequestWebsocket", func(ctx context.Context) {
for ctx.Err() == nil {
if err := c.RequestSSEGet(ctx, "/events", chanSSEvt); err != nil {
if err := c.RequestWebsocket(ctx, "/ws", chanMessageToSend, chanMessageReceived); err != nil {
log.Println("QueuePolling", err)
}
time.Sleep(1 * time.Second)
}
})
chanMessageToSend <- sdk.WebsocketFilter{
Queue: true,
}

for {
select {
Expand All @@ -75,44 +79,32 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ
close(jobs)
}
return ctx.Err()
case evt := <-chanSSEvt:
case wsEvent := <-chanMessageReceived:
if jobs == nil {
continue
}

content, _ := ioutil.ReadAll(evt.Data)

var apiEvent sdk.Event
_ = json.Unmarshal(content, &apiEvent) // ignore errors
// filter only EventRunWorkflowJob
if apiEvent.EventType == "sdk.EventRunWorkflowJob" {
var runJob sdk.EventRunWorkflowJob
if err := json.Unmarshal(apiEvent.Payload, &runJob); err != nil {
errs <- fmt.Errorf("unable to unmarshal job event: %v", err)
if wsEvent.Event.EventType == "sdk.EventRunWorkflowJob" && wsEvent.Event.Status == sdk.StatusWaiting {
var jobEvent sdk.EventRunWorkflowJob
if err := json.Unmarshal(wsEvent.Event.Payload, &jobEvent); err != nil {
errs <- fmt.Errorf("unable to unmarshal job %v: %v", wsEvent.Event.Payload, err)
continue
}
job, err := c.QueueJobInfo(ctx, jobEvent.ID)
// Do not log the error if the job does not exist
if sdk.ErrorIs(err, sdk.ErrWorkflowNodeRunJobNotFound) {
continue
}
if runJob.ID != 0 && runJob.Status == sdk.StatusWaiting {
job, err := c.QueueJobInfo(ctx, runJob.ID)

// Do not log the error if the job does not exist
if sdk.ErrorIs(err, sdk.ErrWorkflowNodeRunJobNotFound) {
continue
}

if err != nil {
errs <- fmt.Errorf("unable to get job %v info: %v", runJob.ID, err)
continue
}

// push the job in the channel
if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" {
job.Header["SSE"] = "true"
jobs <- *job
}

if err != nil {
errs <- fmt.Errorf("unable to get job %v info: %v", job.ID, err)
continue
}
// push the job in the channel
if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" {
job.Header["WS"] = "true"
jobs <- *job
}
}

case <-jobsTicker.C:
if c.config.Verbose {
fmt.Println("jobsTicker")
Expand Down
1 change: 0 additions & 1 deletion sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ type EnvironmentVariableClient interface {
// EventsClient listen SSE Events from CDS API
type EventsClient interface {
// Must be run in a go routine
EventsListen(ctx context.Context, chanSSEvt chan<- SSEvent)
WebsocketEventsListen(ctx context.Context, chanMsgToSend <-chan sdk.WebsocketFilter, chanMsgReceived chan<- sdk.WebsocketEvent)
}

Expand Down
Loading

0 comments on commit 00d584c

Please sign in to comment.