Skip to content

Commit

Permalink
refactor: delete ratio service option (#6099)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault authored Mar 1, 2022
1 parent bed92ee commit b4967bd
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 273 deletions.
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ services:
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.commonConfiguration.provision.workerApiHttp.url=http://$HOSTNAME:8081;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.dockerEngines.sample-docker-engine.host=tcp://dockerhost:2375;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.dockerEngines.sample-docker-engine.maxContainers=4;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.ratioService=50;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.api.http.url=http://cds-api:8081;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.cache.redis.host=cds-cache:6379;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.cache.redis.password=cds;
Expand Down
88 changes: 32 additions & 56 deletions engine/api/workflow/dao_node_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (

// QueueFilter contains all criteria used to fetch queue
type QueueFilter struct {
ModelType []string
RatioService *int
Rights int
Since *time.Time
Until *time.Time
Limit *int
Statuses []string
Regions []string
ModelType []string
Rights int
Since *time.Time
Until *time.Time
Limit *int
Statuses []string
Regions []string
}

func NewQueueFilter() QueueFilter {
Expand Down Expand Up @@ -79,38 +78,27 @@ func CountNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, st
func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueue")
defer end()
containsService := []bool{true, false}
if filter.RatioService != nil {
if *filter.RatioService == 100 {
containsService = []bool{true, true}
} else if *filter.RatioService == 0 {
containsService = []bool{false, false}
}
}

query := gorpmapping.NewQuery(`select distinct workflow_node_run_job.*
from workflow_node_run_job
where workflow_node_run_job.queued >= $1
and workflow_node_run_job.queued <= $2
and workflow_node_run_job.status = ANY($3)
AND contains_service IN ($4, $5)
AND (model_type is NULL OR model_type = '' OR model_type = ANY($6))
AND (model_type is NULL OR model_type = '' OR model_type = ANY($4))
AND (
workflow_node_run_job.region = ANY($7)
workflow_node_run_job.region = ANY($5)
OR
(workflow_node_run_job.region is NULL AND '' = ANY($7))
(workflow_node_run_job.region is NULL AND '' = ANY($5))
OR
array_length($7, 1) is NULL
array_length($5, 1) is NULL
)
ORDER BY workflow_node_run_job.queued ASC
`).Args(
*filter.Since, // $1
*filter.Until, // $2
pq.StringArray(filter.Statuses), // $3
containsService[0], // $4
containsService[1], // $5
pq.StringArray(filter.ModelType), // $6
pq.StringArray(filter.Regions), // $7
pq.StringArray(filter.ModelType), // $4
pq.StringArray(filter.Regions), // $5
)

return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
Expand All @@ -120,45 +108,36 @@ func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.S
func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter, groupIDs []int64) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueueByGroups")
defer end()
containsService := []bool{true, false}
if filter.RatioService != nil {
if *filter.RatioService == 100 {
containsService = []bool{true, true}
} else if *filter.RatioService == 0 {
containsService = []bool{false, false}
}
}

query := gorpmapping.NewQuery(`
-- Parameters:
-- $1: Queue since
-- $2: Queue until
-- $3: List of status
-- $4, $5: Should (or should not) contains service, or we don't care
-- $6: List of model types
-- $7: Comman separated list of groups ID
-- $8: shared infra group ID
-- $9: minimum level of permission
-- $10: List of regions
-- $4: List of model types
-- $5: Comman separated list of groups ID
-- $6: shared infra group ID
-- $7: minimum level of permission
-- $8: List of regions
WITH workflow_id_with_permissions AS (
SELECT workflow_perm.workflow_id,
CASE WHEN $8 = ANY(string_to_array($7, ',')::int[]) THEN 7
CASE WHEN $6 = ANY(string_to_array($5, ',')::int[]) THEN 7
ELSE max(workflow_perm.role)
END as "role"
FROM workflow_perm
JOIN project_group ON project_group.id = workflow_perm.project_group_id
WHERE
project_group.group_id = ANY(string_to_array($7, ',')::int[])
project_group.group_id = ANY(string_to_array($5, ',')::int[])
OR
$8 = ANY(string_to_array($7, ',')::int[])
$6 = ANY(string_to_array($5, ',')::int[])
GROUP BY workflow_perm.workflow_id
), workflow_node_run_job_exec_groups AS (
SELECT id, jsonb_array_elements_text(exec_groups)::jsonb->'id' AS exec_group_id
FROM workflow_node_run_job
), workflow_node_run_job_matching_exec_groups AS (
SELECT id
FROM workflow_node_run_job_exec_groups
WHERE exec_group_id::text = ANY(string_to_array($7, ','))
WHERE exec_group_id::text = ANY(string_to_array($5, ','))
)
SELECT DISTINCT workflow_node_run_job.*
FROM workflow_node_run_job
Expand All @@ -168,7 +147,7 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto
WHERE workflow.id IN (
SELECT workflow_id
FROM workflow_id_with_permissions
WHERE role >= $9
WHERE role >= $7
)
AND workflow_node_run_job.id IN (
SELECT id
Expand All @@ -177,31 +156,28 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto
AND workflow_node_run_job.queued >= $1
AND workflow_node_run_job.queued <= $2
AND workflow_node_run_job.status = ANY($3)
AND workflow_node_run_job.contains_service IN ($4, $5)
AND (
workflow_node_run_job.model_type is NULL
OR
model_type = '' OR model_type = ANY($6)
model_type = '' OR model_type = ANY($4)
)
AND (
workflow_node_run_job.region = ANY($10)
workflow_node_run_job.region = ANY($8)
OR
(workflow_node_run_job.region is NULL AND '' = ANY($10))
(workflow_node_run_job.region is NULL AND '' = ANY($8))
OR
array_length($10, 1) is NULL
array_length($8, 1) is NULL
)
ORDER BY workflow_node_run_job.queued ASC
`).Args(
*filter.Since, // $1
*filter.Until, // $2
pq.StringArray(filter.Statuses), // $3
containsService[0], // $4
containsService[1], // $5
pq.StringArray(filter.ModelType), // $6
gorpmapping.IDsToQueryString(groupIDs), // $7
group.SharedInfraGroup.ID, // $8
filter.Rights, // $9
pq.StringArray(filter.Regions), // $10
pq.StringArray(filter.ModelType), // $4
gorpmapping.IDsToQueryString(groupIDs), // $5
group.SharedInfraGroup.ID, // $6
filter.Rights, // $7
pq.StringArray(filter.Regions), // $8
)
return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
}
Expand Down
30 changes: 0 additions & 30 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,36 +732,6 @@ queueRun:
}
}

