Skip to content

Commit

Permalink
fix(api): worker should not be removed when hatchery is lost (#5296)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Jul 7, 2020
1 parent d752efe commit 84f4ed5
Show file tree
Hide file tree
Showing 12 changed files with 343 additions and 186 deletions.
9 changes: 9 additions & 0 deletions engine/api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -111,6 +112,14 @@ func (api *API) postServiceRegisterHandler() service.Handler {
log.Debug("postServiceRegisterHandler> service %s registered with public key: %s", srv.Name, string(srv.PublicKey))
}

// For hatchery service we need to check if there are workers that are not attached to an existing hatchery
// If some worker's parent consumer match current hatchery consumer we will attach this worker to the new hatchery.
if srv.Type == services.TypeHatchery {
if err := worker.ReAttachAllToHatchery(ctx, tx, *srv); err != nil {
return err
}
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}
Expand Down
14 changes: 3 additions & 11 deletions engine/api/services/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func LoadAllByType(ctx context.Context, db gorp.SqlExecutor, typeService string)
// LoadAllByType returns all services that users can see with given type.
func LoadAllByTypeAndUserID(ctx context.Context, db gorp.SqlExecutor, typeService string, userID string) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`
SELECT service.*
FROM service
SELECT service.*
FROM service
JOIN auth_consumer on auth_consumer.id = service.auth_consumer_id
WHERE service.type = $1 AND auth_consumer.user_id = $2`).Args(typeService, userID)
return getAll(ctx, db, query)
Expand Down Expand Up @@ -146,18 +146,10 @@ func Update(ctx context.Context, db gorp.SqlExecutor, s *sdk.Service) error {
// Delete a service.
func Delete(db gorp.SqlExecutor, s *sdk.Service) error {
if s.Type == TypeHatchery {
wks, err := worker.LoadByHatcheryID(context.Background(), db, s.ID)
if err != nil {
if err := worker.ReleaseAllFromHatchery(db, s.ID); err != nil {
return err
}

for _, wk := range wks {
if err := worker.Delete(db, wk.ID); err != nil {
return err
}
}
}

sdb := service{Service: *s}
log.Debug("services.Delete> deleting service %s(%d) from database", s.Name, s.ID)
if _, err := db.Delete(&sdb); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions engine/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (api *API) postRegisterWorkerHandler() service.Handler {
}

// Try to register worker
wk, err := worker.RegisterWorker(ctx, tx, api.Cache, workerTokenFromHatchery.Worker, hatchSrv.ID, workerConsumer, registrationForm)
wk, err := worker.RegisterWorker(ctx, tx, api.Cache, workerTokenFromHatchery.Worker, *hatchSrv, workerConsumer, registrationForm)
if err != nil {
return sdk.NewErrorWithStack(
sdk.WrapError(err, "[%s] Registering failed", workerTokenFromHatchery.Worker.WorkerName),
Expand Down Expand Up @@ -119,7 +119,7 @@ func (api *API) getWorkersHandler() service.Handler {
var workers []sdk.Worker
var err error
if isHatchery(ctx) {
workers, err = worker.LoadByHatcheryID(ctx, api.mustDB(), getAPIConsumer(ctx).Service.ID)
workers, err = worker.LoadAllByHatcheryID(ctx, api.mustDB(), getAPIConsumer(ctx).Service.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (api *API) disableWorkerHandler() service.Handler {
if err != nil {
return sdk.WrapError(sdk.ErrForbidden, "Cannot disable a worker from this hatchery: %v", err)
}
if wk.HatcheryID != hatcherySrv.ID {
if wk.HatcheryID != nil && *wk.HatcheryID != hatcherySrv.ID {
return sdk.WrapError(sdk.ErrForbidden, "Cannot disable a worker from hatchery (expected: %d/actual: %d)", wk.HatcheryID, hatcherySrv.ID)
}
}
Expand Down
42 changes: 42 additions & 0 deletions engine/api/worker/attach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package worker

import (
"context"

"github.com/go-gorp/gorp"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

// ReleaseAllFromHatchery remove dependency to given given hatchery for all workers linked to it.
func ReleaseAllFromHatchery(db gorp.SqlExecutor, hatcheryID int64) error {
if _, err := db.Exec("UPDATE worker SET hatchery_id = NULL WHERE hatchery_id = $1", hatcheryID); err != nil {
return sdk.WrapError(err, "cannot release workers for hatchery with id %d", hatcheryID)
}
return nil
}

// ReAttachAllToHatchery search for workers without hatchery an re-attach workers if the hatchery consumer match worker consumer's parent.
func ReAttachAllToHatchery(ctx context.Context, db gorp.SqlExecutor, hatchery sdk.Service) error {
query := gorpmapping.NewQuery(`
SELECT worker.* FROM worker
JOIN auth_consumer ON auth_consumer.id = worker.auth_consumer_id
WHERE auth_consumer.parent_id = $1 and worker.hatchery_id IS NULL
`).Args(hatchery.ConsumerID)
ws, err := getAll(ctx, db, query)
if err != nil {
return err
}

for i := range ws {
log.Info(ctx, "worker.ReAttachAllToHatchery> re-attach worker %s (%s) to hatchery %d (%s)", ws[i].ID, ws[i].Name, hatchery.ID, hatchery.Name)
ws[i].HatcheryID = &hatchery.ID
ws[i].HatcheryName = hatchery.Name
if err := gorpmapping.UpdateAndSign(ctx, db, &dbWorker{Worker: ws[i]}); err != nil {
return err
}
}

return nil
}
122 changes: 122 additions & 0 deletions engine/api/worker/attach_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package worker_test

import (
"context"
"sort"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/bootstrap"
"github.com/ovh/cds/engine/api/test"
"github.com/ovh/cds/engine/api/test/assets"
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/sdk"
)

func TestReleaseAllFromHatchery(t *testing.T) {
db, _ := test.SetupPG(t, bootstrap.InitiliazeDB)

// Remove all existing workers in database
workers, err := worker.LoadAll(context.TODO(), db)
require.NoError(t, err)
for _, w := range workers {
require.NoError(t, worker.Delete(db, w.ID))
}

g := assets.InsertGroup(t, db)
m := assets.InsertWorkerModel(t, db, sdk.RandomString(5), g.ID)

h1, _, h1Consumer, _ := assets.InsertHatchery(t, db, *g)
h2, _, h2Consumer, _ := assets.InsertHatchery(t, db, *g)

w1Consumer, err := authentication.NewConsumerWorker(context.TODO(), db, "worker-1", h1, h1Consumer, []int64{g.ID})
require.NoError(t, err)
w2Consumer, err := authentication.NewConsumerWorker(context.TODO(), db, "worker-2", h2, h2Consumer, []int64{g.ID})
require.NoError(t, err)

require.NoError(t, worker.Insert(context.TODO(), db, &sdk.Worker{
ID: sdk.UUID(),
Name: "worker-1",
ModelID: &m.ID,
HatcheryID: &h1.ID,
HatcheryName: h1.Name,
ConsumerID: w1Consumer.ID,
Status: sdk.StatusWaiting,
}))
require.NoError(t, worker.Insert(context.TODO(), db, &sdk.Worker{
ID: sdk.UUID(),
Name: "worker-2",
ModelID: &m.ID,
HatcheryID: &h2.ID,
HatcheryName: h2.Name,
ConsumerID: w2Consumer.ID,
Status: sdk.StatusWaiting,
}))

require.NoError(t, worker.ReleaseAllFromHatchery(db, h1.ID))

workers, err = worker.LoadAll(context.TODO(), db)
require.NoError(t, err)
require.Len(t, workers, 2)
sort.Slice(workers, func(i, j int) bool { return workers[i].Name < workers[i].Name })
assert.Equal(t, "worker-1", workers[0].Name)
assert.Nil(t, workers[0].HatcheryID)
assert.Equal(t, "worker-2", workers[1].Name)
assert.NotNil(t, workers[1].HatcheryID)
}

func TestReAttachAllToHatchery(t *testing.T) {
db, _ := test.SetupPG(t, bootstrap.InitiliazeDB)

// Remove all existing workers in database
workers, err := worker.LoadAll(context.TODO(), db)
require.NoError(t, err)
for _, w := range workers {
require.NoError(t, worker.Delete(db, w.ID))
}

g := assets.InsertGroup(t, db)
m := assets.InsertWorkerModel(t, db, sdk.RandomString(5), g.ID)

h1, _, h1Consumer, _ := assets.InsertHatchery(t, db, *g)
h2, _, h2Consumer, _ := assets.InsertHatchery(t, db, *g)

w1Consumer, err := authentication.NewConsumerWorker(context.TODO(), db, "worker-1", h1, h1Consumer, []int64{g.ID})
require.NoError(t, err)
w2Consumer, err := authentication.NewConsumerWorker(context.TODO(), db, "worker-2", h2, h2Consumer, []int64{g.ID})
require.NoError(t, err)

require.NoError(t, worker.Insert(context.TODO(), db, &sdk.Worker{
ID: sdk.UUID(),
Name: "worker-1",
ModelID: &m.ID,
HatcheryID: &h1.ID,
HatcheryName: h1.Name,
ConsumerID: w1Consumer.ID,
Status: sdk.StatusWaiting,
}))
require.NoError(t, worker.Insert(context.TODO(), db, &sdk.Worker{
ID: sdk.UUID(),
Name: "worker-2",
ModelID: &m.ID,
HatcheryName: h2.Name,
ConsumerID: w2Consumer.ID,
Status: sdk.StatusWaiting,
}))

require.NoError(t, worker.ReAttachAllToHatchery(context.TODO(), db, *h2))

workers, err = worker.LoadAll(context.TODO(), db)
require.NoError(t, err)
require.Len(t, workers, 2)
sort.Slice(workers, func(i, j int) bool { return workers[i].Name < workers[i].Name })
require.Equal(t, "worker-1", workers[0].Name)
require.NotNil(t, workers[0].HatcheryID)
require.Equal(t, h1.ID, *workers[0].HatcheryID)
require.Equal(t, "worker-2", workers[1].Name)
require.NotNil(t, workers[1].HatcheryID)
require.Equal(t, h2.ID, *workers[1].HatcheryID)
}
Loading

0 comments on commit 84f4ed5

Please sign in to comment.