Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: delete ratio service option #6099

Merged
merged 2 commits into from
Mar 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -93,7 +93,6 @@ services:
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.commonConfiguration.provision.workerApiHttp.url=http://$HOSTNAME:8081;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.dockerEngines.sample-docker-engine.host=tcp://dockerhost:2375;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.dockerEngines.sample-docker-engine.maxContainers=4;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hatchery.swarm.ratioService=50;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.api.http.url=http://cds-api:8081;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.cache.redis.host=cds-cache:6379;
/app/cds-engine-linux-amd64 config edit /app/conf/conf.toml --output /app/conf/conf.toml hooks.cache.redis.password=cds;
88 changes: 32 additions & 56 deletions engine/api/workflow/dao_node_run_job.go
Original file line number Diff line number Diff line change
@@ -19,14 +19,13 @@ import (

// QueueFilter contains all criteria used to fetch queue
type QueueFilter struct {
ModelType []string
RatioService *int
Rights int
Since *time.Time
Until *time.Time
Limit *int
Statuses []string
Regions []string
ModelType []string
Rights int
Since *time.Time
Until *time.Time
Limit *int
Statuses []string
Regions []string
}

func NewQueueFilter() QueueFilter {
@@ -79,38 +78,27 @@ func CountNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, st
func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueue")
defer end()
containsService := []bool{true, false}
if filter.RatioService != nil {
if *filter.RatioService == 100 {
containsService = []bool{true, true}
} else if *filter.RatioService == 0 {
containsService = []bool{false, false}
}
}

query := gorpmapping.NewQuery(`select distinct workflow_node_run_job.*
from workflow_node_run_job
where workflow_node_run_job.queued >= $1
and workflow_node_run_job.queued <= $2
and workflow_node_run_job.status = ANY($3)
AND contains_service IN ($4, $5)
AND (model_type is NULL OR model_type = '' OR model_type = ANY($6))
AND (model_type is NULL OR model_type = '' OR model_type = ANY($4))
AND (
workflow_node_run_job.region = ANY($7)
workflow_node_run_job.region = ANY($5)
OR
(workflow_node_run_job.region is NULL AND '' = ANY($7))
(workflow_node_run_job.region is NULL AND '' = ANY($5))
OR
array_length($7, 1) is NULL
array_length($5, 1) is NULL
)
ORDER BY workflow_node_run_job.queued ASC
`).Args(
*filter.Since, // $1
*filter.Until, // $2
pq.StringArray(filter.Statuses), // $3
containsService[0], // $4
containsService[1], // $5
pq.StringArray(filter.ModelType), // $6
pq.StringArray(filter.Regions), // $7
pq.StringArray(filter.ModelType), // $4
pq.StringArray(filter.Regions), // $5
)

return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
@@ -120,45 +108,36 @@ func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.S
func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter, groupIDs []int64) ([]sdk.WorkflowNodeJobRun, error) {
ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueueByGroups")
defer end()
containsService := []bool{true, false}
if filter.RatioService != nil {
if *filter.RatioService == 100 {
containsService = []bool{true, true}
} else if *filter.RatioService == 0 {
containsService = []bool{false, false}
}
}

query := gorpmapping.NewQuery(`
-- Parameters:
-- $1: Queue since
-- $2: Queue until
-- $3: List of status
-- $4, $5: Should (or should not) contains service, or we don't care
-- $6: List of model types
-- $7: Comman separated list of groups ID
-- $8: shared infra group ID
-- $9: minimum level of permission
-- $10: List of regions
-- $4: List of model types
-- $5: Comman separated list of groups ID
-- $6: shared infra group ID
-- $7: minimum level of permission
-- $8: List of regions
WITH workflow_id_with_permissions AS (
SELECT workflow_perm.workflow_id,
CASE WHEN $8 = ANY(string_to_array($7, ',')::int[]) THEN 7
CASE WHEN $6 = ANY(string_to_array($5, ',')::int[]) THEN 7
ELSE max(workflow_perm.role)
END as "role"
FROM workflow_perm
JOIN project_group ON project_group.id = workflow_perm.project_group_id
WHERE
project_group.group_id = ANY(string_to_array($7, ',')::int[])
project_group.group_id = ANY(string_to_array($5, ',')::int[])
OR
$8 = ANY(string_to_array($7, ',')::int[])
$6 = ANY(string_to_array($5, ',')::int[])
GROUP BY workflow_perm.workflow_id
), workflow_node_run_job_exec_groups AS (
SELECT id, jsonb_array_elements_text(exec_groups)::jsonb->'id' AS exec_group_id
FROM workflow_node_run_job
), workflow_node_run_job_matching_exec_groups AS (
SELECT id
FROM workflow_node_run_job_exec_groups
WHERE exec_group_id::text = ANY(string_to_array($7, ','))
WHERE exec_group_id::text = ANY(string_to_array($5, ','))
)
SELECT DISTINCT workflow_node_run_job.*
FROM workflow_node_run_job
@@ -168,7 +147,7 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto
WHERE workflow.id IN (
SELECT workflow_id
FROM workflow_id_with_permissions
WHERE role >= $9
WHERE role >= $7
)
AND workflow_node_run_job.id IN (
SELECT id
@@ -177,31 +156,28 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto
AND workflow_node_run_job.queued >= $1
AND workflow_node_run_job.queued <= $2
AND workflow_node_run_job.status = ANY($3)
AND workflow_node_run_job.contains_service IN ($4, $5)
AND (
workflow_node_run_job.model_type is NULL
OR
model_type = '' OR model_type = ANY($6)
model_type = '' OR model_type = ANY($4)
)
AND (
workflow_node_run_job.region = ANY($10)
workflow_node_run_job.region = ANY($8)
OR
(workflow_node_run_job.region is NULL AND '' = ANY($10))
(workflow_node_run_job.region is NULL AND '' = ANY($8))
OR
array_length($10, 1) is NULL
array_length($8, 1) is NULL
)
ORDER BY workflow_node_run_job.queued ASC
`).Args(
*filter.Since, // $1
*filter.Until, // $2
pq.StringArray(filter.Statuses), // $3
containsService[0], // $4
containsService[1], // $5
pq.StringArray(filter.ModelType), // $6
gorpmapping.IDsToQueryString(groupIDs), // $7
group.SharedInfraGroup.ID, // $8
filter.Rights, // $9
pq.StringArray(filter.Regions), // $10
pq.StringArray(filter.ModelType), // $4
gorpmapping.IDsToQueryString(groupIDs), // $5
group.SharedInfraGroup.ID, // $6
filter.Rights, // $7
pq.StringArray(filter.Regions), // $8
)
return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
}
30 changes: 0 additions & 30 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
@@ -732,36 +732,6 @@ queueRun:
}
}

