Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdn): use logrus to send worker and service log #5153

Merged
merged 25 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cdn"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
Expand Down Expand Up @@ -192,6 +193,7 @@ type Configuration struct {
StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`
ServiceMaxSize int64 `toml:"serviceMaxSize" default:"15728640" comment:"Max service logs size in bytes (default: 15MB)" json:"serviceMaxSize"`
} `toml:"log" json:"log" comment:"###########################\n Log settings.\n##########################"`
CDN cdn.Configuration `toml:"cdn" json:"cdn" comment:"###########################\n CDN settings.\n##########################"`
}

// ServiceConfiguration is the configuration of external service
Expand Down Expand Up @@ -872,6 +874,13 @@ func (a *API) Serve(ctx context.Context) error {
log.Error(ctx, "api> heap dump uploaded to %s", s)
}()

cdsService := &cdn.Service{
Cfg: a.Config.CDN,
Db: a.mustDB(),
Cache: a.Cache,
}
cdsService.RunTcpLogServer(ctx)

log.Info(ctx, "Starting CDS API HTTP Server on %s:%d", a.Config.HTTP.Addr, a.Config.HTTP.Port)
if err := s.ListenAndServe(); err != nil {
return fmt.Errorf("Cannot start HTTP server: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions engine/api/database/gorpmapping/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func updateEncryptedData(db gorp.SqlExecutor, i interface{}) error {
updateSlice = append(updateSlice, encryptedColumns[f]+" = $"+strconv.Itoa(c))
c++
}

query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = %v", table, strings.Join(updateSlice, ","), key, id)
encryptedContentArgs = append(encryptedContentArgs, id)
query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = $%d", table, strings.Join(updateSlice, ","), key, c)
res, err := db.Exec(query, encryptedContentArgs...)
if err != nil {
return sdk.WithStack(err)
Expand Down
7 changes: 4 additions & 3 deletions engine/api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (api *API) postServiceRegisterHandler() service.Handler {
}
if srv != nil && !(srv.Type == data.Type) {
return sdk.WrapError(sdk.ErrForbidden, "cannot register service %s of type %s for consumer %s while existing service type is different", data.Name, data.Type, consumer.ID)
}
// Update or create the service
}

// Update or create the service
if srv != nil {
srv.Update(data)
if err := services.Update(ctx, tx, srv); err != nil {
Expand All @@ -87,6 +87,7 @@ func (api *API) postServiceRegisterHandler() service.Handler {
}

srv.Uptodate = data.Version == sdk.VERSION
srv.LogServer = api.Config.CDN.TCP

return service.WriteJSON(w, srv, http.StatusOK)
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/services/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
TypeVCS = "vcs"
TypeAPI = "api"
TypeUI = "ui"
TypeCDN = "cdn"
TypeHatchery = "hatchery"
TypeDBMigrate = "dbmigrate"
)
4 changes: 2 additions & 2 deletions engine/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (api *API) workerWaitingHandler() service.Handler {
return nil
}

if err := worker.SetStatus(api.mustDB(), wk.ID, sdk.StatusWaiting); err != nil {
if err := worker.SetStatus(ctx, api.mustDB(), wk.ID, sdk.StatusWaiting); err != nil {
return sdk.WrapError(err, "cannot update worker %s", wk.ID)
}
return nil
Expand Down Expand Up @@ -254,7 +254,7 @@ func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error {
log.Info(ctx, "DisableWorker> Worker %s crashed while building %d !", name, jobID.Int64)
}

if err := worker.SetStatus(tx, id, sdk.StatusDisabled); err != nil {
if err := worker.SetStatus(ctx, tx, id, sdk.StatusDisabled); err != nil {
cause := sdk.Cause(err)
if cause == worker.ErrNoWorker || cause == sql.ErrNoRows {
return sdk.WrapError(sdk.ErrWrongRequest, "DisableWorker> worker %s does not exists", id)
Expand Down
152 changes: 127 additions & 25 deletions engine/api/worker/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ import (
"github.com/ovh/cds/sdk"
)

func Insert(db gorp.SqlExecutor, w *sdk.Worker) error {
return gorpmapping.Insert(db, w)
func Insert(ctx context.Context, db gorp.SqlExecutor, w *sdk.Worker) error {
dbData := &dbWorker{Worker: *w}
if err := gorpmapping.InsertAndSign(ctx, db, dbData); err != nil {
return err
}
*w = dbData.Worker
return nil
}

// Delete remove worker from database, it also removes the associated access_token
Expand All @@ -37,83 +42,180 @@ func Delete(db gorp.SqlExecutor, id string) error {

func LoadByConsumerID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, error) {
sguiheux marked this conversation as resolved.
Show resolved Hide resolved
query := gorpmapping.NewQuery("SELECT * FROM worker WHERE auth_consumer_id = $1").Args(id)
var w sdk.Worker
var w dbWorker
found, err := gorpmapping.Get(ctx, db, query, &w)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
return &w, nil
isValid, err := gorpmapping.CheckSignature(w, w.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &w.Worker, nil
}

func LoadByID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, error) {
query := gorpmapping.NewQuery("SELECT * FROM worker WHERE id = $1").Args(id)
var w sdk.Worker
var w dbWorker
found, err := gorpmapping.Get(ctx, db, query, &w)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
return &w, nil
isValid, err := gorpmapping.CheckSignature(w, w.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &w.Worker, nil
}

func LoadAll(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Worker, error) {
var workers []sdk.Worker
var wks []dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker ORDER BY name ASC`)
if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil {
if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil {
return nil, err
}
workers := make([]sdk.Worker, len(wks))
for i := range wks {
isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
workers[i] = wks[i].Worker
}
return workers, nil
}

