diff --git a/engine/api/api.go b/engine/api/api.go index 1f2f0d6e29..5e0a6d1f52 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -49,6 +49,7 @@ import ( "github.com/ovh/cds/engine/api/worker" "github.com/ovh/cds/engine/api/workermodel" "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/engine/cdn" "github.com/ovh/cds/engine/service" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" @@ -192,6 +193,7 @@ type Configuration struct { StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"` ServiceMaxSize int64 `toml:"serviceMaxSize" default:"15728640" comment:"Max service logs size in bytes (default: 15MB)" json:"serviceMaxSize"` } `toml:"log" json:"log" comment:"###########################\n Log settings.\n##########################"` + CDN cdn.Configuration `toml:"cdn" json:"cdn" comment:"###########################\n CDN settings.\n##########################"` } // ServiceConfiguration is the configuration of external service @@ -872,6 +874,13 @@ func (a *API) Serve(ctx context.Context) error { log.Error(ctx, "api> heap dump uploaded to %s", s) }() + cdsService := &cdn.Service{ + Cfg: a.Config.CDN, + Db: a.mustDB(), + Cache: a.Cache, + } + cdsService.RunTcpLogServer(ctx) + log.Info(ctx, "Starting CDS API HTTP Server on %s:%d", a.Config.HTTP.Addr, a.Config.HTTP.Port) if err := s.ListenAndServe(); err != nil { return fmt.Errorf("Cannot start HTTP server: %v", err) diff --git a/engine/api/database/gorpmapping/encryption.go b/engine/api/database/gorpmapping/encryption.go index 057b26d401..d8e12c0c7e 100644 --- a/engine/api/database/gorpmapping/encryption.go +++ b/engine/api/database/gorpmapping/encryption.go @@ -108,8 +108,8 @@ func updateEncryptedData(db gorp.SqlExecutor, i interface{}) error { updateSlice = append(updateSlice, encryptedColumns[f]+" = $"+strconv.Itoa(c)) c++ } - - query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = %v", table, strings.Join(updateSlice, ","), key, id) + encryptedContentArgs = append(encryptedContentArgs, id) + query := fmt.Sprintf("UPDATE %s SET %s WHERE %s = $%d", table, strings.Join(updateSlice, ","), key, c) res, err := db.Exec(query, encryptedContentArgs...) if err != nil { return sdk.WithStack(err) diff --git a/engine/api/services.go b/engine/api/services.go index d10bfb1adf..26dd5f18bd 100644 --- a/engine/api/services.go +++ b/engine/api/services.go @@ -94,6 +94,7 @@ func (api *API) postServiceRegisterHandler() service.Handler { } srv.Uptodate = data.Version == sdk.VERSION + srv.LogServer = api.Config.CDN.TCP return service.WriteJSON(w, srv, http.StatusOK) } diff --git a/engine/api/services/const.go b/engine/api/services/const.go index f9a5a4b503..b0b14971dd 100644 --- a/engine/api/services/const.go +++ b/engine/api/services/const.go @@ -8,6 +8,7 @@ const ( TypeVCS = "vcs" TypeAPI = "api" TypeUI = "ui" + TypeCDN = "cdn" TypeHatchery = "hatchery" TypeDBMigrate = "dbmigrate" ) diff --git a/engine/api/worker.go b/engine/api/worker.go index 2c47cea6ad..61533a77b6 100644 --- a/engine/api/worker.go +++ b/engine/api/worker.go @@ -213,7 +213,7 @@ func (api *API) workerWaitingHandler() service.Handler { return nil } - if err := worker.SetStatus(api.mustDB(), wk.ID, sdk.StatusWaiting); err != nil { + if err := worker.SetStatus(ctx, api.mustDB(), wk.ID, sdk.StatusWaiting); err != nil { return sdk.WrapError(err, "cannot update worker %s", wk.ID) } return nil @@ -254,7 +254,7 @@ func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error { log.Info(ctx, "DisableWorker> Worker %s crashed while building %d !", name, jobID.Int64) } - if err := worker.SetStatus(tx, id, sdk.StatusDisabled); err != nil { + if err := worker.SetStatus(ctx, tx, id, sdk.StatusDisabled); err != nil { cause := sdk.Cause(err) if cause == worker.ErrNoWorker || cause == sql.ErrNoRows { return sdk.WrapError(sdk.ErrWrongRequest, "DisableWorker> worker %s does not exists", id) diff --git a/engine/api/worker/dao.go b/engine/api/worker/dao.go index 35cd061d49..d151c8918e 100644 --- a/engine/api/worker/dao.go +++ b/engine/api/worker/dao.go @@ -11,8 +11,13 @@ import ( "github.com/ovh/cds/sdk" ) -func Insert(db gorp.SqlExecutor, w *sdk.Worker) error { - return gorpmapping.Insert(db, w) +func Insert(ctx context.Context, db gorp.SqlExecutor, w *sdk.Worker) error { + dbData := &dbWorker{Worker: *w} + if err := gorpmapping.InsertAndSign(ctx, db, dbData); err != nil { + return err + } + *w = dbData.Worker + return nil } // Delete remove worker from database, it also removes the associated access_token @@ -37,7 +42,7 @@ func Delete(db gorp.SqlExecutor, id string) error { func LoadByConsumerID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, error) { query := gorpmapping.NewQuery("SELECT * FROM worker WHERE auth_consumer_id = $1").Args(id) - var w sdk.Worker + var w dbWorker found, err := gorpmapping.Get(ctx, db, query, &w) if err != nil { return nil, err @@ -45,12 +50,19 @@ func LoadByConsumerID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk if !found { return nil, sdk.WithStack(sdk.ErrNotFound) } - return &w, nil + isValid, err := gorpmapping.CheckSignature(w, w.Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + return &w.Worker, nil } func LoadByID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, error) { query := gorpmapping.NewQuery("SELECT * FROM worker WHERE id = $1").Args(id) - var w sdk.Worker + var w dbWorker found, err := gorpmapping.Get(ctx, db, query, &w) if err != nil { return nil, err @@ -58,62 +70,152 @@ func LoadByID(ctx context.Context, db gorp.SqlExecutor, id string) (*sdk.Worker, if !found { return nil, sdk.WithStack(sdk.ErrNotFound) } - return &w, nil + isValid, err := gorpmapping.CheckSignature(w, w.Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + return &w.Worker, nil } func LoadAll(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Worker, error) { - var workers []sdk.Worker + var wks []dbWorker query := gorpmapping.NewQuery(`SELECT * FROM worker ORDER BY name ASC`) - if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil { + if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil { return nil, err } + workers := make([]sdk.Worker, len(wks)) + for i := range wks { + isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + workers[i] = wks[i].Worker + } return workers, nil } func LoadByHatcheryID(ctx context.Context, db gorp.SqlExecutor, hatcheryID int64) ([]sdk.Worker, error) { - var workers []sdk.Worker + var wks []dbWorker query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE hatchery_id = $1 ORDER BY name ASC`).Args(hatcheryID) - if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil { + if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil { return nil, err } + workers := make([]sdk.Worker, len(wks)) + for i := range wks { + isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + workers[i] = wks[i].Worker + } return workers, nil } func LoadDeadWorkers(ctx context.Context, db gorp.SqlExecutor, timeout float64, status []string) ([]sdk.Worker, error) { - var workers []sdk.Worker + var wks []dbWorker query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE status = ANY(string_to_array($1, ',')::text[]) AND now() - last_beat > $2 * INTERVAL '1' SECOND ORDER BY last_beat ASC`).Args(strings.Join(status, ","), timeout) - if err := gorpmapping.GetAll(ctx, db, query, &workers); err != nil { + if err := gorpmapping.GetAll(ctx, db, query, &wks); err != nil { return nil, err } + workers := make([]sdk.Worker, len(wks)) + for i := range wks { + isValid, err := gorpmapping.CheckSignature(wks[i], wks[i].Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + workers[i] = wks[i].Worker + } return workers, nil } // SetStatus sets job_run_id and status to building on given worker -func SetStatus(db gorp.SqlExecutor, workerID string, status string) error { - query := `UPDATE worker SET status = $1 WHERE id = $2` +func SetStatus(ctx context.Context, db gorp.SqlExecutor, workerID string, status string) error { + w, err := LoadByID(ctx, db, workerID) + if err != nil { + return err + } + w.Status = status if status == sdk.StatusBuilding || status == sdk.StatusWaiting { - query = `UPDATE worker SET status = $1, job_run_id = NULL WHERE id = $2` + w.JobRunID = nil } - - if _, err := db.Exec(query, status, workerID); err != nil { - return sdk.WithStack(err) + dbData := &dbWorker{Worker: *w} + if err := gorpmapping.UpdateAndSign(ctx, db, dbData); err != nil { + return err } return nil } // SetToBuilding sets job_run_id and status to building on given worker -func SetToBuilding(db gorp.SqlExecutor, workerID string, jobRunID int64) error { - query := `UPDATE worker SET status = $1, job_run_id = $2 WHERE id = $3` +func SetToBuilding(ctx context.Context, db gorp.SqlExecutor, workerID string, jobRunID int64, key []byte) error { + w, err := LoadByID(ctx, db, workerID) + if err != nil { + return err + } + w.Status = sdk.StatusBuilding + w.JobRunID = &jobRunID + w.PrivateKey = key - res, errE := db.Exec(query, sdk.StatusBuilding, jobRunID, workerID) - if errE != nil { - return sdk.WithStack(errE) + dbData := &dbWorker{Worker: *w} + if err := gorpmapping.UpdateAndSign(ctx, db, dbData); err != nil { + return err } + return nil +} + +// LoadWorkerByIDWithDecryptKey load worker with decrypted private key +func LoadWorkerByIDWithDecryptKey(ctx context.Context, db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) { + var work dbWorker + query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE id = $1`).Args(workerID) + found, err := gorpmapping.Get(ctx, db, query, &work, gorpmapping.GetOptions.WithDecryption) + if err != nil { + return nil, err + } + if !found { + return nil, sdk.WithStack(sdk.ErrNotFound) + } + isValid, err := gorpmapping.CheckSignature(work, work.Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + return &work.Worker, err +} - _, err := res.RowsAffected() - return err +// LoadWorkerByName load worker by name +func LoadWorkerByName(ctx context.Context, db gorp.SqlExecutor, workerName string) (*sdk.Worker, error) { + var work dbWorker + query := gorpmapping.NewQuery(`SELECT * FROM worker WHERE name = $1`).Args(workerName) + found, err := gorpmapping.Get(ctx, db, query, &work) + if err != nil { + return nil, err + } + if !found { + return nil, sdk.WithStack(sdk.ErrNotFound) + } + isValid, err := gorpmapping.CheckSignature(work, work.Signature) + if err != nil { + return nil, err + } + if !isValid { + return nil, sdk.WithStack(sdk.ErrInvalidData) + } + return &work.Worker, err } diff --git a/engine/api/worker/gorp_model.go b/engine/api/worker/gorp_model.go new file mode 100644 index 0000000000..3e748823eb --- /dev/null +++ b/engine/api/worker/gorp_model.go @@ -0,0 +1,22 @@ +package worker + +import ( + "github.com/ovh/cds/engine/api/database/gorpmapping" + "github.com/ovh/cds/sdk" +) + +type dbWorker struct { + gorpmapping.SignedEntity + sdk.Worker +} + +func init() { + gorpmapping.Register(gorpmapping.New(dbWorker{}, "worker", false, "id")) +} + +func (e dbWorker) Canonical() gorpmapping.CanonicalForms { + var _ = []interface{}{e.ID, e.Name} + return gorpmapping.CanonicalForms{ + "{{print .ID}}{{.Name}}", + } +} diff --git a/engine/api/worker/heartbeat.go b/engine/api/worker/heartbeat.go index f689d3d04a..029c3fa921 100644 --- a/engine/api/worker/heartbeat.go +++ b/engine/api/worker/heartbeat.go @@ -19,7 +19,7 @@ func DisableDeadWorkers(ctx context.Context, db *gorp.DbMap) error { } for i := range workers { log.Debug("Disable worker %s[%s] LastBeat:%v status:%s", workers[i].Name, workers[i].ID, workers[i].LastBeat, workers[i].Status) - if errD := SetStatus(db, workers[i].ID, sdk.StatusDisabled); errD != nil { + if errD := SetStatus(ctx, db, workers[i].ID, sdk.StatusDisabled); errD != nil { log.Warning(ctx, "Cannot disable worker %v: %v", workers[i].ID, errD) } } diff --git a/engine/api/worker/init.go b/engine/api/worker/init.go index 5d5aee2e4e..ec2e49005a 100644 --- a/engine/api/worker/init.go +++ b/engine/api/worker/init.go @@ -7,8 +7,6 @@ import ( "github.com/go-gorp/gorp" "github.com/ovh/cds/engine/api/cache" - "github.com/ovh/cds/engine/api/database/gorpmapping" - "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) @@ -39,7 +37,3 @@ func Initialize(c context.Context, DBFunc func() *gorp.DbMap, store cache.Store) } } } - -func init() { - gorpmapping.Register(gorpmapping.New(sdk.Worker{}, "worker", false, "id")) -} diff --git a/engine/api/worker/registration.go b/engine/api/worker/registration.go index 7bda56c45f..965f1463e4 100644 --- a/engine/api/worker/registration.go +++ b/engine/api/worker/registration.go @@ -106,7 +106,7 @@ func RegisterWorker(ctx context.Context, db gorp.SqlExecutor, store cache.Store, w.Uptodate = registrationForm.Version == sdk.VERSION - if err := Insert(db, w); err != nil { + if err := Insert(ctx, db, w); err != nil { return nil, err } diff --git a/engine/api/worker/worker_test.go b/engine/api/worker/worker_test.go index 02e7f8ecb1..b5085f12d1 100644 --- a/engine/api/worker/worker_test.go +++ b/engine/api/worker/worker_test.go @@ -38,7 +38,7 @@ func TestDAO(t *testing.T) { Status: sdk.StatusWaiting, } - if err := worker.Insert(db, w); err != nil { + if err := worker.Insert(context.TODO(), db, w); err != nil { t.Fatalf("Cannot insert worker %+v: %v", w, err) } @@ -57,7 +57,7 @@ func TestDAO(t *testing.T) { assert.Equal(t, "foofoo", wk.ID) } - test.NoError(t, worker.SetStatus(db, wk.ID, sdk.StatusBuilding)) + test.NoError(t, worker.SetStatus(context.TODO(), db, wk.ID, sdk.StatusBuilding)) test.NoError(t, worker.RefreshWorker(db, wk.ID)) } diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index 09088da46a..5499120537 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -2,6 +2,7 @@ package api import ( "context" + "encoding/base64" "fmt" "net/http" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/ovh/cds/engine/api/workflow" "github.com/ovh/cds/engine/service" "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" ) @@ -102,6 +104,9 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler { return sdk.WrapError(err, "cannot takeJob nodeJobRunID:%d", id) } + if api.Config.CDN.TCP.Addr != "" && api.Config.CDN.TCP.Port > 0 { + pbji.GelfServiceAddr = fmt.Sprintf("%s:%d", api.Config.CDN.TCP.Addr, api.Config.CDN.TCP.Port) + } workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, *p, report) go WorkflowSendEvent(context.Background(), api.mustDB(), api.Cache, *p, report) @@ -135,10 +140,16 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, return nil, sdk.WrapError(err, "cannot take job %d", id) } + workerKey, err := jws.NewRandomSymmetricKey(32) + if err != nil { + return nil, err + } + // Change worker status - if err := worker.SetToBuilding(tx, wk.ID, job.ID); err != nil { + if err := worker.SetToBuilding(ctx, tx, wk.ID, job.ID, workerKey); err != nil { return nil, sdk.WrapError(err, "cannot update worker %s status", wk.Name) } + wnjri.SigningKey = base64.StdEncoding.EncodeToString(workerKey) // Load the node run noderun, err := workflow.LoadNodeRunByID(tx, job.WorkflowNodeRunID, workflow.LoadRunOptions{}) @@ -184,6 +195,9 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, wnjri.Secrets = append(wnjri.Secrets, secretsKeys...) wnjri.NodeJobRun.Parameters = append(wnjri.NodeJobRun.Parameters, params...) + if err != nil { + return nil, err + } if err := tx.Commit(); err != nil { return nil, sdk.WithStack(err) } @@ -491,7 +505,7 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap // ^ build variables are now updated on job run and on node //Update worker status - if err := worker.SetStatus(tx, wr.ID, sdk.StatusWaiting); err != nil { + if err := worker.SetStatus(ctx, tx, wr.ID, sdk.StatusWaiting); err != nil { return nil, sdk.WrapError(err, "cannot update worker %s status", wr.ID) } @@ -530,15 +544,15 @@ func (api *API) postWorkflowJobLogsHandler() service.Handler { return sdk.WrapError(err, "invalid id") } + if ok := isWorker(ctx); !ok { + return sdk.WithStack(sdk.ErrForbidden) + } + pbJob, err := workflow.LoadNodeJobRun(ctx, api.mustDB(), api.Cache, id) if err != nil { return sdk.WrapError(err, "cannot get job run %d", id) } - if ok := isWorker(ctx); !ok { - return sdk.WithStack(sdk.ErrForbidden) - } - // Checks that the token used by the worker cas access to one of the execgroups grantedGroupIDs := append(getAPIConsumer(ctx).GetGroupIDs(), group.SharedInfraGroup.ID) if !pbJob.ExecGroups.HasOneOf(grantedGroupIDs...) { diff --git a/engine/api/workflow_queue_test.go b/engine/api/workflow_queue_test.go index 441666a6ee..5e1bc95a2c 100644 --- a/engine/api/workflow_queue_test.go +++ b/engine/api/workflow_queue_test.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "github.com/ovh/cds/engine/cdn" "io/ioutil" "net/http" "net/http/httptest" @@ -436,6 +437,14 @@ func Test_postTakeWorkflowJobHandler(t *testing.T) { //Register the worker testRegisterWorker(t, api, router, &ctx) + // Add cdn config + api.Config.CDN = cdn.Configuration{ + TCP: sdk.TCPServer{ + Port: 8090, + Addr: "localhost", + }, + } + uri := router.GetRoute("POST", api.postTakeWorkflowJobHandler, vars) require.NotEmpty(t, uri) @@ -469,12 +478,18 @@ func Test_postTakeWorkflowJobHandler(t *testing.T) { } } + assert.Equal(t, "localhost:8090", pbji.GelfServiceAddr) + run, err := workflow.LoadNodeJobRun(context.TODO(), api.mustDB(), api.Cache, ctx.job.ID) require.NoError(t, err) assert.Equal(t, "Building", run.Status) assert.Equal(t, ctx.model.Name, run.Model) assert.Equal(t, ctx.worker.Name, run.WorkerName) assert.NotEmpty(t, run.HatcheryName) + + wkrDB, err := worker.LoadWorkerByIDWithDecryptKey(context.TODO(), api.mustDB(), ctx.worker.ID) + assert.NoError(t, err) + assert.Len(t, wkrDB.PrivateKey, 32) } func Test_postTakeWorkflowInvalidJobHandler(t *testing.T) { @@ -948,6 +963,123 @@ func Test_postWorkflowJobStaticFilesHandler(t *testing.T) { require.Equal(t, http.StatusNotImplemented, rec.Code) } +func TestWorkerPrivateKey(t *testing.T) { + api, db, router, end := newTestAPI(t) + defer end() + + // Create user + u, pass := assets.InsertAdminUser(t, api.mustDB()) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + + // Create project + key := sdk.RandomString(10) + proj := assets.InsertTestProject(t, db, api.Cache, key, key) + + // add group + require.NoError(t, group.InsertLinkGroupUser(context.TODO(), api.mustDB(), &group.LinkGroupUser{ + GroupID: proj.ProjectGroups[0].Group.ID, + AuthentifiedUserID: u.ID, + Admin: true, + })) + u.Groups = append(u.Groups, proj.ProjectGroups[0].Group) + + // Create pipeline + pip := &sdk.Pipeline{ + ProjectID: proj.ID, + Name: sdk.RandomString(10), + } + assert.NoError(t, pipeline.InsertPipeline(db, pip)) + + s := sdk.Stage{ + PipelineID: pip.ID, + Name: "foo", + Enabled: true, + } + + assert.NoError(t, pipeline.InsertStage(db, &s)) + + // get script action + script := assets.GetBuiltinOrPluginActionByName(t, db, sdk.ScriptAction) + + j := sdk.Job{ + Enabled: true, + PipelineStageID: s.ID, + Action: sdk.Action{ + Name: "script", + Actions: []sdk.Action{ + assets.NewAction(script.ID, sdk.Parameter{Name: "script", Value: "echo lol"}), + }, + }, + } + assert.NoError(t, pipeline.InsertJob(db, &j, s.ID, pip)) + + var errPip error + pip, errPip = pipeline.LoadPipelineByID(context.TODO(), db, pip.ID, true) + assert.NoError(t, errPip) + + // Create application + app := sdk.Application{ + ProjectID: proj.ID, + Name: sdk.RandomString(10), + } + assert.NoError(t, application.Insert(db, *proj, &app)) + + // Create workflow + w := sdk.Workflow{ + Name: sdk.RandomString(10), + ProjectID: proj.ID, + ProjectKey: proj.Key, + WorkflowData: sdk.WorkflowData{ + Node: sdk.Node{ + Name: "node1", + Ref: "node1", + Type: sdk.NodeTypePipeline, + Context: &sdk.NodeContext{ + PipelineID: pip.ID, + ApplicationID: app.ID, + }, + }, + }, + } + + p, err := project.Load(db, proj.Key, project.LoadOptions.WithPipelines, project.LoadOptions.WithApplications) + assert.NoError(t, err) + assert.NoError(t, workflow.Insert(context.TODO(), db, api.Cache, *p, &w)) + + workflowDeepPipeline, err := workflow.LoadByID(context.TODO(), db, api.Cache, *p, w.ID, workflow.LoadOptions{DeepPipeline: true}) + assert.NoError(t, err) + + wrDB, errwr := workflow.CreateRun(db, workflowDeepPipeline, nil, u) + assert.NoError(t, errwr) + wrDB.Workflow = *workflowDeepPipeline + + _, errmr := workflow.StartWorkflowRun(context.Background(), db, api.Cache, *p, wrDB, + &sdk.WorkflowRunPostHandlerOption{ + Manual: &sdk.WorkflowNodeRunManual{Username: u.Username}, + }, + consumer, nil) + assert.NoError(t, errmr) + + ctx := testRunWorkflowCtx{ + user: u, + password: pass, + project: proj, + workflow: &w, + run: wrDB, + } + testRegisterWorker(t, api, router, &ctx) + ctx.worker.JobRunID = &wrDB.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID + assert.NoError(t, worker.SetToBuilding(context.TODO(), db, ctx.worker.ID, *ctx.worker.JobRunID, []byte("mysecret"))) + + wkFromDB, err := worker.LoadWorkerByName(context.TODO(), db, ctx.worker.Name) + require.NoError(t, err) + require.NotEqual(t, "mysecret", string(wkFromDB.PrivateKey)) + + wkFromDB, err = worker.LoadWorkerByIDWithDecryptKey(context.TODO(), db, ctx.worker.ID) + require.NoError(t, err) + require.Equal(t, "mysecret", string(wkFromDB.PrivateKey)) +} + func TestPostVulnerabilityReportHandler(t *testing.T) { api, db, router, end := newTestAPI(t) defer end() @@ -1062,7 +1194,7 @@ func TestPostVulnerabilityReportHandler(t *testing.T) { } testRegisterWorker(t, api, router, &ctx) ctx.worker.JobRunID = &wrDB.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID - assert.NoError(t, worker.SetToBuilding(db, ctx.worker.ID, *ctx.worker.JobRunID)) + assert.NoError(t, worker.SetToBuilding(context.TODO(), db, ctx.worker.ID, *ctx.worker.JobRunID, nil)) request := sdk.VulnerabilityWorkerReport{ Vulnerabilities: []sdk.Vulnerability{ @@ -1384,7 +1516,7 @@ func TestInsertNewCodeCoverageReport(t *testing.T) { } testRegisterWorker(t, api, router, &ctx) ctx.worker.JobRunID = &wrr.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID - assert.NoError(t, worker.SetToBuilding(db, ctx.worker.ID, *ctx.worker.JobRunID)) + assert.NoError(t, worker.SetToBuilding(context.TODO(), db, ctx.worker.ID, *ctx.worker.JobRunID, nil)) uri := router.GetRoute("POST", api.postWorkflowJobCoverageResultsHandler, vars) test.NotEmpty(t, uri) diff --git a/engine/cdn/cdn.go b/engine/cdn/cdn.go new file mode 100644 index 0000000000..236b5108ec --- /dev/null +++ b/engine/cdn/cdn.go @@ -0,0 +1,104 @@ +package cdn + +import ( + "context" + "fmt" + "net/http" + + "github.com/ovh/cds/engine/api/services" + "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/cdsclient" + "github.com/ovh/cds/sdk/log" +) + +// New returns a new service +func New() *Service { + s := new(Service) + /* + s.Router = &api.Router{ + Mux: mux.NewRouter(), + } + */ + return s +} + +func (s *Service) Init(config interface{}) (cdsclient.ServiceConfig, error) { + var cfg cdsclient.ServiceConfig + sConfig, ok := config.(Configuration) + if !ok { + return cfg, sdk.WithStack(fmt.Errorf("invalid CDN service configuration")) + } + + cfg.Host = sConfig.API.HTTP.URL + cfg.Token = sConfig.API.Token + cfg.InsecureSkipVerifyTLS = sConfig.API.HTTP.Insecure + cfg.RequestSecondsTimeout = sConfig.API.RequestTimeout + return cfg, nil +} + +// ApplyConfiguration apply an object of type CDN.Configuration after checking it +func (s *Service) ApplyConfiguration(config interface{}) error { + if err := s.CheckConfiguration(config); err != nil { + return err + } + var ok bool + s.Cfg, ok = config.(Configuration) + if !ok { + return fmt.Errorf("invalid configuration") + } + + s.ServiceName = s.Cfg.Name + s.ServiceType = services.TypeCDN + s.HTTPURL = s.Cfg.URL + s.MaxHeartbeatFailures = s.Cfg.API.MaxHeartbeatFailures + return nil +} + +// CheckConfiguration checks the validity of the configuration object +func (s *Service) CheckConfiguration(config interface{}) error { + sConfig, ok := config.(Configuration) + if !ok { + return fmt.Errorf("invalid configuration") + } + + if sConfig.URL == "" { + return fmt.Errorf("your CDS configuration seems to be empty. Please use environment variables, file or Consul to set your configuration") + } + if sConfig.Name == "" { + return fmt.Errorf("please enter a name in your CDN configuration") + } + + return nil +} + +// Serve will start the http api server +func (s *Service) Serve(c context.Context) error { + ctx, cancel := context.WithCancel(c) + defer cancel() + + s.RunTcpLogServer(ctx) + + //Init the http server + s.initRouter(ctx) + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", s.Cfg.HTTP.Addr, s.Cfg.HTTP.Port), + //Handler: s.Router.Mux, + MaxHeaderBytes: 1 << 20, + } + + //Gracefully shutdown the http server + go func() { + select { + case <-ctx.Done(): + log.Info(ctx, "CDN> Shutdown HTTP Server") + _ = server.Shutdown(ctx) + } + }() + + //Start the http server + log.Info(ctx, "CDN> Starting HTTP Server on port %d", s.Cfg.HTTP.Port) + if err := server.ListenAndServe(); err != nil { + log.Fatalf("CDN> Cannot start cds-cdn: %v", err) + } + return ctx.Err() +} diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go new file mode 100644 index 0000000000..4c6c305c14 --- /dev/null +++ b/engine/cdn/cdn_log.go @@ -0,0 +1,237 @@ +package cdn + +import ( + "bufio" + "context" + "crypto/rsa" + "fmt" + "net" + "strings" + "time" + + gocache "github.com/patrickmn/go-cache" + + "github.com/ovh/cds/engine/api/services" + "github.com/ovh/cds/engine/api/worker" + "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/jws" + "github.com/ovh/cds/sdk/log" + "github.com/ovh/cds/sdk/log/hook" +) + +var ( + logCache = gocache.New(20*time.Minute, 30*time.Minute) +) + +func (s *Service) RunTcpLogServer(ctx context.Context) { + log.Info(ctx, "Starting tcp server %s:%d", s.Cfg.TCP.Addr, s.Cfg.TCP.Port) + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.Cfg.TCP.Addr, s.Cfg.TCP.Port)) + if err != nil { + log.Fatalf("unable to start tcp log server: %v", err) + } + + //Gracefully shutdown the tcp server + go func() { + select { + case <-ctx.Done(): + log.Info(ctx, "CDN> Shutdown tcp log Server") + _ = listener.Close() + } + }() + + go func() { + for { + conn, err := listener.Accept() + if err != nil { + log.Error(ctx, "unable to accept connection: %v", err) + return + } + sdk.GoRoutine(ctx, "cdn-logServer", func(ctx context.Context) { + s.handleConnection(ctx, conn) + }) + } + }() +} + +func (s *Service) handleConnection(ctx context.Context, conn net.Conn) { + defer func() { + _ = conn.Close() + }() + bufReader := bufio.NewReader(conn) + for { + bytes, err := bufReader.ReadBytes(byte(0)) + if err != nil { + log.Info(ctx, "client left") + return + } + // remove byte(0) + bytes = bytes[:len(bytes)-1] + + if err := s.handleLogMessage(ctx, bytes); err != nil { + log.Error(ctx, "cdn.log> %v", err) + continue + } + } +} + +func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) error { + m := hook.Message{} + if err := m.UnmarshalJSON(messageReceived); err != nil { + return sdk.WrapError(err, "unable to unmarshall gelf message: %s", string(messageReceived)) + } + + sig, ok := m.Extra["_"+log.ExtraFieldSignature] + if !ok || sig == "" { + return sdk.WithStack(fmt.Errorf("signature not found on log message: %+v", m)) + } + + // Get worker datas + var signature log.Signature + if err := jws.UnsafeParse(sig.(string), &signature); err != nil { + return err + } + + switch { + case signature.Worker != nil: + return s.handleWorkerLog(ctx, signature.Worker.WorkerID, sig, m) + case signature.Service != nil: + return s.handleServiceLog(ctx, signature.Service.HatcheryID, signature.Service.HatcheryName, signature.Service.WorkerName, sig, m) + default: + return sdk.WithStack(sdk.ErrWrongRequest) + } +} + +func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig interface{}, m hook.Message) error { + var signature log.Signature + var workerData sdk.Worker + cacheData, ok := logCache.Get(fmt.Sprintf("worker-%s", workerID)) + if !ok { + var err error + workerData, err = s.getWorker(ctx, workerID) + if err != nil { + return err + } + } else { + workerData = cacheData.(sdk.Worker) + } + if err := jws.Verify(workerData.PrivateKey, sig.(string), &signature); err != nil { + return err + } + if workerData.JobRunID == nil || *workerData.JobRunID != signature.JobID { + return sdk.WithStack(sdk.ErrForbidden) + } + + pbJob, err := workflow.LoadNodeJobRun(ctx, s.Db, s.Cache, signature.JobID) + if err != nil { + return err + } + + logDate := time.Unix(0, int64(m.Time*1e9)) + logs := sdk.Log{ + JobID: pbJob.ID, + LastModified: &logDate, + NodeRunID: pbJob.WorkflowNodeRunID, + Start: &logDate, + StepOrder: signature.Worker.StepOrder, + Val: m.Full, + } + if !strings.HasSuffix(logs.Val, "\n") { + logs.Val += "\n" + } + + tx, err := s.Db.Begin() + if err != nil { + return sdk.WithStack(err) + } + defer tx.Rollback() // nolint + + if err := workflow.AddLog(tx, pbJob, &logs, s.Cfg.Log.StepMaxSize); err != nil { + return err + } + return sdk.WithStack(tx.Commit()) +} + +func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error { + var signature log.Signature + + var pk *rsa.PublicKey + cacheData, ok := logCache.Get(fmt.Sprintf("hatchery-key-%d", hatcheryID)) + if !ok { + var err error + pk, err = s.getHatchery(ctx, hatcheryID, hatcheryName) + if err != nil { + return err + } + } else { + pk = cacheData.(*rsa.PublicKey) + } + + if err := jws.Verify(pk, sig.(string), &signature); err != nil { + return err + } + + // Verified that worker has been spawn by this hatchery + workerCacheKey := fmt.Sprintf("service-worker-%s", workerName) + _, ok = logCache.Get(workerCacheKey) + if !ok { + // Verify that the worker has been spawn by this hatchery + w, err := worker.LoadWorkerByName(ctx, s.Db, workerName) + if err != nil { + return err + } + if w.HatcheryID != signature.Service.HatcheryID { + return sdk.WrapError(sdk.ErrWrongRequest, "hatchery and worker does not match") + } + logCache.Set(workerCacheKey, true, gocache.DefaultExpiration) + } + + nodeRunJob, err := workflow.LoadNodeJobRun(ctx, s.Db, s.Cache, signature.JobID) + if err != nil { + return err + } + + logs := sdk.ServiceLog{ + ServiceRequirementName: signature.Service.RequirementName, + ServiceRequirementID: signature.Service.RequirementID, + WorkflowNodeJobRunID: signature.JobID, + WorkflowNodeRunID: nodeRunJob.WorkflowNodeRunID, + Val: m.Full, + } + if !strings.HasSuffix(logs.Val, "\n") { + logs.Val += "\n" + } + + if err := workflow.AddServiceLog(s.Db, nodeRunJob, &logs, s.Cfg.Log.ServiceMaxSize); err != nil { + return err + } + return nil +} + +func (s *Service) getWorker(ctx context.Context, workerID string) (sdk.Worker, error) { + w, err := worker.LoadWorkerByIDWithDecryptKey(ctx, s.Db, workerID) + if err != nil { + return sdk.Worker{}, err + } + logCache.Set(fmt.Sprintf("worker-%s", w.ID), *w, gocache.DefaultExpiration) + return *w, nil +} + +func (s *Service) getHatchery(ctx context.Context, hatcheryID int64, hatcheryName string) (*rsa.PublicKey, error) { + h, err := services.LoadByNameAndType(ctx, s.Db, hatcheryName, services.TypeHatchery) + if err != nil { + return nil, err + } + + if h.ID != hatcheryID { + return nil, sdk.WithStack(sdk.ErrWrongRequest) + } + + // Verify signature + pk, err := jws.NewPublicKeyFromPEM(h.PublicKey) + if err != nil { + return nil, sdk.WithStack(err) + } + logCache.Set(fmt.Sprintf("hatchery-key-%d", hatcheryID), pk, gocache.DefaultExpiration) + return pk, nil +} diff --git a/engine/cdn/cdn_log_test.go b/engine/cdn/cdn_log_test.go new file mode 100644 index 0000000000..37eda70a19 --- /dev/null +++ b/engine/cdn/cdn_log_test.go @@ -0,0 +1,132 @@ +package cdn + +import ( + "context" + "fmt" + "github.com/ovh/cds/engine/api/bootstrap" + "github.com/ovh/cds/engine/api/test" + "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/sdk" + gocache "github.com/patrickmn/go-cache" + "testing" + "time" + + "github.com/ovh/cds/sdk/jws" + "github.com/ovh/cds/sdk/log" + "github.com/stretchr/testify/require" +) + +func TestWorkerLog(t *testing.T) { + // Init DB + db, cache, end := test.SetupPG(t, bootstrap.InitiliazeDB) + defer end() + + // Create worker private key + key, err := jws.NewRandomSymmetricKey(32) + require.NoError(t, err) + + // Create worker signer + sign, err := jws.NewHMacSigner(key) + require.NoError(t, err) + + // Create cdn service + s := Service{ + Db: db, + Cache: cache, + } + + // Create run job + jobRun := sdk.WorkflowNodeJobRun{ + Start: time.Now(), + WorkflowNodeRunID: 1, + Status: sdk.StatusBuilding, + } + dbj := new(workflow.JobRun) + require.NoError(t, dbj.ToJobRun(&jobRun)) + require.NoError(t, db.Insert(dbj)) + + signature := log.Signature{ + Worker: &log.SignatureWorker{ + WorkerID: "abcdef-123456", + StepOrder: 0, + }, + JobID: dbj.ID, + Timestamp: time.Now().UnixNano(), + } + logCache.Set(fmt.Sprintf("worker-%s", signature.Worker.WorkerID), sdk.Worker{ + JobRunID: &signature.JobID, + PrivateKey: key, + }, gocache.DefaultExpiration) + + signatureField, err := jws.Sign(sign, signature) + require.NoError(t, err) + + message := `{"level": 1, "version": "1", "short": "this", "_facility": "fa", "_file": "file", + "host": "host", "_line":1, "_pid": 1, "_prefix": "prefix", "full_message": "this is my message", "_Signature": "%s"}` + message = fmt.Sprintf(message, signatureField) + + require.NoError(t, s.handleLogMessage(context.TODO(), []byte(message))) + + logs, err := workflow.LoadLogs(s.Db, dbj.ID) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, "this is my message\n", logs[0].Val) +} + +func TestServiceLog(t *testing.T) { + // Init DB + db, cache, end := test.SetupPG(t, bootstrap.InitiliazeDB) + defer end() + + // Create hatchery private key + key, err := jws.NewRandomRSAKey() + require.NoError(t, err) + + // Create worker signer + sign, err := jws.NewSigner(key) + require.NoError(t, err) + + // Create cdn service + s := Service{ + Db: db, + Cache: cache, + } + + // Create run job + jobRun := sdk.WorkflowNodeJobRun{ + Start: time.Now(), + WorkflowNodeRunID: 1, + Status: sdk.StatusBuilding, + } + dbj := new(workflow.JobRun) + require.NoError(t, dbj.ToJobRun(&jobRun)) + require.NoError(t, db.Insert(dbj)) + + signature := log.Signature{ + Service: &log.SignatureService{ + WorkerName: "my-worker-name", + HatcheryID: 1, + HatcheryName: "my-hatchery-name", + RequirementID: 1, + RequirementName: "service-1", + }, + JobID: dbj.ID, + Timestamp: time.Now().UnixNano(), + } + + logCache.Set(fmt.Sprintf("hatchery-key-%d", signature.Service.HatcheryID), &key.PublicKey, gocache.DefaultExpiration) + logCache.Set(fmt.Sprintf("service-worker-%s", signature.Service.WorkerName), true, gocache.DefaultExpiration) + + signatureField, err := jws.Sign(sign, signature) + require.NoError(t, err) + + message := `{"level": 1, "version": "1", "short": "this", "_facility": "fa", "_file": "file", + "host": "host", "_line":1, "_pid": 1, "_prefix": "prefix", "full_message": "this is my service message", "_Signature": "%s"}` + message = fmt.Sprintf(message, signatureField) + + require.NoError(t, s.handleLogMessage(context.TODO(), []byte(message))) + + logs, err := workflow.LoadServiceLog(db, dbj.ID, signature.Service.RequirementName) + require.NoError(t, err) + require.Equal(t, "this is my service message\n", logs.Val) +} diff --git a/engine/cdn/cdn_router.go b/engine/cdn/cdn_router.go new file mode 100644 index 0000000000..9f343b9ad4 --- /dev/null +++ b/engine/cdn/cdn_router.go @@ -0,0 +1,18 @@ +package cdn + +import ( + "context" +) + +func (s *Service) initRouter(ctx context.Context) { + //r := s.Router + //r.Background = ctx + //r.URL = s.Cfg.URL + //r.SetHeaderFunc = api.DefaultHeaders + //r.Middlewares = append(r.Middlewares, service.CheckRequestSignatureMiddleware(s.ParsedAPIPublicKey)) + + //r.Handle("/mon/version", nil, r.GET(api.VersionHandler, api.Auth(false))) + //r.Handle("/mon/status", nil, r.GET(s.statusHandler, api.Auth(false))) + //r.Handle("/mon/metrics", nil, r.GET(service.GetPrometheustMetricsHandler(s), api.Auth(false))) + //r.Handle("/mon/metrics/all", nil, r.GET(service.GetMetricsHandler, api.Auth(false))) +} diff --git a/engine/cdn/status_handler.go b/engine/cdn/status_handler.go new file mode 100644 index 0000000000..294a431058 --- /dev/null +++ b/engine/cdn/status_handler.go @@ -0,0 +1,24 @@ +package cdn + +import ( + "context" + "net/http" + + "github.com/ovh/cds/engine/service" + "github.com/ovh/cds/sdk" +) + +func (s *Service) statusHandler() service.Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + var status = http.StatusOK + return service.WriteJSON(w, s.Status(ctx), status) + } +} + +func (s *Service) Status(ctx context.Context) sdk.MonitoringStatus { + m := s.CommonMonitoring() + + status := sdk.MonitoringStatusOK + m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "CDN", Value: status, Status: status}) + return m +} diff --git a/engine/cdn/types.go b/engine/cdn/types.go new file mode 100644 index 0000000000..cf805f9e27 --- /dev/null +++ b/engine/cdn/types.go @@ -0,0 +1,33 @@ +package cdn + +import ( + "github.com/go-gorp/gorp" + "github.com/ovh/cds/engine/api/cache" + "github.com/ovh/cds/engine/service" + "github.com/ovh/cds/sdk" +) + +// Service is the stuct representing a hooks µService +type Service struct { + service.Common + Cfg Configuration + //Router *api.Router + Db *gorp.DbMap + Cache cache.Store +} + +// Configuration is the hooks configuration structure +type Configuration struct { + Name string `toml:"name" default:"cds-cdn" comment:"Name of this CDS CDN Service\n Enter a name to enable this service" json:"name"` + TCP sdk.TCPServer `toml:"tcp" comment:"######################\n CDS CDN TCP Configuration \n######################" json:"tcp"` + HTTP struct { + Addr string `toml:"addr" default:"" commented:"true" comment:"Listen address without port, example: 127.0.0.1" json:"addr"` + Port int `toml:"port" default:"8089" json:"port"` + } `toml:"http" comment:"######################\n CDS CDN HTTP Configuration \n######################" json:"http"` + URL string `default:"http://localhost:8087" json:"url"` + API service.APIServiceConfiguration `toml:"api" comment:"######################\n CDS API Settings \n######################" json:"api"` + Log struct { + StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"` + ServiceMaxSize int64 `toml:"serviceMaxSize" default:"15728640" comment:"Max service logs size in bytes (default: 15MB)" json:"serviceMaxSize"` + } `toml:"log" json:"log" comment:"###########################\n Log settings.\n##########################"` +} diff --git a/engine/cmd_config.go b/engine/cmd_config.go index f89a267a96..c630964f4d 100644 --- a/engine/cmd_config.go +++ b/engine/cmd_config.go @@ -11,6 +11,7 @@ import ( toml "github.com/yesnault/go-toml" "github.com/ovh/cds/engine/api" + "github.com/ovh/cds/engine/cdn" "github.com/ovh/cds/engine/hatchery/kubernetes" "github.com/ovh/cds/engine/hatchery/local" "github.com/ovh/cds/engine/hatchery/marathon" @@ -203,6 +204,14 @@ var configCheckCmd = &cobra.Command{ } } + if conf.CDN != nil && conf.CDN.API.HTTP.URL != "" { + fmt.Printf("checking cdn configuration...\n") + if err := cdn.New().CheckConfiguration(*conf.CDN); err != nil { + fmt.Printf("cdn Configuration: %v\n", err) + hasError = true + } + } + if !hasError { fmt.Println("Configuration file OK") } diff --git a/engine/cmd_start.go b/engine/cmd_start.go index e90f5f5df5..d8c7faa3bb 100644 --- a/engine/cmd_start.go +++ b/engine/cmd_start.go @@ -14,6 +14,7 @@ import ( "github.com/ovh/cds/engine/api" "github.com/ovh/cds/engine/api/observability" "github.com/ovh/cds/engine/api/services" + "github.com/ovh/cds/engine/cdn" "github.com/ovh/cds/engine/elasticsearch" "github.com/ovh/cds/engine/hatchery/kubernetes" "github.com/ovh/cds/engine/hatchery/local" @@ -79,9 +80,12 @@ This component operates CDS workflow repositories #### VCS This component operates CDS VCS connectivity +#### CDN +This component operates CDS CDN to handle storage + Start all of this with a single command: - $ engine start [api] [hatchery:local] [hatchery:marathon] [hatchery:openstack] [hatchery:swarm] [hatchery:vsphere] [elasticsearch] [hooks] [vcs] [repositories] [migrate] [ui] + $ engine start [api] [cdn] [hatchery:local] [hatchery:marathon] [hatchery:openstack] [hatchery:swarm] [hatchery:vsphere] [elasticsearch] [hooks] [vcs] [repositories] [migrate] [ui] All the services are using the same configuration file format. @@ -133,7 +137,7 @@ See $ engine config command for more details. for _, a := range args { fmt.Printf("Starting service %s\n", a) switch a { - case "api": + case services.TypeAPI: if conf.API == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -141,7 +145,7 @@ See $ engine config command for more details. names = append(names, conf.API.Name) types = append(types, services.TypeAPI) - case "ui": + case services.TypeUI: if conf.UI == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -157,7 +161,7 @@ See $ engine config command for more details. names = append(names, conf.DatabaseMigrate.Name) types = append(types, services.TypeDBMigrate) - case "hatchery:local": + case services.TypeHatchery + ":local": if conf.Hatchery.Local == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -165,7 +169,7 @@ See $ engine config command for more details. names = append(names, conf.Hatchery.Local.Name) types = append(types, services.TypeHatchery) - case "hatchery:kubernetes": + case services.TypeHatchery + ":kubernetes": if conf.Hatchery.Kubernetes == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -173,7 +177,7 @@ See $ engine config command for more details. names = append(names, conf.Hatchery.Kubernetes.Name) types = append(types, services.TypeHatchery) - case "hatchery:marathon": + case services.TypeHatchery + ":marathon": if conf.Hatchery.Marathon == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -181,7 +185,7 @@ See $ engine config command for more details. names = append(names, conf.Hatchery.Marathon.Name) types = append(types, services.TypeHatchery) - case "hatchery:openstack": + case services.TypeHatchery + ":openstack": if conf.Hatchery.Openstack == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -189,7 +193,7 @@ See $ engine config command for more details. names = append(names, conf.Hatchery.Openstack.Name) types = append(types, services.TypeAPI) - case "hatchery:swarm": + case services.TypeHatchery + ":swarm": if conf.Hatchery.Swarm == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -197,7 +201,7 @@ See $ engine config command for more details. names = append(names, conf.Hatchery.Swarm.Name) types = append(types, services.TypeHatchery) - case "hatchery:vsphere": + case services.TypeHatchery + ":vsphere": if conf.Hatchery.VSphere == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -205,7 +209,7 @@ See $ engine config command for more details. names = append(names, conf.Hatchery.VSphere.Name) types = append(types, services.TypeHatchery) - case "hooks": + case services.TypeHooks: if conf.Hooks == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -213,7 +217,15 @@ See $ engine config command for more details. names = append(names, conf.Hooks.Name) types = append(types, services.TypeHooks) - case "vcs": + case services.TypeCDN: + if conf.CDN == nil { + sdk.Exit("Unable to start: missing service %s configuration", a) + } + serviceConfs = append(serviceConfs, serviceConf{arg: a, service: cdn.New(), cfg: *conf.CDN}) + names = append(names, conf.CDN.Name) + types = append(types, services.TypeCDN) + + case services.TypeVCS: if conf.VCS == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } @@ -221,7 +233,7 @@ See $ engine config command for more details. names = append(names, conf.VCS.Name) types = append(types, services.TypeVCS) - case "repositories": + case services.TypeRepositories: if conf.Repositories == nil { sdk.Exit("Unable to start: missing service %s configuration", a) } diff --git a/engine/config.go b/engine/config.go index 0f745413de..0c4d6c1d99 100644 --- a/engine/config.go +++ b/engine/config.go @@ -20,6 +20,7 @@ import ( "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/engine/api/database/gorpmapping" "github.com/ovh/cds/engine/api/services" + "github.com/ovh/cds/engine/cdn" "github.com/ovh/cds/engine/elasticsearch" "github.com/ovh/cds/engine/hatchery/kubernetes" "github.com/ovh/cds/engine/hatchery/local" @@ -63,7 +64,7 @@ func configBootstrap(args []string) Configuration { } for _, a := range args { switch a { - case "api": + case services.TypeAPI: conf.API = &api.Configuration{} conf.API.Name = "cds-api-" + namesgenerator.GetRandomNameCDS(0) defaults.SetDefaults(conf.API) @@ -77,7 +78,7 @@ func configBootstrap(args []string) Configuration { HealthURL: "https://ovh.github.io", Type: "doc", }) - case "ui": + case services.TypeUI: conf.UI = &ui.Configuration{} conf.UI.Name = "cds-ui-" + namesgenerator.GetRandomNameCDS(0) defaults.SetDefaults(conf.UI) @@ -85,23 +86,23 @@ func configBootstrap(args []string) Configuration { conf.DatabaseMigrate = &migrateservice.Configuration{} defaults.SetDefaults(conf.DatabaseMigrate) conf.DatabaseMigrate.Name = "cds-migrate-" + namesgenerator.GetRandomNameCDS(0) - case "hatchery:local": + case services.TypeHatchery + ":local": conf.Hatchery.Local = &local.HatcheryConfiguration{} defaults.SetDefaults(conf.Hatchery.Local) conf.Hatchery.Local.Name = "cds-hatchery-local-" + namesgenerator.GetRandomNameCDS(0) - case "hatchery:kubernetes": + case services.TypeHatchery + ":kubernetes": conf.Hatchery.Kubernetes = &kubernetes.HatcheryConfiguration{} defaults.SetDefaults(conf.Hatchery.Kubernetes) conf.Hatchery.Kubernetes.Name = "cds-hatchery-kubernetes-" + namesgenerator.GetRandomNameCDS(0) - case "hatchery:marathon": + case services.TypeHatchery + ":marathon": conf.Hatchery.Marathon = &marathon.HatcheryConfiguration{} defaults.SetDefaults(conf.Hatchery.Marathon) conf.Hatchery.Marathon.Name = "cds-hatchery-marathon-" + namesgenerator.GetRandomNameCDS(0) - case "hatchery:openstack": + case services.TypeHatchery + ":openstack": conf.Hatchery.Openstack = &openstack.HatcheryConfiguration{} defaults.SetDefaults(conf.Hatchery.Openstack) conf.Hatchery.Openstack.Name = "cds-hatchery-openstack-" + namesgenerator.GetRandomNameCDS(0) - case "hatchery:swarm": + case services.TypeHatchery + ":swarm": conf.Hatchery.Swarm = &swarm.HatcheryConfiguration{} defaults.SetDefaults(conf.Hatchery.Swarm) conf.Hatchery.Swarm.DockerEngines = map[string]swarm.DockerEngineConfiguration{ @@ -110,15 +111,15 @@ func configBootstrap(args []string) Configuration { }, } conf.Hatchery.Swarm.Name = "cds-hatchery-swarm-" + namesgenerator.GetRandomNameCDS(0) - case "hatchery:vsphere": + case services.TypeHatchery + ":vsphere": conf.Hatchery.VSphere = &vsphere.HatcheryConfiguration{} defaults.SetDefaults(conf.Hatchery.VSphere) conf.Hatchery.VSphere.Name = "cds-hatchery-vsphere-" + namesgenerator.GetRandomNameCDS(0) - case "hooks": + case services.TypeHooks: conf.Hooks = &hooks.Configuration{} defaults.SetDefaults(conf.Hooks) conf.Hooks.Name = "cds-hooks-" + namesgenerator.GetRandomNameCDS(0) - case "vcs": + case services.TypeVCS: conf.VCS = &vcs.Configuration{} defaults.SetDefaults(conf.VCS) var github vcs.GithubServerConfiguration @@ -132,18 +133,21 @@ func configBootstrap(args []string) Configuration { var gerrit vcs.GerritServerConfiguration defaults.SetDefaults(&gerrit) conf.VCS.Servers = map[string]vcs.ServerConfiguration{ - "github": vcs.ServerConfiguration{URL: "https://github.com", Github: &github}, - "bitbucket": vcs.ServerConfiguration{URL: "https://mybitbucket.com", Bitbucket: &bitbucket}, - "bitbucketcloud": vcs.ServerConfiguration{BitbucketCloud: &bitbucketcloud}, - "gitlab": vcs.ServerConfiguration{URL: "https://gitlab.com", Gitlab: &gitlab}, - "gerrit": vcs.ServerConfiguration{URL: "http://localhost:8080", Gerrit: &gerrit}, + "github": {URL: "https://github.com", Github: &github}, + "bitbucket": {URL: "https://mybitbucket.com", Bitbucket: &bitbucket}, + "bitbucketcloud": {BitbucketCloud: &bitbucketcloud}, + "gitlab": {URL: "https://gitlab.com", Gitlab: &gitlab}, + "gerrit": {URL: "http://localhost:8080", Gerrit: &gerrit}, } conf.VCS.Name = "cds-vcs-" + namesgenerator.GetRandomNameCDS(0) - case "repositories": + case services.TypeRepositories: conf.Repositories = &repositories.Configuration{} defaults.SetDefaults(conf.Repositories) conf.Repositories.Name = "cds-repositories-" + namesgenerator.GetRandomNameCDS(0) - case "elasticsearch": + case services.TypeCDN: + conf.CDN = &cdn.Configuration{} + defaults.SetDefaults(conf.CDN) + case services.TypeElasticsearch: conf.ElasticSearch = &elasticsearch.Configuration{} defaults.SetDefaults(conf.ElasticSearch) default: diff --git a/engine/hatchery/kubernetes/kubernetes.go b/engine/hatchery/kubernetes/kubernetes.go index db40b8ba9a..05173edd8d 100644 --- a/engine/hatchery/kubernetes/kubernetes.go +++ b/engine/hatchery/kubernetes/kubernetes.go @@ -43,6 +43,9 @@ func New() *HatcheryKubernetes { // InitHatchery register local hatchery with its worker model func (h *HatcheryKubernetes) InitHatchery(ctx context.Context) error { + if err := h.Common.InitServiceLogger(); err != nil { + return err + } sdk.GoRoutine(context.Background(), "hatchery kubernetes routines", func(ctx context.Context) { h.routines(ctx) }) diff --git a/engine/hatchery/kubernetes/services.go b/engine/hatchery/kubernetes/services.go index e340a7392e..cb149eba61 100644 --- a/engine/hatchery/kubernetes/services.go +++ b/engine/hatchery/kubernetes/services.go @@ -59,18 +59,21 @@ func (h *HatcheryKubernetes) getServicesLogs(ctx context.Context) error { ServiceRequirementID: reqServiceID, ServiceRequirementName: subsStr[0][2], Val: string(logs), + WorkerName: pod.ObjectMeta.Name, }) } } if len(servicesLogs) > 0 { - // Do call api ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil { - cancel() - return fmt.Errorf("Hatchery> Swarm> Cannot send service logs : %v", err) + defer cancel() + if h.Common.ServiceLogger == nil { + if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil { + return sdk.WithStack(fmt.Errorf("hatchery> Swarm> Cannot send service logs : %v", err)) + } + } else { + h.Common.SendServiceLog(ctx, servicesLogs) } - cancel() } return nil diff --git a/engine/hatchery/kubernetes/services_test.go b/engine/hatchery/kubernetes/services_test.go new file mode 100644 index 0000000000..b26005efb3 --- /dev/null +++ b/engine/hatchery/kubernetes/services_test.go @@ -0,0 +1,100 @@ +package kubernetes + +import ( + "context" + "crypto/rand" + "crypto/rsa" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "net/http" + "strings" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/h2non/gock.v1" + + "github.com/ovh/cds/sdk" +) + +var loggerCall = 0 + +func Test_serviceLogs(t *testing.T) { + h := NewHatcheryKubernetesTest(t) + h.Common.ServiceInstance = &sdk.Service{ + LogServer: sdk.TCPServer{ + Addr: "tcphost", + Port: 8090, + }, + } + reader := rand.Reader + bitSize := 2048 + key, err := rsa.GenerateKey(reader, bitSize) + require.NoError(t, err) + h.Common.PrivateKey = key + require.NoError(t, h.InitServiceLogger()) + + podsList := v1.PodList{ + Items: []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-name", + Namespace: "kyubi", + Labels: map[string]string{ + LABEL_SERVICE_JOB_ID: "666", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "service-666-blabla", + }, + }, + }, + }, + }, + } + gock.New("http://lolcat.kube").Get("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podsList) + + gock.New("http://lolcat.kube").AddMatcher(func(r *http.Request, rr *gock.Request) (bool, error) { + b, err := gock.MatchPath(r, rr) + assert.NoError(t, err) + if r.Method == http.MethodGet && strings.HasPrefix(r.URL.String(), "http://lolcat.kube/api/v1/namespaces/hachibi/pods/pod-name/log?container=service-666-blabla") { + if b { + return true, nil + } + return false, nil + } + return true, nil + }).Reply(http.StatusOK).Body(strings.NewReader("Je suis le log")) + + h.ServiceLogger = GetMockLogger() + + loggerCall = 0 + assert.NoError(t, h.getServicesLogs(context.TODO())) + + for _, p := range gock.Pending() { + t.Logf("%+v", p.Request().URLStruct.String()) + } + require.True(t, gock.IsDone()) + require.Equal(t, 1, loggerCall) +} + +func GetMockLogger() *logrus.Logger { + log := logrus.New() + log.AddHook(&HookMock{}) + return log +} + +type HookMock struct{} + +func (h *HookMock) Levels() []logrus.Level { + return []logrus.Level{ + logrus.InfoLevel, + } +} +func (h *HookMock) Fire(e *logrus.Entry) error { + loggerCall++ + return nil +} diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 3577d279e3..ae29fd127a 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -13,12 +13,15 @@ import ( "time" "github.com/gorilla/mux" + "github.com/sirupsen/logrus" + "gopkg.in/square/go-jose.v2" "github.com/ovh/cds/engine/api" "github.com/ovh/cds/engine/service" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" "github.com/ovh/cds/sdk/hatchery" + "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" ) @@ -187,6 +190,46 @@ func (c *Common) getPanicDumpListHandler() service.Handler { } } +func (c *Common) InitServiceLogger() error { + tcpServer := c.Common.ServiceInstance.LogServer + var signer jose.Signer + if tcpServer.Addr != "" && tcpServer.Port > 0 { + logger, err := log.New(fmt.Sprintf("%s:%d", tcpServer.Addr, tcpServer.Port)) + if err != nil { + return sdk.WithStack(err) + } + signer, err = jws.NewSigner(c.Common.PrivateKey) + if err != nil { + return sdk.WithStack(err) + } + c.Signer = signer + c.ServiceLogger = logger + } + return nil +} + +func (c *Common) SendServiceLog(ctx context.Context, servicesLogs []sdk.ServiceLog) { + for _, s := range servicesLogs { + dataToSign := log.Signature{ + Service: &log.SignatureService{ + HatcheryID: c.Service().ID, + HatcheryName: c.ServiceName(), + RequirementID: s.ServiceRequirementID, + RequirementName: s.ServiceRequirementName, + WorkerName: s.WorkerName, + }, + JobID: s.WorkflowNodeJobRunID, + Timestamp: time.Now().UnixNano(), + } + signature, err := jws.Sign(c.Signer, dataToSign) + if err != nil { + log.Error(ctx, "SendServiceLog> unable to sign service log message: %v", err) + continue + } + c.ServiceLogger.WithField("Signature", signature).Log(logrus.InfoLevel, s) + } +} + func (c *Common) getPanicDumpHandler() service.Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { vars := mux.Vars(r) diff --git a/engine/hatchery/swarm/helper_test.go b/engine/hatchery/swarm/helper_test.go index d14ee8360f..c8c80f9c09 100644 --- a/engine/hatchery/swarm/helper_test.go +++ b/engine/hatchery/swarm/helper_test.go @@ -1,6 +1,7 @@ package swarm import ( + "github.com/ovh/cds/engine/hatchery" "github.com/ovh/cds/sdk/cdsclient" "github.com/stretchr/testify/require" "gopkg.in/h2non/gock.v1" @@ -37,6 +38,7 @@ func testSwarmHatchery(t *testing.T) *HatcherySwarm { Config: HatcheryConfiguration{ DisableDockerOptsOnRequirements: false, }, + Common: hatchery.Common{}, } h.dockerClients["default"] = &dockerClient{Client: *c, MaxContainers: 2, name: "default"} diff --git a/engine/hatchery/swarm/swarm.go b/engine/hatchery/swarm/swarm.go index f94b1cfb37..2a2f8b55a9 100644 --- a/engine/hatchery/swarm/swarm.go +++ b/engine/hatchery/swarm/swarm.go @@ -172,6 +172,9 @@ func (h *HatcherySwarm) InitHatchery(ctx context.Context) error { return fmt.Errorf("no docker engine available") } } + if err := h.Common.InitServiceLogger(); err != nil { + return err + } sdk.GoRoutine(context.Background(), "swarm", func(ctx context.Context) { h.routines(ctx) }) diff --git a/engine/hatchery/swarm/swarm_util_logs.go b/engine/hatchery/swarm/swarm_util_logs.go index 4b89064053..12389cfef3 100644 --- a/engine/hatchery/swarm/swarm_util_logs.go +++ b/engine/hatchery/swarm/swarm_util_logs.go @@ -7,6 +7,7 @@ import ( "time" "github.com/docker/docker/api/types" + "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) @@ -24,6 +25,7 @@ func (h *HatcherySwarm) getServicesLogs() error { if !isWorkflowService { continue } + workerName := cnt.Labels["service_worker"] ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) logsOpts := types.ContainerLogsOptions{ Details: true, @@ -72,17 +74,21 @@ func (h *HatcherySwarm) getServicesLogs() error { ServiceRequirementID: reqServiceID, ServiceRequirementName: cnt.Labels["service_req_name"], Val: string(logs), + WorkerName: workerName, }) } - - if len(servicesLogs) > 0 { + } + if len(servicesLogs) > 0 { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if h.Common.ServiceLogger == nil { // Do call api - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) if err := h.Client.QueueServiceLogs(ctx, servicesLogs); err != nil { log.Error(ctx, "Hatchery> Swarm> Cannot send service logs : %v", err) } - cancel() + } else { + h.Common.SendServiceLog(ctx, servicesLogs) } + cancel() } } return nil diff --git a/engine/hatchery/swarm/swarm_util_logs_test.go b/engine/hatchery/swarm/swarm_util_logs_test.go new file mode 100644 index 0000000000..14ff6e232e --- /dev/null +++ b/engine/hatchery/swarm/swarm_util_logs_test.go @@ -0,0 +1,98 @@ +package swarm + +import ( + "crypto/rand" + "crypto/rsa" + "net/http" + "strings" + "testing" + + "github.com/docker/docker/api/types" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/h2non/gock.v1" + + "github.com/ovh/cds/sdk" +) + +var loggerCall = 0 + +func Test_serviceLogs(t *testing.T) { + h := InitTestHatcherySwarm(t) + h.Common.ServiceInstance = &sdk.Service{ + LogServer: sdk.TCPServer{ + Addr: "tcphost", + Port: 8090, + }, + } + reader := rand.Reader + bitSize := 2048 + key, err := rsa.GenerateKey(reader, bitSize) + require.NoError(t, err) + h.Common.PrivateKey = key + require.NoError(t, h.InitServiceLogger()) + + containers := []types.Container{ + { + ID: "swarmy-model1-w1", + Names: []string{"swarmy-model1-w1"}, + Labels: map[string]string{ + "hatchery": "swarmy", + "worker_name": "swarmy-model1-w1", + }, + }, + { + ID: "service-1", + Names: []string{"swarmy-model1-w1"}, + Labels: map[string]string{ + "hatchery": "swarmy", + "worker_name": "swarmy-model1-w1", + "service_job_id": "666", + "service_id": "1", + }, + }, + } + + gock.New("https://lolcat.host").Get("/v6.66/containers/json").Reply(http.StatusOK).JSON(containers) + gock.New("https://lolcat.host").AddMatcher(func(r *http.Request, rr *gock.Request) (bool, error) { + b, err := gock.MatchPath(r, rr) + assert.NoError(t, err) + if r.Method == http.MethodGet && strings.HasPrefix(r.URL.String(), "https://lolcat.host/v6.66/containers/service-1/logs") { + if b { + return true, nil + } + return false, nil + } + return true, nil + }).Reply(http.StatusOK).Body(strings.NewReader("Je suis le log")) + + h.ServiceLogger = GetMockLogger() + + loggerCall = 0 + assert.NoError(t, h.getServicesLogs()) + + for _, p := range gock.Pending() { + t.Logf("%+v", p.Request().URLStruct.String()) + } + require.True(t, gock.IsDone()) + require.Equal(t, 1, loggerCall) +} + +func GetMockLogger() *logrus.Logger { + log := logrus.New() + log.AddHook(&HookMock{}) + return log +} + +type HookMock struct{} + +func (h *HookMock) Levels() []logrus.Level { + return []logrus.Level{ + logrus.InfoLevel, + } +} +func (h *HookMock) Fire(e *logrus.Entry) error { + loggerCall++ + return nil +} diff --git a/engine/service/types.go b/engine/service/types.go index a905aab252..ccbe9995a8 100644 --- a/engine/service/types.go +++ b/engine/service/types.go @@ -6,6 +6,9 @@ import ( "fmt" "time" + "github.com/sirupsen/logrus" + "gopkg.in/square/go-jose.v2" + "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" ) @@ -102,6 +105,8 @@ type Common struct { ServiceType string ServiceInstance *sdk.Service PrivateKey *rsa.PrivateKey + Signer jose.Signer + ServiceLogger *logrus.Logger } // Service is the interface for a engine service diff --git a/engine/sql/201_worker.sql b/engine/sql/201_worker.sql new file mode 100644 index 0000000000..9a99b46e60 --- /dev/null +++ b/engine/sql/201_worker.sql @@ -0,0 +1,10 @@ +-- +migrate Up +ALTER TABLE "worker" ADD COLUMN IF NOT EXISTS cypher_private_key BYTEA; +ALTER TABLE "worker" ADD COLUMN IF NOT EXISTS sig BYTEA; +ALTER TABLE "worker" ADD COLUMN IF NOT EXISTS signer TEXT; + + +-- +migrate Down +ALTER TABLE "worker" DROP COLUMN cypher_hmac_key; +ALTER TABLE "worker" DROP COLUMN sig; +ALTER TABLE "worker" DROP COLUMN signer; diff --git a/engine/types.go b/engine/types.go index 732d0f8b5f..6ff22fbc4d 100644 --- a/engine/types.go +++ b/engine/types.go @@ -3,6 +3,7 @@ package main import ( "github.com/ovh/cds/engine/api" "github.com/ovh/cds/engine/api/observability" + "github.com/ovh/cds/engine/cdn" "github.com/ovh/cds/engine/elasticsearch" "github.com/ovh/cds/engine/hatchery/kubernetes" "github.com/ovh/cds/engine/hatchery/local" @@ -36,6 +37,7 @@ type Configuration struct { UI *ui.Configuration `toml:"ui" comment:"#####################\n UI Configuration \n####################" json:"ui"` Hatchery *HatcheryConfiguration `toml:"hatchery" json:"hatchery"` Hooks *hooks.Configuration `toml:"hooks" comment:"######################\n CDS Hooks Settings \n######################" json:"hooks"` + CDN *cdn.Configuration `toml:"cdn" comment:"######################\n CDS cdn Settings \n######################" json:"cdn"` VCS *vcs.Configuration `toml:"vcs" comment:"######################\n CDS VCS Settings \n######################" json:"vcs"` Repositories *repositories.Configuration `toml:"repositories" comment:"######################\n CDS Repositories Settings \n######################" json:"repositories"` ElasticSearch *elasticsearch.Configuration `toml:"elasticsearch" comment:"######################\n CDS ElasticSearch Settings \n This is use for CDS timeline and is optional\n######################" json:"elasticsearch"` diff --git a/engine/worker/internal/take.go b/engine/worker/internal/take.go index 173f4238d3..9e110e5b3e 100644 --- a/engine/worker/internal/take.go +++ b/engine/worker/internal/take.go @@ -2,11 +2,13 @@ package internal import ( "context" + "encoding/base64" "strings" "time" "github.com/ovh/cds/engine/worker/pkg/workerruntime" "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" ) @@ -31,6 +33,27 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er // Reset build variables w.currentJob.newVariables = nil + if info.SigningKey != "" { + secretKey := make([]byte, 32) + if _, err := base64.StdEncoding.Decode(secretKey, []byte(info.SigningKey)); err != nil { + return sdk.WithStack(err) + } + signer, err := jws.NewHMacSigner(secretKey) + if err != nil { + return sdk.WithStack(err) + } + w.currentJob.signer = signer + } + + if info.GelfServiceAddr != "" { + log.Info(ctx, "Setup step logger") + logger, err := log.New(info.GelfServiceAddr) + if err != nil { + return sdk.WithStack(err) + } + w.logger.stepLogger = logger + } + start := time.Now() //This goroutine try to get the job every 5 seconds, if it fails, it cancel the build. diff --git a/engine/worker/internal/types.go b/engine/worker/internal/types.go index 9e7adfd2a9..ceff2b22cd 100644 --- a/engine/worker/internal/types.go +++ b/engine/worker/internal/types.go @@ -10,11 +10,13 @@ import ( "time" "github.com/ovh/cds/engine/worker/pkg/workerruntime" - + "github.com/sirupsen/logrus" "github.com/spf13/afero" + "gopkg.in/square/go-jose.v2" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" + "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" ) @@ -32,8 +34,9 @@ type CurrentWorker struct { basedir afero.Fs manualExit bool logger struct { - logChan chan sdk.Log - llist *list.List + logChan chan sdk.Log + llist *list.List + stepLogger *logrus.Logger } httpPort int32 register struct { @@ -47,6 +50,7 @@ type CurrentWorker struct { params []sdk.Parameter secrets []sdk.Variable context context.Context + signer jose.Signer } status struct { Name string `json:"name"` @@ -83,15 +87,45 @@ func (wk *CurrentWorker) Parameters() []sdk.Parameter { func (wk *CurrentWorker) SendLog(ctx context.Context, level workerruntime.Level, s string) { jobID, _ := workerruntime.JobID(ctx) stepOrder, err := workerruntime.StepOrder(ctx) - if !strings.HasSuffix(s, "\n") { - s += "\n" + if wk.logger.stepLogger == nil { + if !strings.HasSuffix(s, "\n") { + s += "\n" + } + if err != nil { + log.Error(ctx, "SendLog> %v", err) + } + if err := wk.sendLog(jobID, fmt.Sprintf("[%s] ", level)+s, stepOrder, false); err != nil { + log.Error(ctx, "SendLog> %v", err) + } + return } - if err != nil { - log.Error(ctx, "SendLog> %v", err) + var logLevel logrus.Level + switch level { + case workerruntime.LevelDebug: + logLevel = logrus.DebugLevel + case workerruntime.LevelInfo: + logLevel = logrus.InfoLevel + case workerruntime.LevelWarn: + logLevel = logrus.WarnLevel + case workerruntime.LevelError: + logLevel = logrus.ErrorLevel + default: } - if err := wk.sendLog(jobID, fmt.Sprintf("[%s] ", level)+s, stepOrder, false); err != nil { - log.Error(ctx, "SendLog> %v", err) + + dataToSign := log.Signature{ + Worker: &log.SignatureWorker{ + WorkerID: wk.id, + StepOrder: int64(stepOrder), + }, + JobID: wk.currentJob.wJob.ID, + Timestamp: time.Now().UnixNano(), + } + signature, err := jws.Sign(wk.currentJob.signer, dataToSign) + if err != nil { + log.Error(ctx, "unable to sign logs: %v", err) } + wk.logger.stepLogger.WithField(log.ExtraFieldSignature, signature).Log(logLevel, s) + } func (wk *CurrentWorker) Name() string { diff --git a/sdk/config.go b/sdk/config.go index 17d0ff8637..e5e698f901 100644 --- a/sdk/config.go +++ b/sdk/config.go @@ -10,3 +10,8 @@ type ConfigUser struct { URLUI string `json:"url.ui"` URLAPI string `json:"url.api"` } + +type TCPServer struct { + Addr string `toml:"addr" default:"" commented:"true" comment:"Listen address without port, example: 127.0.0.1" json:"addr"` + Port int `toml:"port" default:"8089" json:"port"` +} diff --git a/sdk/jws/jws.go b/sdk/jws/jws.go index cb9448ee98..cd6baf7810 100644 --- a/sdk/jws/jws.go +++ b/sdk/jws/jws.go @@ -8,12 +8,25 @@ import ( "encoding/json" "encoding/pem" "errors" + "fmt" "gopkg.in/square/go-jose.v2" "github.com/ovh/cds/sdk" ) +func NewRandomSymmetricKey(size int) ([]byte, error) { + if size <= 0 || size%8 != 0 { + return nil, sdk.WithStack(fmt.Errorf("invalid key size")) + } + + k := make([]byte, size) + if _, err := rand.Read(k); err != nil { + return nil, sdk.WithStack(err) + } + return k, nil +} + // NewRandomRSAKey generates a public/private key pair func NewRandomRSAKey() (*rsa.PrivateKey, error) { // Generate a public/private key pair to use for this example. @@ -63,6 +76,15 @@ func NewSigner(privateKey *rsa.PrivateKey) (jose.Signer, error) { return jose.NewSigner(jose.SigningKey{Algorithm: jose.PS512, Key: privateKey}, nil) } +// NewHMacSigner instantiates a signer using HMAC using SHA-512 with the given private key. +func NewHMacSigner(secret []byte) (jose.Signer, error) { + sign, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.HS512, Key: secret}, nil) + if err != nil { + return nil, sdk.WithStack(err) + } + return sign, nil +} + // Sign a json marshalled content and returns a protected JWS object using the full serialization format. func Sign(signer jose.Signer, content interface{}) (string, error) { btes, err := json.Marshal(content) @@ -82,23 +104,23 @@ func Sign(signer jose.Signer, content interface{}) (string, error) { // Verify parses the serialized, protected JWS object, than verifying the signature on the payload // and unmarshal the payload into i -func Verify(publicKey *rsa.PublicKey, s string, i interface{}) error { +func Verify(key interface{}, s string, i interface{}) error { object, err := jose.ParseSigned(s) if err != nil { return sdk.WithStack(err) } - output, err := object.Verify(publicKey) + output, err := object.Verify(key) if err != nil { return sdk.WithStack(err) } - return json.Unmarshal(output, i) + return sdk.WithStack(json.Unmarshal(output, i)) } func UnsafeParse(s string, i interface{}) error { object, err := jose.ParseSigned(s) if err != nil { - return err + return sdk.WithStack(err) } output := object.UnsafePayloadWithoutVerification() - return json.Unmarshal(output, i) + return sdk.WithStack(json.Unmarshal(output, i)) } diff --git a/sdk/jws/jws_test.go b/sdk/jws/jws_test.go index 576daa4a52..c690d18615 100644 --- a/sdk/jws/jws_test.go +++ b/sdk/jws/jws_test.go @@ -13,3 +13,19 @@ func TestNewRandomRSAKey(t *testing.T) { require.NoError(t, err) t.Log(string(btes)) } + +func TestHMacSignAndVerify(t *testing.T) { + secret, err := NewRandomSymmetricKey(32) + require.NoError(t, err) + signer, err := NewHMacSigner(secret) + require.NoError(t, err) + + message := "coucou" + messageSigned, err := Sign(signer, message) + require.NoError(t, err) + require.NotEqual(t, message, messageSigned) + + var unsigned string + require.NoError(t, Verify(secret, messageSigned, &unsigned)) + require.Equal(t, message, unsigned) +} diff --git a/sdk/log.go b/sdk/log.go index 82c56db508..72d22e7965 100644 --- a/sdk/log.go +++ b/sdk/log.go @@ -40,4 +40,5 @@ type ServiceLog struct { ServiceRequirementID int64 `json:"requirement_id" db:"-"` ServiceRequirementName string `json:"requirement_service_name" db:"requirement_service_name"` Val string `json:"val,omitempty" db:"value"` + WorkerName string `json:"worker_name" db:"-"` } diff --git a/sdk/log/hook/hook.go b/sdk/log/hook/hook.go index 667beec963..d8bd881896 100644 --- a/sdk/log/hook/hook.go +++ b/sdk/log/hook/hook.go @@ -244,7 +244,7 @@ func (hook *Hook) messageFromEntry(entry *logrus.Entry, file string, line int) * // original input. If the input has no newlines, stick the // whole thing in Short. short := p - full := "" + full := p if i := strings.IndexRune(p, '\n'); i > 0 { short = p[:i] full = p @@ -252,7 +252,6 @@ func (hook *Hook) messageFromEntry(entry *logrus.Entry, file string, line int) * // Merge hook extra fields and entry fields extra := hook.merge(hook.Extra, entry.Data) - return &Message{ Version: "1.1", Host: hook.Hostname, diff --git a/sdk/log/hook/tcp.go b/sdk/log/hook/tcp.go index bb41bc993d..a389860528 100644 --- a/sdk/log/hook/tcp.go +++ b/sdk/log/hook/tcp.go @@ -93,7 +93,10 @@ func (w *TCPWriter) Write(p []byte) (int, error) { } if err := w.WriteMessage(&m); err != nil { - return 0, err + fmt.Fprintln(os.Stderr, "[gelf] Try 1 retry: ", err) + if err := w.WriteMessage(&m); err != nil { + return 0, err + } } return len(p), nil diff --git a/sdk/log/log.go b/sdk/log/log.go index 22464dbdf0..617830653e 100644 --- a/sdk/log/log.go +++ b/sdk/log/log.go @@ -8,9 +8,8 @@ import ( "os" "strings" - log "github.com/sirupsen/logrus" - loghook "github.com/ovh/cds/sdk/log/hook" + log "github.com/sirupsen/logrus" ) // Conf contains log configuration @@ -33,6 +32,8 @@ const ( HeaderRequestID = "Request-ID" ContextLoggingRequestIDKey = "ctx-logging-request-id" ContextLoggingFuncKey = "ctx-logging-func" + + ExtraFieldSignature = "Signature" ) var ( @@ -239,3 +240,38 @@ func newEntry(ctx context.Context, fields log.Fields) *log.Entry { return entry } + +type Signature struct { + Worker *SignatureWorker + Service *SignatureService + JobID int64 + Timestamp int64 +} + +type SignatureWorker struct { + WorkerID string + StepOrder int64 +} + +type SignatureService struct { + HatcheryID int64 + HatcheryName string + RequirementID int64 + RequirementName string + WorkerName string +} + +func New(logServerAddr string) (*log.Logger, error) { + newLogger := log.New() + graylogcfg := &loghook.Config{ + Addr: logServerAddr, + Protocol: "tcp", + } + extra := map[string]interface{}{} + hook, err := loghook.NewHook(graylogcfg, extra) + if err != nil { + return nil, fmt.Errorf("unable to add hook: %v", err) + } + newLogger.AddHook(hook) + return newLogger, nil +} diff --git a/sdk/services.go b/sdk/services.go index 272ca393d1..31402cdc61 100644 --- a/sdk/services.go +++ b/sdk/services.go @@ -24,6 +24,7 @@ type Service struct { MonitoringStatus MonitoringStatus `json:"monitoring_status" db:"monitoring_status" cli:"-"` Version string `json:"version" db:"-" cli:"version"` Uptodate bool `json:"up_to_date" db:"-"` + LogServer TCPServer `json:"tcp" db:"-"` } // Update service field from new data. diff --git a/sdk/worker.go b/sdk/worker.go index 29b9fcf83d..bfb04b2499 100644 --- a/sdk/worker.go +++ b/sdk/worker.go @@ -20,6 +20,7 @@ type Worker struct { Version string `json:"version" cli:"version" db:"version"` OS string `json:"os" cli:"os" db:"os"` Arch string `json:"arch" cli:"arch" db:"arch"` + PrivateKey []byte `json:"-" cli:"-" db:"cypher_private_key" gorpmapping:"encrypted,ID,Name,JobRunID"` } // WorkerRegistrationForm represents the arguments needed to register a worker @@ -75,8 +76,10 @@ func TemplateEnvs(args WorkerArgs, envs map[string]string) (map[string]string, e // WorkflowNodeJobRunData is returned to worker in answer to postTakeWorkflowJobHandler type WorkflowNodeJobRunData struct { - NodeJobRun WorkflowNodeJobRun - Secrets []Variable - Number int64 - SubNumber int64 + NodeJobRun WorkflowNodeJobRun + Secrets []Variable + Number int64 + SubNumber int64 + SigningKey string + GelfServiceAddr string } diff --git a/ui/src/app/views/workflow/run/node/pipeline/step/step.log.component.ts b/ui/src/app/views/workflow/run/node/pipeline/step/step.log.component.ts index 7ddddf86f7..e37d42d53e 100644 --- a/ui/src/app/views/workflow/run/node/pipeline/step/step.log.component.ts +++ b/ui/src/app/views/workflow/run/node/pipeline/step/step.log.component.ts @@ -204,7 +204,6 @@ export class WorkflowStepLogComponent implements OnInit, OnDestroy { } htmlView() { - this.ansiViewSelected = this.ansiViewSelected; this.htmlViewSelected = !this.htmlViewSelected; this.basicView = false; this.splittedLogs = null; @@ -214,7 +213,6 @@ export class WorkflowStepLogComponent implements OnInit, OnDestroy { ansiView() { this.ansiViewSelected = !this.ansiViewSelected; - this.htmlViewSelected = this.htmlViewSelected; this.basicView = false; this.splittedLogs = null; this.parseLogs(); @@ -257,7 +255,6 @@ export class WorkflowStepLogComponent implements OnInit, OnDestroy { this.limitTo = this.splittedLogs.length - 40; this.splittedLogsToDisplay.splice(this.limitFrom, this.limitTo - this.limitFrom); } - this._cd.markForCheck(); }