Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: delete ratio service option #6099

Merged
merged 2 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ services:
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.commonConfiguration.provision.workerApiHttp.url=http://$HOSTNAME:8081;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.dockerEngines.sample-docker-engine.host=tcp://dockerhost:2375;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.dockerEngines.sample-docker-engine.maxContainers=4;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.ratioService=50;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.api.http.url=http://cds-api:8081;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.cache.redis.host=cds-cache:6379;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.cache.redis.password=cds;
Expand Down
88 changes: 32 additions & 56 deletions engine/api/workflow/dao_node_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (

// QueueFilter contains all criteria used to fetch queue
type QueueFilter struct {
ModelType []string
RatioService *int
Rights int
Since *time.Time
Until *time.Time
Limit *int
Statuses []string
Regions []string
ModelType []string
Rights int
Since *time.Time
Until *time.Time
Limit *int
Statuses []string
Regions []string
}

func NewQueueFilter() QueueFilter {
Expand Down Expand Up @@ -79,38 +78,27 @@ func CountNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, st
func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueue")
defer end()
containsService := []bool{true, false}
if filter.RatioService != nil {
if *filter.RatioService == 100 {
containsService = []bool{true, true}
} else if *filter.RatioService == 0 {
containsService = []bool{false, false}
}
}

query := gorpmapping.NewQuery(`select distinct workflow_node_run_job.*
from workflow_node_run_job
where workflow_node_run_job.queued >= $1
and workflow_node_run_job.queued <= $2
and workflow_node_run_job.status = ANY($3)
AND contains_service IN ($4, $5)
AND (model_type is NULL OR model_type = '' OR model_type = ANY($6))
AND (model_type is NULL OR model_type = '' OR model_type = ANY($4))
AND (
workflow_node_run_job.region = ANY($7)
workflow_node_run_job.region = ANY($5)
OR
(workflow_node_run_job.region is NULL AND '' = ANY($7))
(workflow_node_run_job.region is NULL AND '' = ANY($5))
OR
array_length($7, 1) is NULL
array_length($5, 1) is NULL
)
ORDER BY workflow_node_run_job.queued ASC
`).Args(
*filter.Since, // $1
*filter.Until, // $2
pq.StringArray(filter.Statuses), // $3
containsService[0], // $4
containsService[1], // $5
pq.StringArray(filter.ModelType), // $6
pq.StringArray(filter.Regions), // $7
pq.StringArray(filter.ModelType), // $4
pq.StringArray(filter.Regions), // $5
)

return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
Expand All @@ -120,45 +108,36 @@ func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.S
func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter, groupIDs []int64) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueueByGroups")
defer end()
containsService := []bool{true, false}
if filter.RatioService != nil {
if *filter.RatioService == 100 {
containsService = []bool{true, true}
} else if *filter.RatioService == 0 {
containsService = []bool{false, false}
}
}

query := gorpmapping.NewQuery(`
-- Parameters:
-- $1: Queue since
-- $2: Queue until
-- $3: List of status
-- $4, $5: Should (or should not) contains service, or we don't care
-- $6: List of model types
-- $7: Comman separated list of groups ID
-- $8: shared infra group ID
-- $9: minimum level of permission
-- $10: List of regions
-- $4: List of model types
-- $5: Comman separated list of groups ID
-- $6: shared infra group ID
-- $7: minimum level of permission
-- $8: List of regions
WITH workflow_id_with_permissions AS (
SELECT workflow_perm.workflow_id,
CASE WHEN $8 = ANY(string_to_array($7, ',')::int[]) THEN 7
CASE WHEN $6 = ANY(string_to_array($5, ',')::int[]) THEN 7
ELSE max(workflow_perm.role)
END as "role"
FROM workflow_perm
JOIN project_group ON project_group.id = workflow_perm.project_group_id
WHERE
project_group.group_id = ANY(string_to_array($7, ',')::int[])
project_group.group_id = ANY(string_to_array($5, ',')::int[])
OR
$8 = ANY(string_to_array($7, ',')::int[])
$6 = ANY(string_to_array($5, ',')::int[])
GROUP BY workflow_perm.workflow_id
), workflow_node_run_job_exec_groups AS (
SELECT id, jsonb_array_elements_text(exec_groups)::jsonb->'id' AS exec_group_id
FROM workflow_node_run_job
), workflow_node_run_job_matching_exec_groups AS (
SELECT id
FROM workflow_node_run_job_exec_groups
WHERE exec_group_id::text = ANY(string_to_array($7, ','))
WHERE exec_group_id::text = ANY(string_to_array($5, ','))
)
SELECT DISTINCT workflow_node_run_job.*
FROM workflow_node_run_job
Expand All @@ -168,7 +147,7 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto
WHERE workflow.id IN (
SELECT workflow_id
FROM workflow_id_with_permissions
WHERE role >= $9
WHERE role >= $7
)
AND workflow_node_run_job.id IN (
SELECT id
Expand All @@ -177,31 +156,28 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto
AND workflow_node_run_job.queued >= $1
AND workflow_node_run_job.queued <= $2
AND workflow_node_run_job.status = ANY($3)
AND workflow_node_run_job.contains_service IN ($4, $5)
AND (
workflow_node_run_job.model_type is NULL
OR
model_type = '' OR model_type = ANY($6)
model_type = '' OR model_type = ANY($4)
)
AND (
workflow_node_run_job.region = ANY($10)
workflow_node_run_job.region = ANY($8)
OR
(workflow_node_run_job.region is NULL AND '' = ANY($10))
(workflow_node_run_job.region is NULL AND '' = ANY($8))
OR
array_length($10, 1) is NULL
array_length($8, 1) is NULL
)
ORDER BY workflow_node_run_job.queued ASC
`).Args(
*filter.Since, // $1
*filter.Until, // $2
pq.StringArray(filter.Statuses), // $3
containsService[0], // $4
containsService[1], // $5
pq.StringArray(filter.ModelType), // $6
gorpmapping.IDsToQueryString(groupIDs), // $7
group.SharedInfraGroup.ID, // $8
filter.Rights, // $9
pq.StringArray(filter.Regions), // $10
pq.StringArray(filter.ModelType), // $4
gorpmapping.IDsToQueryString(groupIDs), // $5
group.SharedInfraGroup.ID, // $6
filter.Rights, // $7
pq.StringArray(filter.Regions), // $8
)
return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
}
Expand Down
30 changes: 0 additions & 30 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,36 +732,6 @@ queueRun:
}
}