func LoadByHatcheryID(ctx context.Context, db gorp.SqlExecutor, hatcheryID int64) ([]sdk.Worker, error) {
var workers []sdk.Worker
var wks []dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE hatchery_id = $1 ORDER BY name ASC`).Args(hatcheryID)
if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil {
if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil {
return nil, err
}
workers := make([]sdk.Worker, len(wks))
for i := range wks {
isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
workers[i] = wks[i].Worker
}
return workers, nil
}

func LoadDeadWorkers(ctx context.Context, db gorp.SqlExecutor, timeout float64, status []string) ([]sdk.Worker, error) {
var workers []sdk.Worker
var wks []dbWorker
query := gorpmapping.NewQuery(`SELECT *
FROM worker
WHERE status = ANY(string_to_array($1, ',')::text[])
AND now() - last_beat > $2 * INTERVAL '1' SECOND
ORDER BY last_beat ASC`).Args(strings.Join(status, ","), timeout)
if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil {
if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil {
return nil, err
}
workers := make([]sdk.Worker, len(wks))
for i := range wks {
isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
workers[i] = wks[i].Worker
}
return workers, nil
}

// SetStatus sets job_run_id and status to building on given worker
func SetStatus(db gorp.SqlExecutor, workerID string, status string) error {
query := `UPDATE worker SET status = $1 WHERE id = $2`
func SetStatus(ctx context.Context, db gorp.SqlExecutor, workerID string, status string) error {
w, err := LoadByID(ctx, db, workerID)
if err != nil {
return err
}
w.Status = status
if status == sdk.StatusBuilding || status == sdk.StatusWaiting {
query = `UPDATE worker SET status = $1, job_run_id = NULL WHERE id = $2`
w.JobRunID = nil
}

if _, err := db.Exec(query, status, workerID); err != nil {
return sdk.WithStack(err)
dbData := &dbWorker{Worker: *w}
if err := gorpmapping.UpdateAndSign(ctx, db, dbData); err != nil {
return err
}
return nil
}

// SetToBuilding sets job_run_id and status to building on given worker
func SetToBuilding(db gorp.SqlExecutor, workerID string, jobRunID int64) error {
query := `UPDATE worker SET status = $1, job_run_id = $2 WHERE id = $3`
func SetToBuilding(ctx context.Context, db gorp.SqlExecutor, workerID string, jobRunID int64, key []byte) error {
w, err := LoadByID(ctx, db, workerID)
if err != nil {
return err
}
w.Status = sdk.StatusBuilding
w.JobRunID = &jobRunID
w.PrivateKey = key

res, errE := db.Exec(query, sdk.StatusBuilding, jobRunID, workerID)
if errE != nil {
return sdk.WithStack(errE)
dbData := &dbWorker{Worker: *w}
if err := gorpmapping.UpdateAndSign(ctx, db, dbData); err != nil {
return err
}
return nil
}

// LoadWorkerByIDWithDecryptKey load worker with decrypted private key
func LoadWorkerByIDWithDecryptKey(ctx context.Context, db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) {
var work dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE id = $1`).Args(workerID)
found, err := gorpmapping.Get(ctx, db, query, &work, gorpmapping.GetOptions.WithDecryption)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
isValid, err := gorpmapping.CheckSignature(work, work.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &work.Worker, err
}