// there is one job with a CDS Service prerequisiste
// Getting queue with RatioService=100 -> we want this job only.
// If we get a job without a service, it's a failure
cent := 100
filter = workflow.NewQueueFilter()
filter.Rights = sdk.PermissionReadExecute
filter.RatioService = &cent
jobsSince, err = workflow.LoadNodeJobRunQueueByGroupIDs(ctx, db, cache, filter, sdk.Groups(append(u.Groups, proj.ProjectGroups[0].Group, g0, g1)).ToIDs())
require.NoError(t, err)
for _, job := range jobsSince {
if !job.ContainsService {
assert.Fail(t, " this job should not be in queue !job.ContainsService: job")
}
}

// there is one job with a CDS Service prerequisiste
// Getting queue with RatioService=0 -> we want job only without CDS Service.
// If we get a job with a service, it's a failure
zero := 0
filter = workflow.NewQueueFilter()
filter.Rights = sdk.PermissionReadExecute
filter.RatioService = &zero
jobsSince, err = workflow.LoadNodeJobRunQueueByGroupIDs(ctx, db, cache, filter, sdk.Groups(append(u.Groups, proj.ProjectGroups[0].Group, g0, g1)).ToIDs())
require.NoError(t, err)
for _, job := range jobsSince {
if job.ContainsService {
assert.Fail(t, " this job should not be in queue job.ContainsService")
}
}

// there is one job with a CDS Model prerequisiste
// we get the queue with a modelType openstack : we don't want
// job with worker model type docker in result
21 changes: 5 additions & 16 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
@@ -811,14 +811,13 @@ func (api *API) countWorkflowJobQueueHandler() service.Handler {
}

since, until, _ := getSinceUntilLimitHeader(ctx, w, r)
modelType, ratioService, err := getModelTypeRatioService(ctx, r)
modelType, err := getModelType(ctx, r)
if err != nil {
return err
}

filter := workflow.NewQueueFilter()
filter.ModelType = []string{modelType}
filter.RatioService = ratioService
filter.Since = &since
filter.Until = &until

@@ -849,7 +848,7 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
status = []string{sdk.StatusWaiting}
}

modelType, ratioService, err := getModelTypeRatioService(ctx, r)
modelType, err := getModelType(ctx, r)
if err != nil {
return err
}
@@ -880,7 +879,6 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
permissions = sdk.PermissionReadExecute
}
filter := workflow.NewQueueFilter()
filter.RatioService = ratioService
filter.Since = &since
filter.Until = &until
filter.Rights = permissions
@@ -905,23 +903,14 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler {
}
}

