Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hatchery): new metrics #6725

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/telemetry"
"github.com/rockbears/log"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time {
return t0
}

func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine calls the SSE route
Expand All @@ -69,6 +70,7 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p
continue
}
if wsEvent.Event.EventType == "sdk.EventRunWorkflowJob" && wsEvent.Event.Status == sdk.StatusWaiting {
telemetry.Record(ctx, hatcheryMetrics.JobReceivedInQueuePollingWSv1, 1)
var jobEvent sdk.EventRunWorkflowJob
if err := sdk.JSONUnmarshal(wsEvent.Event.Payload, &jobEvent); err != nil {
errs <- newError(fmt.Errorf("unable to unmarshal job %v: %v", wsEvent.Event.Payload, err))
Expand All @@ -93,16 +95,13 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)

lenqueue := pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
log.Debug(ctx, "v1_len_queue: %v", lenqueue)
telemetry.Record(ctx, hatcheryMetrics.ChanV1JobAdd, 1)
jobs <- *job
}
}
case <-jobsTicker.C:
if c.config.Verbose {
fmt.Println("jobsTicker")
}

if jobs == nil {
continue
}
Expand All @@ -117,26 +116,24 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p
cancel()
continue
}

cancel()

if c.config.Verbose {
fmt.Println("Jobs Queue size: ", len(queue))
}

queueFiltered := sdk.WorkflowQueue{}
var lenqueue int
for _, job := range queue {
id := strconv.FormatInt(job.ID, 10)
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
lenqueue = pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
queueFiltered = append(queueFiltered, job)
}
log.Debug(ctx, "v1_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), lenqueue)

shrinkQueue(&queueFiltered, cap(jobs))
for _, j := range queueFiltered {
telemetry.Record(ctx, hatcheryMetrics.ChanV1JobAdd, 1)
jobs <- j
}
}
Expand Down
21 changes: 11 additions & 10 deletions sdk/cdsclient/client_queue_V2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/telemetry"
"github.com/rockbears/log"
)

Expand Down Expand Up @@ -104,7 +105,7 @@ func (c *client) V2QueueGetJobRun(ctx context.Context, regionName, id string) (*
return &job, nil
}

func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine calls the Websocket
Expand All @@ -125,6 +126,7 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
if jobs == nil {
continue
}
telemetry.Record(ctx, hatcheryMetrics.JobReceivedInQueuePollingWSv2, 1)
j, err := c.V2QueueGetJobRun(ctx, wsEvent.Event.Region, wsEvent.Event.JobRunID)
// Do not log the error if the job does not exist
if sdk.ErrorIs(err, sdk.ErrNotFound) {
Expand All @@ -140,14 +142,12 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
log.Debug(ctx, "skipping job %s", wsEvent.Event.JobRunID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID)
lenqueue := pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID)
log.Debug(ctx, "v2_len_queue: %v", lenqueue)
telemetry.Record(ctx, hatcheryMetrics.ChanV2JobAdd, 1)
jobs <- *j
}
case <-jobsTicker.C:
if c.config.Verbose {
fmt.Println("jobsTicker")
}

if jobs == nil {
continue
}
Expand All @@ -163,25 +163,26 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
continue
}
cancel()
if c.config.Verbose {
fmt.Println("Jobs Queue size: ", len(queue))
}

queueFiltered := []sdk.V2WorkflowRunJob{}
var lenqueue int
for _, job := range queue {
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) {
log.Debug(ctx, "skipping job %s", job.ID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
lenqueue = pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
queueFiltered = append(queueFiltered, job)
}

log.Debug(ctx, "v2_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), lenqueue)

max := cap(jobs) * 2
if len(queueFiltered) < max {
max = len(queueFiltered)
}
for i := 0; i < max; i++ {
telemetry.Record(ctx, hatcheryMetrics.ChanV2JobAdd, 1)
jobs <- queueFiltered[i]
}
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type ProjectVariablesClient interface {

type V2QueueClient interface {
V2QueueGetJobRun(ctx context.Context, regionName string, id string) (*sdk.V2WorkflowRunJob, error)
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueueJobResult(ctx context.Context, region string, jobRunID string, result sdk.V2WorkflowRunJobResult) error
V2QueueJobRunResultGet(ctx context.Context, regionName string, jobRunID string, runResultID string) (*sdk.V2WorkflowRunResult, error)
V2QueueJobRunResultsGet(ctx context.Context, regionName string, jobRunID string) ([]sdk.V2WorkflowRunResult, error)
Expand All @@ -311,7 +311,7 @@ type V2QueueClient interface {
type QueueClient interface {
QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error)
QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error)
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, hatcheryMetrics *sdk.HatcheryMetrics, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error)
QueueJobBook(ctx context.Context, id string) (sdk.WorkflowNodeJobRunBooked, error)
QueueJobRelease(ctx context.Context, id string) error
Expand Down
Loading