// there is one job with a CDS Service prerequisiste
// Getting queue with RatioService=100 -> we want this job only.
// If we get a job without a service, it's a failure
cent := 100
filter = workflow.NewQueueFilter()
filter.Rights = sdk.PermissionReadExecute
filter.RatioService = &cent
jobsSince, err = workflow.LoadNodeJobRunQueueByGroupIDs(ctx, db, cache, filter, sdk.Groups(append(u.Groups, proj.ProjectGroups[0].Group, g0, g1)).ToIDs())
require.NoError(t, err)
for _, job := range jobsSince {
if !job.ContainsService {
assert.Fail(t, " this job should not be in queue !job.ContainsService: job")
}
}

// there is one job with a CDS Service prerequisiste
// Getting queue with RatioService=0 -> we want job only without CDS Service.
// If we get a job with a service, it's a failure
zero := 0
filter = workflow.NewQueueFilter()
filter.Rights = sdk.PermissionReadExecute
filter.RatioService = &zero
jobsSince, err = workflow.LoadNodeJobRunQueueByGroupIDs(ctx, db, cache, filter, sdk.Groups(append(u.Groups, proj.ProjectGroups[0].Group, g0, g1)).ToIDs())
require.NoError(t, err)
for _, job := range jobsSince {
if job.ContainsService {
assert.Fail(t, " this job should not be in queue job.ContainsService")
}
}

// there is one job with a CDS Model prerequisiste
// we get the queue with a modelType openstack : we don't want
// job with worker model type docker in result
Expand Down
21 changes: 5 additions & 16 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,14 +811,13 @@ func (api *API) countWorkflowJobQueueHandler() service.Handler {
}

since, until, _ := getSinceUntilLimitHeader(ctx, w, r)
modelType, ratioService, err := getModelTypeRatioService(ctx, r)
modelType, err := getModelType(ctx, r)
if err != nil {
return err
}

filter := workflow.NewQueueFilter()
filter.ModelType = []string{modelType}
filter.RatioService = ratioService
filter.Since = &since
filter.Until = &until

Expand Down Expand Up @@ -849,7 +848,7 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
status = []string{sdk.StatusWaiting}
}

modelType, ratioService, err := getModelTypeRatioService(ctx, r)
modelType, err := getModelType(ctx, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -880,7 +879,6 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
permissions = sdk.PermissionReadExecute
}
filter := workflow.NewQueueFilter()
filter.RatioService = ratioService
filter.Since = &since
filter.Until = &until
filter.Rights = permissions
Expand All @@ -905,23 +903,14 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
}
}

func getModelTypeRatioService(ctx context.Context, r *http.Request) (string, *int, error) {
func getModelType(ctx context.Context, r *http.Request) (string, error) {
modelType := FormString(r, "modelType")
if modelType != "" {
if !sdk.WorkerModelValidate(modelType) {
return "", nil, sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given modelType")
return "", sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given modelType")
}
}
ratioService := FormString(r, "ratioService")
var ratio *int
if ratioService != "" {
i, err := strconv.Atoi(ratioService)
if err != nil {
return "", nil, sdk.NewErrorFrom(sdk.ErrInvalidNumber, " %s is not a integer", ratioService)
}
ratio = &i
}
return modelType, ratio, nil
return modelType, nil
}

// getSinceUntilLimitHeader returns since, until, limit
Expand Down
19 changes: 0 additions & 19 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,6 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in

nbContainersFromHatchery := len(cs)

// List all workers containers
ws := cs.FilterWorkers()

// Checking the number of container on each docker engine
if nbContainersFromHatchery >= dockerClient.MaxContainers {
log.Debug(ctx, "hatchery> swarm> CanSpawn> max containers reached on %s. current:%d max:%d", dockerName, nbContainersFromHatchery, dockerClient.MaxContainers)
Expand All @@ -482,22 +479,6 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in
}
}

// hatcherySwarm.ratioService: Percent reserved for spawning worker with service requirement
// if no link -> we need to check ratioService
if len(links) == 0 {
ratioService := h.Config.Provision.RatioService
if ratioService != nil && *ratioService >= 100 {
log.Debug(ctx, "hatchery> swarm> CanSpawn> ratioService 100 by conf on %s - no spawn worker without CDS Service", dockerName)
return false
}
if nbContainersFromHatchery > 0 {
percentFree := 100 - (100 * len(ws) / dockerClient.MaxContainers)
if ratioService != nil && percentFree <= *ratioService {
log.Debug(ctx, "hatchery> swarm> CanSpawn> ratio reached on %s. percentFree:%d ratioService:%d", dockerName, percentFree, *ratioService)
return false
}
}
}
return true
}
return false
Expand Down
Loading

0 comments on commit b4967bd

Please sign in to comment.