Skip to content

Commit

Permalink
fix(hatchery): do not add same job into workersStarter (#6722)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault authored Dec 7, 2023
1 parent d66aad6 commit a65f819
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 70 deletions.
9 changes: 9 additions & 0 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Common struct {
Clientv2 cdsclient.HatcheryServiceClient
mapServiceNextLineNumberMutex sync.Mutex
mapServiceNextLineNumber map[string]int64
mapPendingWorkerCreation *sdk.HatcheryPendingWorkerCreation
}

func (c *Common) MaxHeartbeat() int {
Expand Down Expand Up @@ -65,6 +66,14 @@ func (c *Common) GetGoRoutines() *sdk.GoRoutines {
return c.GoRoutines
}

func (c *Common) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation {
if c.mapPendingWorkerCreation == nil {
c.mapPendingWorkerCreation = &sdk.HatcheryPendingWorkerCreation{}
c.mapPendingWorkerCreation.Init()
}
return c.mapPendingWorkerCreation
}

// CommonServe start the HatcheryLocal server
func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error {
log.Info(ctx, "%s> Starting service %s (%s)...", c.Name(), h.Configuration().Name, sdk.VERSION)
Expand Down
26 changes: 23 additions & 3 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/rockbears/log"
)

// shrinkQueue is used to shrink the polled queue 200% of the channel capacity (l)
Expand Down Expand Up @@ -42,7 +43,7 @@ func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time {
return t0
}

func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine call the SSE route
Expand Down Expand Up @@ -86,6 +87,14 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j
// push the job in the channel
if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" {
job.Header["WS"] = "true"

id := strconv.FormatInt(job.ID, 10)
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)

jobs <- *job
}
}
Expand Down Expand Up @@ -115,8 +124,19 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j
fmt.Println("Jobs Queue size: ", len(queue))
}

shrinkQueue(&queue, cap(jobs))
for _, j := range queue {
queueFiltered := sdk.WorkflowQueue{}
for _, job := range queue {
id := strconv.FormatInt(job.ID, 10)
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
queueFiltered = append(queueFiltered, job)
}

shrinkQueue(&queueFiltered, cap(jobs))
for _, j := range queueFiltered {
jobs <- j
}
}
Expand Down
25 changes: 20 additions & 5 deletions sdk/cdsclient/client_queue_V2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/rockbears/log"
)

func (c *client) V2QueueJobStepUpdate(ctx context.Context, regionName string, jobRunID string, stepsStatus sdk.JobStepsStatus) error {
Expand Down Expand Up @@ -103,7 +104,7 @@ func (c *client) V2QueueGetJobRun(ctx context.Context, regionName, id string) (*
return &job, nil
}

func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine call the Websocket
Expand Down Expand Up @@ -135,6 +136,11 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
}
// push the job in the channel
if j.Status == sdk.StatusWaiting {
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(wsEvent.Event.JobRunID) {
log.Debug(ctx, "skipping job %s", wsEvent.Event.JobRunID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID)
jobs <- *j
}
case <-jobsTicker.C:
Expand All @@ -161,14 +167,23 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
fmt.Println("Jobs Queue size: ", len(queue))
}

queueFiltered := []sdk.V2WorkflowRunJob{}
for _, job := range queue {
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) {
log.Debug(ctx, "skipping job %s", job.ID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
queueFiltered = append(queueFiltered, job)
}

max := cap(jobs) * 2
if len(queue) < max {
max = len(queue)
if len(queueFiltered) < max {
max = len(queueFiltered)
}
for i := 0; i < max; i++ {
jobs <- queue[i]
jobs <- queueFiltered[i]
}

}
}
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type ProjectVariablesClient interface {

type V2QueueClient interface {
V2QueueGetJobRun(ctx context.Context, regionName string, id string) (*sdk.V2WorkflowRunJob, error)
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueueJobResult(ctx context.Context, region string, jobRunID string, result sdk.V2WorkflowRunJobResult) error
V2QueueJobRunResultGet(ctx context.Context, regionName string, jobRunID string, runResultID string) (*sdk.V2WorkflowRunResult, error)
V2QueueJobRunResultsGet(ctx context.Context, regionName string, jobRunID string) ([]sdk.V2WorkflowRunResult, error)
Expand All @@ -311,7 +311,7 @@ type V2QueueClient interface {
type QueueClient interface {
QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error)
QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error)
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error)
QueueJobBook(ctx context.Context, id string) (sdk.WorkflowNodeJobRunBooked, error)
QueueJobRelease(ctx context.Context, id string) error
Expand Down
56 changes: 28 additions & 28 deletions sdk/cdsclient/mock_cdsclient/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions sdk/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -46,6 +47,35 @@ type Hatchery struct {
Token string `json:"token,omitempty" db:"-" cli:"token,omitempty"`
}

type HatcheryPendingWorkerCreation struct {
mapSpawnJobRequest map[string]struct{}
mapSpawnJobRequestMutex *sync.Mutex
}

func (c *HatcheryPendingWorkerCreation) Init() {
c.mapSpawnJobRequest = make(map[string]struct{})
c.mapSpawnJobRequestMutex = new(sync.Mutex)
}

func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string) {
c.mapSpawnJobRequestMutex.Lock()
c.mapSpawnJobRequest[id] = struct{}{}
c.mapSpawnJobRequestMutex.Unlock()
}

func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id string) {
c.mapSpawnJobRequestMutex.Lock()
delete(c.mapSpawnJobRequest, id)
c.mapSpawnJobRequestMutex.Unlock()
}

func (c *HatcheryPendingWorkerCreation) IsJobAlreadyPendingWorkerCreation(id string) bool {
c.mapSpawnJobRequestMutex.Lock()
_, has := c.mapSpawnJobRequest[id]
c.mapSpawnJobRequestMutex.Unlock()
return has
}

type HatcheryConfig map[string]interface{}

func (hc HatcheryConfig) Value() (driver.Value, error) {
Expand Down
Loading

0 comments on commit a65f819

Please sign in to comment.