_, err := res.RowsAffected()
return err
// LoadWorkerByName load worker by name
func LoadWorkerByName(ctx context.Context, db gorp.SqlExecutor, workerName string) (*sdk.Worker, error) {
var work dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE name = $1`).Args(workerName)
found, err := gorpmapping.Get(ctx, db, query, &work)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
isValid, err := gorpmapping.CheckSignature(work, work.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &work.Worker, err
}
22 changes: 22 additions & 0 deletions engine/api/worker/gorp_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package worker

import (
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
)

type dbWorker struct {
gorpmapping.SignedEntity
sdk.Worker
}

func init() {
gorpmapping.Register(gorpmapping.New(dbWorker{}, "worker", false, "id"))
}

func (e dbWorker) Canonical() gorpmapping.CanonicalForms {
var _ = []interface{}{e.ID, e.Name}
return gorpmapping.CanonicalForms{
"{{print .ID}}{{.Name}}",
}
}
2 changes: 1 addition & 1 deletion engine/api/worker/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func DisableDeadWorkers(ctx context.Context, db *gorp.DbMap) error {
}
for i := range workers {
log.Debug("Disable worker %s[%s] LastBeat:%v status:%s", workers[i].Name, workers[i].ID, workers[i].LastBeat, workers[i].Status)
if errD := SetStatus(db, workers[i].ID, sdk.StatusDisabled); errD != nil {
if errD := SetStatus(ctx, db, workers[i].ID, sdk.StatusDisabled); errD != nil {
log.Warning(ctx, "Cannot disable worker %v: %v", workers[i].ID, errD)
}
}
Expand Down
6 changes: 0 additions & 6 deletions engine/api/worker/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

Expand Down Expand Up @@ -39,7 +37,3 @@ func Initialize(c context.Context, DBFunc func() *gorp.DbMap, store cache.Store)
}
}
}

func init() {
gorpmapping.Register(gorpmapping.New(sdk.Worker{}, "worker", false, "id"))
}
2 changes: 1 addition & 1 deletion engine/api/worker/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func RegisterWorker(ctx context.Context, db gorp.SqlExecutor, store cache.Store,

w.Uptodate = registrationForm.Version == sdk.VERSION

if err := Insert(db, w); err != nil {
if err := Insert(ctx, db, w); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions engine/api/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDAO(t *testing.T) {
Status: sdk.StatusWaiting,
}

if err := worker.Insert(db, w); err != nil {
if err := worker.Insert(context.TODO(), db, w); err != nil {
t.Fatalf("Cannot insert worker %+v: %v", w, err)
}

Expand All @@ -57,7 +57,7 @@ func TestDAO(t *testing.T) {
assert.Equal(t, "foofoo", wk.ID)
}

test.NoError(t, worker.SetStatus(db, wk.ID, sdk.StatusBuilding))
test.NoError(t, worker.SetStatus(context.TODO(), db, wk.ID, sdk.StatusBuilding))
test.NoError(t, worker.RefreshWorker(db, wk.ID))
}

Expand Down
Loading