func getModelTypeRatioService(ctx context.Context, r *http.Request) (string, *int, error) {
func getModelType(ctx context.Context, r *http.Request) (string, error) {
modelType := FormString(r, "modelType")
if modelType != "" {
if !sdk.WorkerModelValidate(modelType) {
return "", nil, sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given modelType")
return "", sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given modelType")
}
}
ratioService := FormString(r, "ratioService")
var ratio *int
if ratioService != "" {
i, err := strconv.Atoi(ratioService)
if err != nil {
return "", nil, sdk.NewErrorFrom(sdk.ErrInvalidNumber, " %s is not a integer", ratioService)
}
ratio = &i
}
return modelType, ratio, nil
return modelType, nil
}

// getSinceUntilLimitHeader returns since, until, limit
19 changes: 0 additions & 19 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
@@ -465,9 +465,6 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in

nbContainersFromHatchery := len(cs)

// List all workers containers
ws := cs.FilterWorkers()

// Checking the number of container on each docker engine
if nbContainersFromHatchery >= dockerClient.MaxContainers {
log.Debug(ctx, "hatchery> swarm> CanSpawn> max containers reached on %s. current:%d max:%d", dockerName, nbContainersFromHatchery, dockerClient.MaxContainers)
@@ -482,22 +479,6 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in
}
}

// hatcherySwarm.ratioService: Percent reserved for spawning worker with service requirement
// if no link -> we need to check ratioService
if len(links) == 0 {
ratioService := h.Config.Provision.RatioService
if ratioService != nil && *ratioService >= 100 {
log.Debug(ctx, "hatchery> swarm> CanSpawn> ratioService 100 by conf on %s - no spawn worker without CDS Service", dockerName)
return false
}
if nbContainersFromHatchery > 0 {
percentFree := 100 - (100 * len(ws) / dockerClient.MaxContainers)
if ratioService != nil && percentFree <= *ratioService {
log.Debug(ctx, "hatchery> swarm> CanSpawn> ratio reached on %s. percentFree:%d ratioService:%d", dockerName, percentFree, *ratioService)
return false
}
}
}
return true
}
return false
120 changes: 0 additions & 120 deletions engine/hatchery/swarm/swarm_test.go
Original file line number Diff line number Diff line change
@@ -435,126 +435,6 @@ func TestHatcherySwarm_MaxContainerReached(t *testing.T) {
assert.True(t, gock.IsDone())
}

func TestHatcherySwarm_MaxContainerRatioService100(t *testing.T) {
defer gock.Off()
h := InitTestHatcherySwarm(t)
h.dockerClients["default"].MaxContainers = 2

ratio := 100
h.Config.Provision.RatioService = &ratio
m := sdk.Model{
ID: 1,
Name: "my-model",
Group: &sdk.Group{
ID: 1,
Name: "mygroup",
},
}
jobID := int64(1)

containers := []types.Container{}

gock.New("https://lolcat.host").Get("/v6.66/containers/json").Reply(http.StatusOK).JSON(containers)

b := h.CanSpawn(context.TODO(), &m, jobID, []sdk.Requirement{})
assert.False(t, b)
assert.True(t, gock.IsDone())
}

func TestHatcherySwarm_MaxContainerRatioPercentReached(t *testing.T) {
defer gock.Off()
h := InitTestHatcherySwarm(t)
h.Config.Name = "swarmy"
h.dockerClients["default"].MaxContainers = 5

ratio := 20
h.Config.Provision.RatioService = &ratio
m := sdk.Model{
ID: 1,
Name: "my-model",
Group: &sdk.Group{
ID: 1,
Name: "mygroup",
},
}
jobID := int64(1)

containers := []types.Container{
{
Names: []string{"worker1"},
Labels: map[string]string{
LabelHatchery: "swarmy",
LabelWorkerName: "w1",
},
},
{
Names: []string{"worker2"},
Labels: map[string]string{
LabelHatchery: "swarmy",
LabelWorkerName: "w2",
},
},
{
Names: []string{"worker3"},
Labels: map[string]string{
LabelHatchery: "swarmy",
LabelWorkerName: "w3",
},
},
{
Names: []string{"worker4"},
Labels: map[string]string{
LabelHatchery: "swarmy",
LabelWorkerName: "w4",
},
},
}

gock.New("https://lolcat.host").Get("/v6.66/containers/json").Reply(http.StatusOK).JSON(containers)
b := h.CanSpawn(context.TODO(), &m, jobID, []sdk.Requirement{})
assert.False(t, b)
assert.True(t, gock.IsDone())
}

