Skip to content

Commit

Permalink
feat(cdn): use logrus to send worker and service log (#5153)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored May 11, 2020
1 parent 4285535 commit 1a6173f
Show file tree
Hide file tree
Showing 45 changed files with 1,374 additions and 111 deletions.
9 changes: 9 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cdn"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
Expand Down Expand Up @@ -192,6 +193,7 @@ type Configuration struct {
StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`
ServiceMaxSize int64 `toml:"serviceMaxSize" default:"15728640" comment:"Max service logs size in bytes (default: 15MB)" json:"serviceMaxSize"`
} `toml:"log" json:"log" comment:"###########################\n Log settings.\n##########################"`
CDN cdn.Configuration `toml:"cdn" json:"cdn" comment:"###########################\n CDN settings.\n##########################"`
}

// ServiceConfiguration is the configuration of external service
Expand Down Expand Up @@ -872,6 +874,13 @@ func (a *API) Serve(ctx context.Context) error {
log.Error(ctx, "api> heap dump uploaded to %s", s)
}()

cdsService := &cdn.Service{
Cfg: a.Config.CDN,
Db: a.mustDB(),
Cache: a.Cache,
}
cdsService.RunTcpLogServer(ctx)

log.Info(ctx, "Starting CDS API HTTP Server on %s:%d", a.Config.HTTP.Addr, a.Config.HTTP.Port)
if err := s.ListenAndServe(); err != nil {
return fmt.Errorf("Cannot start HTTP server: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions engine/api/database/gorpmapping/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func updateEncryptedData(db gorp.SqlExecutor, i interface{}) error {
updateSlice = append(updateSlice, encryptedColumns[f]+" = $"+strconv.Itoa(c))
c++
}

query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = %v", table, strings.Join(updateSlice, ","), key, id)
encryptedContentArgs = append(encryptedContentArgs, id)
query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = $%d", table, strings.Join(updateSlice, ","), key, c)
res, err := db.Exec(query, encryptedContentArgs...)
if err != nil {
return sdk.WithStack(err)
Expand Down
1 change: 1 addition & 0 deletions engine/api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (api *API) postServiceRegisterHandler() service.Handler {
}

srv.Uptodate = data.Version == sdk.VERSION
srv.LogServer = api.Config.CDN.TCP

return service.WriteJSON(w, srv, http.StatusOK)
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/services/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
TypeVCS = "vcs"
TypeAPI = "api"
TypeUI = "ui"
TypeCDN = "cdn"
TypeHatchery = "hatchery"
TypeDBMigrate = "dbmigrate"
)
4 changes: 2 additions & 2 deletions engine/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (api *API) workerWaitingHandler() service.Handler {
return nil
}

if err := worker.SetStatus(api.mustDB(), wk.ID, sdk.StatusWaiting); err != nil {
if err := worker.SetStatus(ctx, api.mustDB(), wk.ID, sdk.StatusWaiting); err != nil {
return sdk.WrapError(err, "cannot update worker %s", wk.ID)
}
return nil
Expand Down Expand Up @@ -254,7 +254,7 @@ func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error {
log.Info(ctx, "DisableWorker> Worker %s crashed while building %d !", name, jobID.Int64)
}

if err := worker.SetStatus(tx, id, sdk.StatusDisabled); err != nil {
if err := worker.SetStatus(ctx, tx, id, sdk.StatusDisabled); err != nil {
cause := sdk.Cause(err)
if cause == worker.ErrNoWorker || cause == sql.ErrNoRows {
return sdk.WrapError(sdk.ErrWrongRequest, "DisableWorker> worker %s does not exists", id)
Expand Down
152 changes: 127 additions & 25 deletions engine/api/worker/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ import (
"github.com/ovh/cds/sdk"
)

func Insert(db gorp.SqlExecutor, w *sdk.Worker) error {
return gorpmapping.Insert(db, w)
func Insert(ctx context.Context, db gorp.SqlExecutor, w *sdk.Worker) error {
dbData := &dbWorker{Worker: *w}
if err := gorpmapping.InsertAndSign(ctx, db, dbData); err != nil {
return err
}
*w = dbData.Worker
return nil
}

// Delete remove worker from database, it also removes the associated access_token
Expand All @@ -37,83 +42,180 @@ func Delete(db gorp.SqlExecutor, id string) error {

func LoadByConsumerID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, error) {
query := gorpmapping.NewQuery("SELECT * FROM worker WHERE auth_consumer_id = $1").Args(id)
var w sdk.Worker
var w dbWorker
found, err := gorpmapping.Get(ctx, db, query, &w)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
return &w, nil
isValid, err := gorpmapping.CheckSignature(w, w.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &w.Worker, nil
}

func LoadByID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, error) {
query := gorpmapping.NewQuery("SELECT * FROM worker WHERE id = $1").Args(id)
var w sdk.Worker
var w dbWorker
found, err := gorpmapping.Get(ctx, db, query, &w)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
return &w, nil
isValid, err := gorpmapping.CheckSignature(w, w.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &w.Worker, nil
}

func LoadAll(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Worker, error) {
var workers []sdk.Worker
var wks []dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker ORDER BY name ASC`)
if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil {
if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil {
return nil, err
}
workers := make([]sdk.Worker, len(wks))
for i := range wks {
isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
workers[i] = wks[i].Worker
}
return workers, nil
}

func LoadByHatcheryID(ctx context.Context, db gorp.SqlExecutor, hatcheryID int64) ([]sdk.Worker, error) {
var workers []sdk.Worker
var wks []dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE hatchery_id = $1 ORDER BY name ASC`).Args(hatcheryID)
if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil {
if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil {
return nil, err
}
workers := make([]sdk.Worker, len(wks))
for i := range wks {
isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
workers[i] = wks[i].Worker
}
return workers, nil
}

func LoadDeadWorkers(ctx context.Context, db gorp.SqlExecutor, timeout float64, status []string) ([]sdk.Worker, error) {
var workers []sdk.Worker
var wks []dbWorker
query := gorpmapping.NewQuery(`SELECT *
FROM worker
WHERE status = ANY(string_to_array($1, ',')::text[])
AND now() - last_beat > $2 * INTERVAL '1' SECOND
ORDER BY last_beat ASC`).Args(strings.Join(status, ","), timeout)
if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil {
if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil {
return nil, err
}
workers := make([]sdk.Worker, len(wks))
for i := range wks {
isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
workers[i] = wks[i].Worker
}
return workers, nil
}

// SetStatus sets job_run_id and status to building on given worker
func SetStatus(db gorp.SqlExecutor, workerID string, status string) error {
query := `UPDATE worker SET status = $1 WHERE id = $2`
func SetStatus(ctx context.Context, db gorp.SqlExecutor, workerID string, status string) error {
w, err := LoadByID(ctx, db, workerID)
if err != nil {
return err
}
w.Status = status
if status == sdk.StatusBuilding || status == sdk.StatusWaiting {
query = `UPDATE worker SET status = $1, job_run_id = NULL WHERE id = $2`
w.JobRunID = nil
}

if _, err := db.Exec(query, status, workerID); err != nil {
return sdk.WithStack(err)
dbData := &dbWorker{Worker: *w}
if err := gorpmapping.UpdateAndSign(ctx, db, dbData); err != nil {
return err
}
return nil
}

// SetToBuilding sets job_run_id and status to building on given worker
func SetToBuilding(db gorp.SqlExecutor, workerID string, jobRunID int64) error {
query := `UPDATE worker SET status = $1, job_run_id = $2 WHERE id = $3`
func SetToBuilding(ctx context.Context, db gorp.SqlExecutor, workerID string, jobRunID int64, key []byte) error {
w, err := LoadByID(ctx, db, workerID)
if err != nil {
return err
}
w.Status = sdk.StatusBuilding
w.JobRunID = &jobRunID
w.PrivateKey = key

res, errE := db.Exec(query, sdk.StatusBuilding, jobRunID, workerID)
if errE != nil {
return sdk.WithStack(errE)
dbData := &dbWorker{Worker: *w}
if err := gorpmapping.UpdateAndSign(ctx, db, dbData); err != nil {
return err
}
return nil
}

// LoadWorkerByIDWithDecryptKey load worker with decrypted private key
func LoadWorkerByIDWithDecryptKey(ctx context.Context, db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) {
var work dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE id = $1`).Args(workerID)
found, err := gorpmapping.Get(ctx, db, query, &work, gorpmapping.GetOptions.WithDecryption)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
isValid, err := gorpmapping.CheckSignature(work, work.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &work.Worker, err
}

_, err := res.RowsAffected()
return err
// LoadWorkerByName load worker by name
func LoadWorkerByName(ctx context.Context, db gorp.SqlExecutor, workerName string) (*sdk.Worker, error) {
var work dbWorker
query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE name = $1`).Args(workerName)
found, err := gorpmapping.Get(ctx, db, query, &work)
if err != nil {
return nil, err
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
isValid, err := gorpmapping.CheckSignature(work, work.Signature)
if err != nil {
return nil, err
}
if !isValid {
return nil, sdk.WithStack(sdk.ErrInvalidData)
}
return &work.Worker, err
}
22 changes: 22 additions & 0 deletions engine/api/worker/gorp_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package worker

import (
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
)

type dbWorker struct {
gorpmapping.SignedEntity
sdk.Worker
}

func init() {
gorpmapping.Register(gorpmapping.New(dbWorker{}, "worker", false, "id"))
}

func (e dbWorker) Canonical() gorpmapping.CanonicalForms {
var _ = []interface{}{e.ID, e.Name}
return gorpmapping.CanonicalForms{
"{{print .ID}}{{.Name}}",
}
}
2 changes: 1 addition & 1 deletion engine/api/worker/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func DisableDeadWorkers(ctx context.Context, db *gorp.DbMap) error {
}
for i := range workers {
log.Debug("Disable worker %s[%s] LastBeat:%v status:%s", workers[i].Name, workers[i].ID, workers[i].LastBeat, workers[i].Status)
if errD := SetStatus(db, workers[i].ID, sdk.StatusDisabled); errD != nil {
if errD := SetStatus(ctx, db, workers[i].ID, sdk.StatusDisabled); errD != nil {
log.Warning(ctx, "Cannot disable worker %v: %v", workers[i].ID, errD)
}
}
Expand Down
6 changes: 0 additions & 6 deletions engine/api/worker/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

Expand Down Expand Up @@ -39,7 +37,3 @@ func Initialize(c context.Context, DBFunc func() *gorp.DbMap, store cache.Store)
}
}
}

func init() {
gorpmapping.Register(gorpmapping.New(sdk.Worker{}, "worker", false, "id"))
}
2 changes: 1 addition & 1 deletion engine/api/worker/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func RegisterWorker(ctx context.Context, db gorp.SqlExecutor, store cache.Store,

w.Uptodate = registrationForm.Version == sdk.VERSION

if err := Insert(db, w); err != nil {
if err := Insert(ctx, db, w); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions engine/api/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDAO(t *testing.T) {
Status: sdk.StatusWaiting,
}

if err := worker.Insert(db, w); err != nil {
if err := worker.Insert(context.TODO(), db, w); err != nil {
t.Fatalf("Cannot insert worker %+v: %v", w, err)
}

Expand All @@ -57,7 +57,7 @@ func TestDAO(t *testing.T) {
assert.Equal(t, "foofoo", wk.ID)
}

test.NoError(t, worker.SetStatus(db, wk.ID, sdk.StatusBuilding))
test.NoError(t, worker.SetStatus(context.TODO(), db, wk.ID, sdk.StatusBuilding))
test.NoError(t, worker.RefreshWorker(db, wk.ID))
}

Expand Down
Loading

0 comments on commit 1a6173f

Please sign in to comment.