Skip to content

Commit

Permalink
feat(hatchery): new metrics (#6725)
Browse files (browse the repository at this point in the history)
  • Loading branch information
yesnault authored Dec 8, 2023
1 parent cd9c0d6 commit 78d7b75
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 78 deletions.
23 changes: 10 additions & 13 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/telemetry"
"github.com/rockbears/log"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time {
return t0
}

func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine calls the SSE route
Expand All @@ -69,6 +70,7 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p
continue
}
if wsEvent.Event.EventType == "sdk.EventRunWorkflowJob" && wsEvent.Event.Status == sdk.StatusWaiting {
telemetry.Record(ctx, hatcheryMetrics.JobReceivedInQueuePollingWSv1, 1)
var jobEvent sdk.EventRunWorkflowJob
if err := sdk.JSONUnmarshal(wsEvent.Event.Payload, &jobEvent); err != nil {
errs <- newError(fmt.Errorf("unable to unmarshal job %v: %v", wsEvent.Event.Payload, err))
Expand All @@ -93,16 +95,13 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)

lenqueue := pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
log.Debug(ctx, "v1_len_queue: %v", lenqueue)
telemetry.Record(ctx, hatcheryMetrics.ChanV1JobAdd, 1)
jobs <- *job
}
}
case <-jobsTicker.C:
if c.config.Verbose {
fmt.Println("jobsTicker")
}

if jobs == nil {
continue
}
Expand All @@ -117,26 +116,24 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p
cancel()
continue
}

cancel()

if c.config.Verbose {
fmt.Println("Jobs Queue size: ", len(queue))
}

queueFiltered := sdk.WorkflowQueue{}
var lenqueue int
for _, job := range queue {
id := strconv.FormatInt(job.ID, 10)
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
lenqueue = pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
queueFiltered = append(queueFiltered, job)
}
log.Debug(ctx, "v1_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), lenqueue)

shrinkQueue(&queueFiltered, cap(jobs))
for _, j := range queueFiltered {
telemetry.Record(ctx, hatcheryMetrics.ChanV1JobAdd, 1)
jobs <- j
}
}
Expand Down
21 changes: 11 additions & 10 deletions sdk/cdsclient/client_queue_V2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/telemetry"
"github.com/rockbears/log"
)

Expand Down Expand Up @@ -104,7 +105,7 @@ func (c *client) V2QueueGetJobRun(ctx context.Context, regionName, id string) (*
return &job, nil
}

func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine calls the Websocket
Expand All @@ -125,6 +126,7 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
if jobs == nil {
continue
}
telemetry.Record(ctx, hatcheryMetrics.JobReceivedInQueuePollingWSv2, 1)
j, err := c.V2QueueGetJobRun(ctx, wsEvent.Event.Region, wsEvent.Event.JobRunID)
// Do not log the error if the job does not exist
if sdk.ErrorIs(err, sdk.ErrNotFound) {
Expand All @@ -140,14 +142,12 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
log.Debug(ctx, "skipping job %s", wsEvent.Event.JobRunID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID)
lenqueue := pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID)
log.Debug(ctx, "v2_len_queue: %v", lenqueue)
telemetry.Record(ctx, hatcheryMetrics.ChanV2JobAdd, 1)
jobs <- *j
}
case <-jobsTicker.C:
if c.config.Verbose {
fmt.Println("jobsTicker")
}

if jobs == nil {
continue
}
Expand All @@ -163,25 +163,26 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
continue
}
cancel()
if c.config.Verbose {
fmt.Println("Jobs Queue size: ", len(queue))
}

queueFiltered := []sdk.V2WorkflowRunJob{}
var lenqueue int
for _, job := range queue {
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) {
log.Debug(ctx, "skipping job %s", job.ID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
lenqueue = pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
queueFiltered = append(queueFiltered, job)
}

log.Debug(ctx, "v2_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), lenqueue)

max := cap(jobs) * 2
if len(queueFiltered) < max {
max = len(queueFiltered)
}
for i := 0; i < max; i++ {
telemetry.Record(ctx, hatcheryMetrics.ChanV2JobAdd, 1)
jobs <- queueFiltered[i]
}
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type ProjectVariablesClient interface {

type V2QueueClient interface {
V2QueueGetJobRun(ctx context.Context, regionName string, id string) (*sdk.V2WorkflowRunJob, error)
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueueJobResult(ctx context.Context, region string, jobRunID string, result sdk.V2WorkflowRunJobResult) error
V2QueueJobRunResultGet(ctx context.Context, regionName string, jobRunID string, runResultID string) (*sdk.V2WorkflowRunResult, error)
V2QueueJobRunResultsGet(ctx context.Context, regionName string, jobRunID string) ([]sdk.V2WorkflowRunResult, error)
Expand All @@ -311,7 +311,7 @@ type V2QueueClient interface {
type QueueClient interface {
QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error)
QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error)
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error)
QueueJobBook(ctx context.Context, id string) (sdk.WorkflowNodeJobRunBooked, error)
QueueJobRelease(ctx context.Context, id string) error
Expand Down
Loading

0 comments on commit 78d7b75

Please sign in to comment.