// there is one job with a CDS Service prerequisiste
// Getting queue with RatioService=100 -> we want this job only.
// If we get a job without a service, it's a failure
cent := 100
filter = workflow.NewQueueFilter()
filter.Rights = sdk.PermissionReadExecute
filter.RatioService = &cent
jobsSince, err = workflow.LoadNodeJobRunQueueByGroupIDs(ctx, db, cache, filter, sdk.Groups(append(u.Groups, proj.ProjectGroups[0].Group, g0, g1)).ToIDs())
require.NoError(t, err)
for _, job := range jobsSince {
if !job.ContainsService {
assert.Fail(t, " this job should not be in queue !job.ContainsService: job")
}
}

// there is one job with a CDS Service prerequisiste
// Getting queue with RatioService=0 -> we want job only without CDS Service.
// If we get a job with a service, it's a failure
zero := 0
filter = workflow.NewQueueFilter()
filter.Rights = sdk.PermissionReadExecute
filter.RatioService = &zero
jobsSince, err = workflow.LoadNodeJobRunQueueByGroupIDs(ctx, db, cache, filter, sdk.Groups(append(u.Groups, proj.ProjectGroups[0].Group, g0, g1)).ToIDs())
require.NoError(t, err)
for _, job := range jobsSince {
if job.ContainsService {
assert.Fail(t, " this job should not be in queue job.ContainsService")
}
}

// there is one job with a CDS Model prerequisiste
// we get the queue with a modelType openstack : we don't want
// job with worker model type docker in result
Expand Down
21 changes: 5 additions & 16 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,14 +811,13 @@ func (api *API) countWorkflowJobQueueHandler() service.Handler {
}

since, until, _ := getSinceUntilLimitHeader(ctx, w, r)
modelType, ratioService, err := getModelTypeRatioService(ctx, r)
modelType, err := getModelType(ctx, r)
if err != nil {
return err
}

filter := workflow.NewQueueFilter()
filter.ModelType = []string{modelType}
filter.RatioService = ratioService
filter.Since = &since
filter.Until = &until

Expand Down Expand Up @@ -849,7 +848,7 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
status = []string{sdk.StatusWaiting}
}

modelType, ratioService, err := getModelTypeRatioService(ctx, r)
modelType, err := getModelType(ctx, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -880,7 +879,6 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
permissions = sdk.PermissionReadExecute
}
filter := workflow.NewQueueFilter()
filter.RatioService = ratioService
filter.Since = &since
filter.Until = &until
filter.Rights = permissions
Expand All @@ -905,23 +903,14 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
}
}

func getModelTypeRatioService(ctx context.Context, r *http.Request) (string, *int, error) {
func getModelType(ctx context.Context, r *http.Request) (string, error) {
modelType := FormString(r, "modelType")
if modelType != "" {
if !sdk.WorkerModelValidate(modelType) {
return "", nil, sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given modelType")
return "", sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given modelType")
}
}
ratioService := FormString(r, "ratioService")
var ratio *int
if ratioService != "" {
i, err := strconv.Atoi(ratioService)
if err != nil {
return "", nil, sdk.NewErrorFrom(sdk.ErrInvalidNumber, " %s is not a integer", ratioService)
}
ratio = &i
}
return modelType, ratio, nil
return modelType, nil
}

// getSinceUntilLimitHeader returns since, until, limit
Expand Down
19 changes: 0 additions & 19 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,6 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in

nbContainersFromHatchery := len(cs)

// List all workers containers
ws := cs.FilterWorkers()

// Checking the number of container on each docker engine
if nbContainersFromHatchery >= dockerClient.MaxContainers {
log.Debug(ctx, "hatchery> swarm> CanSpawn> max containers reached on %s. current:%d max:%d", dockerName, nbContainersFromHatchery, dockerClient.MaxContainers)
Expand All @@ -482,22 +479,6 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in
}
}

// hatcherySwarm.ratioService: Percent reserved for spawning worker with service requirement
// if no link -> we need to check ratioService
if len(links) == 0 {
ratioService := h.Config.Provision.RatioService
if ratioService != nil && *ratioService >= 100 {
log.Debug(ctx, "hatchery> swarm> CanSpawn> ratioService 100 by conf on %s - no spawn worker without CDS Service", dockerName)
return false
}
if nbContainersFromHatchery > 0 {
percentFree := 100 - (100 * len(ws) / dockerClient.MaxContainers)
if ratioService != nil && percentFree <= *ratioService {
log.Debug(ctx, "hatchery> swarm> CanSpawn> ratio reached on %s. percentFree:%d ratioService:%d", dockerName, percentFree, *ratioService)
return false
}
}
}
return true
}
return false
Expand Down
Loading