func TestHatcherySwarm_MaxContainerRatioPercentOK(t *testing.T) {
defer gock.Off()
h := InitTestHatcherySwarm(t)
h.dockerClients["default"].MaxContainers = 3
h.Config.Provision.MaxWorker = 3

ratio := 90
h.Config.Provision.RatioService = &ratio
m := sdk.Model{
ID: 1,
Name: "my-model",
Group: &sdk.Group{
ID: 1,
Name: "mygroup",
},
}
jobID := int64(1)

containers := []types.Container{
{
Names: []string{"worker1"},
Labels: map[string]string{
LabelHatchery: "swarmy",
},
},
{
Names: []string{"worker2"},
Labels: map[string]string{
LabelHatchery: "swarmy",
},
},
}

gock.New("https://lolcat.host").Get("/v6.66/containers/json").Reply(http.StatusOK).JSON(containers)
b := h.CanSpawn(context.TODO(), &m, jobID, []sdk.Requirement{{Name: "pg", Type: sdk.ServiceRequirement, Value: "postgresql"}})
assert.True(t, b)
assert.True(t, gock.IsDone())
}

func TestHatcherySwarm_CanSpawnNoDockerClient(t *testing.T) {
defer gock.Off()
h := InitTestHatcherySwarm(t)
1 change: 0 additions & 1 deletion engine/service/types.go
Original file line number Diff line number Diff line change
@@ -47,7 +47,6 @@ type HatcheryCommonConfiguration struct {
} `toml:"api" json:"api"`
Provision struct {
InjectEnvVars []string `toml:"injectEnvVars" commented:"true" comment:"Inject env variables in workers" json:"-" mapstructure:"injectEnvVars"`
RatioService *int `toml:"ratioService" default:"50" commented:"true" comment:"Percent reserved for spawning worker with service requirement" json:"ratioService,omitempty" mapstructure:"ratioService"`
MaxWorker int `toml:"maxWorker" default:"10" comment:"Maximum allowed simultaneous workers" json:"maxWorker"`
MaxConcurrentProvisioning int `toml:"maxConcurrentProvisioning" default:"10" comment:"Maximum allowed simultaneous workers provisioning" json:"maxConcurrentProvisioning"`
MaxConcurrentRegistering int `toml:"maxConcurrentRegistering" default:"2" comment:"Maximum allowed simultaneous workers registering. -1 to disable registering on this hatchery" json:"maxConcurrentRegistering"`
5 changes: 1 addition & 4 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
@@ -140,7 +140,7 @@ func (c *client) QueueWorkflowNodeJobRun(ms ...RequestModifier) ([]sdk.WorkflowN
return wJobs, nil
}

func (c *client) QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string, ratioService *int) (sdk.WorkflowNodeJobRunCount, error) {
func (c *client) QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error) {
if since == nil {
since = new(time.Time)
}
@@ -150,9 +150,6 @@ func (c *client) QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time
}
url, _ := url.Parse("/queue/workflows/count")
q := url.Query()
if ratioService != nil {
q.Add("ratioService", fmt.Sprintf("%d", *ratioService))
}
if modelType != "" {
q.Add("modelType", modelType)
}
11 changes: 1 addition & 10 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"time"

"github.com/spf13/afero"
@@ -251,7 +250,7 @@ type ProjectVariablesClient interface {
// QueueClient exposes queue related functions
type QueueClient interface {
QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error)
QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string, ratioService *int) (sdk.WorkflowNodeJobRunCount, error)
QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error)
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error)
QueueJobBook(ctx context.Context, id int64) (sdk.WorkflowNodeJobRunBooked, error)
@@ -621,14 +620,6 @@ func Region(regions ...string) RequestModifier {
}
}

func RatioService(ratioService int) RequestModifier {
return func(r *http.Request) {
q := r.URL.Query()
q.Set("ratioService", strconv.Itoa(ratioService))
r.URL.RawQuery = q.Encode()
}
}

func ModelType(modelType string) RequestModifier {
return func(r *http.Request) {
q := r.URL.Query()
24 changes: 12 additions & 12 deletions sdk/cdsclient/mock_cdsclient/interface_mock.go
4 changes: 0 additions & 4 deletions sdk/hatchery/hatchery.go
Original file line number Diff line number Diff line change
@@ -96,10 +96,6 @@ func Create(ctx context.Context, h Interface) error {
log.Debug(ctx, "starting queue polling")

var ms []cdsclient.RequestModifier
ratioService := h.Configuration().Provision.RatioService
if ratioService != nil {
ms = append(ms, cdsclient.RatioService(*ratioService))
}
if modelType != "" {
ms = append(ms, cdsclient.ModelType(modelType))
}