Skip to content

Commit

Permalink
feat(worker,api): allow to override cds.version value (#5365)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Aug 12, 2020
1 parent 7c77c38 commit d933943
Show file tree
Hide file tree
Showing 30 changed files with 1,056 additions and 24 deletions.
6 changes: 5 additions & 1 deletion cli/cobra.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,11 @@ func listItem(i interface{}, filters map[string]string, quiet bool, fields []str

// if not quiet mode add the key:value to result else if quiet add only the key
if !quiet {
res[tag] = fmt.Sprintf("%v", f.Interface())
if !f.IsValid() {
res[tag] = ""
} else {
res[tag] = fmt.Sprintf("%v", f.Interface())
}
} else if isKey {
res["key"] = fmt.Sprintf("%v", f.Interface())
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func (api *API) InitRouter() {
r.Handle("/queue/workflows/{permJobID}/test", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobTestsResultsHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/tag", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobTagsHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/step", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobStepStatusHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/version", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobSetVersionHandler, MaintenanceAware()))

r.Handle("/variable/type", ScopeNone(), r.GET(api.getVariableTypeHandler))
r.Handle("/parameter/type", ScopeNone(), r.GET(api.getParameterTypeHandler))
Expand Down
25 changes: 24 additions & 1 deletion engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/go-gorp/gorp"
"github.com/ovh/venom"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -186,6 +186,29 @@ func LoadAndLockNodeRunByID(ctx context.Context, db gorp.SqlExecutor, id int64)
return fromDBNodeRun(rr, LoadRunOptions{})
}

//LoadAndLockNodeRunByJobID load and lock a specific node run on a workflow
func LoadAndLockNodeRunByJobID(ctx context.Context, db gorp.SqlExecutor, jobID int64) (*sdk.WorkflowNodeRun, error) {
var end func()
_, end = telemetry.Span(ctx, "workflow.LoadAndLockNodeRunByJobID")
defer end()

var rr = NodeRun{}

query := fmt.Sprintf(`
SELECT %s %s
FROM workflow_node_run
JOIN workflow_node_run_job on workflow_node_run.id = workflow_node_run_job.workflow_node_run_id
WHERE workflow_node_run_job.id = $1 FOR UPDATE SKIP LOCKED
`, nodeRunFields, nodeRunTestsField)
if err := db.SelectOne(&rr, query, jobID); err != nil {
if err == sql.ErrNoRows {
return nil, sdk.WithStack(sdk.ErrLocked)
}
return nil, sdk.WrapError(err, "unable to load workflow_node_run with jobID=%d", jobID)
}
return fromDBNodeRun(rr, LoadRunOptions{})
}

//LoadNodeRunByID load a specific node run on a workflow
func LoadNodeRunByID(db gorp.SqlExecutor, id int64, loadOpts LoadRunOptions) (*sdk.WorkflowNodeRun, error) {
var rr = NodeRun{}
Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ workflow_run.status,
workflow_run.last_sub_num,
workflow_run.last_execution,
workflow_run.to_delete,
workflow_run.read_only
workflow_run.read_only,
workflow_run.version
`

// LoadRunOptions are options for loading a run (node or workflow)
Expand Down
6 changes: 5 additions & 1 deletion engine/api/workflow/process_parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func getNodeRunBuildParameters(ctx context.Context, proj sdk.Project, wr *sdk.Wo
errm := &sdk.MultiError{}
//override default parameters value
tmp := sdk.ParametersToMap(params)
tmp["cds.version"] = fmt.Sprintf("%d", run.Number)
if wr.Version != nil {
tmp["cds.version"] = *wr.Version
} else {
tmp["cds.version"] = fmt.Sprintf("%d", run.Number)
}
tmp["cds.run"] = fmt.Sprintf("%d.%d", run.Number, run.SubNumber)
tmp["cds.run.number"] = fmt.Sprintf("%d", run.Number)
tmp["cds.run.subnumber"] = fmt.Sprintf("%d", run.SubNumber)
Expand Down
66 changes: 65 additions & 1 deletion engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/sguiheux/go-coverage"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/metrics"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/jws"
Expand Down Expand Up @@ -1038,3 +1038,67 @@ func (api *API) postWorkflowJobTagsHandler() service.Handler {
return nil
}
}

func (api *API) postWorkflowJobSetVersionHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if isWorker := isWorker(ctx); !isWorker {
return sdk.WithStack(sdk.ErrForbidden)
}

id, err := requestVarInt(r, "permJobID")
if err != nil {
return err
}

var data sdk.WorkflowRunVersion
if err := service.UnmarshalBody(r, &data); err != nil {
return sdk.WithStack(err)
}
if err := data.IsValid(); err != nil {
return err
}

tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WrapError(err, "unable to start transaction")
}
defer tx.Rollback() // nolint

workflowRun, err := workflow.LoadAndLockRunByJobID(tx, id, workflow.LoadRunOptions{})
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return sdk.NewErrorFrom(sdk.ErrLocked, "workflow run is already locked")
}
return sdk.WrapError(err, "unable to load node run id %d", id)
}

if workflowRun.Version != nil && *workflowRun.Version != data.Value {
return sdk.NewErrorFrom(sdk.ErrForbidden, "cannot change existing workflow run version value")
}

workflowRun.Version = &data.Value
if err := workflow.UpdateWorkflowRun(ctx, tx, workflowRun); err != nil {
return err
}

nodeRun, err := workflow.LoadAndLockNodeRunByJobID(ctx, tx, id)
if err != nil {
return err
}
for i := range nodeRun.BuildParameters {
if nodeRun.BuildParameters[i].Name == "cds.version" {
nodeRun.BuildParameters[i].Value = data.Value
break
}
}
if err := workflow.UpdateNodeRunBuildParameters(tx, nodeRun.ID, nodeRun.BuildParameters); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "unable to commit transaction")
}

return nil
}
}
65 changes: 65 additions & 0 deletions engine/api/workflow_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,3 +1526,68 @@ func TestInsertNewCodeCoverageReport(t *testing.T) {

require.Equal(t, coverateReportDefaultBranch.Report.CoveredBranches, covDB.Trend.DefaultBranch.CoveredBranches)
}

func Test_postWorkflowJobSetVersionHandler(t *testing.T) {
api, db, router := newTestAPI(t)

s, _ := assets.InitCDNService(t, db)
defer func() {
_ = services.Delete(db, s)
}()

ctx := testRunWorkflow(t, api, db, router)
testGetWorkflowJobAsWorker(t, api, db, router, &ctx)
require.NotNil(t, ctx.job)

// Register the worker
testRegisterWorker(t, api, db, router, &ctx)

// Take the job
uri := router.GetRoute("POST", api.postTakeWorkflowJobHandler, map[string]string{
"id": fmt.Sprintf("%d", ctx.job.ID),
})
require.NotEmpty(t, uri)
req := assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, nil)
rec := httptest.NewRecorder()
router.Mux.ServeHTTP(rec, req)
require.Equal(t, 200, rec.Code)

// Check that version is not set
run, err := workflow.LoadRun(context.TODO(), db, ctx.project.Key, ctx.workflow.Name, ctx.run.Number, workflow.LoadRunOptions{})
require.NoError(t, err)
require.Empty(t, "", run.Version)
nodeRun, err := workflow.LoadNodeRunByID(db, ctx.job.WorkflowNodeRunID, workflow.LoadRunOptions{})
require.NoError(t, err)
require.Equal(t, "1", sdk.ParameterValue(nodeRun.BuildParameters, "cds.version"))

// Set version from worker
uri = router.GetRoute("POST", api.postWorkflowJobSetVersionHandler, map[string]string{
"permJobID": fmt.Sprintf("%d", ctx.job.ID),
})
require.NotEmpty(t, uri)
req = assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, sdk.WorkflowRunVersion{
Value: "1.2.3",
})
rec = httptest.NewRecorder()
router.Mux.ServeHTTP(rec, req)
require.Equal(t, 204, rec.Code)

run, err = workflow.LoadRun(context.TODO(), db, ctx.project.Key, ctx.workflow.Name, ctx.run.Number, workflow.LoadRunOptions{})
require.NoError(t, err)
require.NotNil(t, run.Version)
require.Equal(t, "1.2.3", *run.Version)
nodeRun, err = workflow.LoadNodeRunByID(db, ctx.job.WorkflowNodeRunID, workflow.LoadRunOptions{})
require.NoError(t, err)
require.Equal(t, "1.2.3", sdk.ParameterValue(nodeRun.BuildParameters, "cds.version"))

uri = router.GetRoute("POST", api.postWorkflowJobSetVersionHandler, map[string]string{
"permJobID": fmt.Sprintf("%d", ctx.job.ID),
})
require.NotEmpty(t, uri)
req = assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, sdk.WorkflowRunVersion{
Value: "3.2.1",
})
rec = httptest.NewRecorder()
router.Mux.ServeHTTP(rec, req)
require.Equal(t, 403, rec.Code)
}
7 changes: 7 additions & 0 deletions engine/sql/api/213_workflow_run_version.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE "workflow_run" ADD COLUMN IF NOT EXISTS "version" VARCHAR(256);
SELECT create_unique_index('workflow_run','IDX_WORKFLOW_RUN_WORKFLOW_ID_VERSION','workflow_id,version');

-- +migrate Down
DROP INDEX idx_workflow_run_workflow_id_version;
ALTER TABLE "workflow_run" DROP COLUMN IF EXISTS "version";
95 changes: 95 additions & 0 deletions engine/worker/cmd_cds_version_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"time"

"github.com/spf13/cobra"

"github.com/ovh/cds/engine/worker/internal"
"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
)

var (
cmdCDSSetVersionValue string
)

func cmdCDSVersionSet() *cobra.Command {
c := &cobra.Command{
Use: "set-version",
Short: "Override {{.cds.version}} value with given string. This value should be unique for the workflow and can't be changed when set.",
Run: cdsVersionSetCmd(),
}
return c
}

func cdsVersionSetCmd() func(cmd *cobra.Command, args []string) {
return func(cmd *cobra.Command, args []string) {
f := func() error {
if len(args) < 1 || args[0] == "" {
return fmt.Errorf("invalid given value for CDS version")
}

portS := os.Getenv(internal.WorkerServerPort)
if portS == "" {
return fmt.Errorf("%s not found, are you running inside a CDS worker job?", internal.WorkerServerPort)
}

port, err := strconv.Atoi(portS)
if err != nil {
return fmt.Errorf("cannot parse '%s' as a port number", portS)
}

a := workerruntime.CDSVersionSet{
Value: args[0],
}

data, err := json.Marshal(a)
if err != nil {
return sdk.WithStack(err)
}

req, err := http.NewRequest("POST", fmt.Sprintf("http://127.0.0.1:%d/version", port), bytes.NewReader(data))
if err != nil {
return fmt.Errorf("cannot post set version (Request): %s", err)
}

client := http.DefaultClient
client.Timeout = 5 * time.Minute

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("command failed: %v", err)
}

if resp.StatusCode >= 300 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("set version failed: unable to read body %v", err)
}
defer resp.Body.Close()
return sdk.DecodeError(body)
}

fmt.Printf("CDS version was set to %s\n", a.Value)

return nil
}

if err := f(); err != nil {
if sdk.IsErrorWithStack(err) {
httpErr := sdk.ExtractHTTPError(err, "")
sdk.Exit("%v", httpErr.Error())
} else {
sdk.Exit("%v", err)
}
}
}
}
48 changes: 48 additions & 0 deletions engine/worker/internal/handler_cds_version_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package internal

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"

"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
)

func setVersionHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err))
return
}
defer r.Body.Close()

var req workerruntime.CDSVersionSet
if err := json.Unmarshal(data, &req); err != nil {
writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err))
return
}

if req.Value == "" {
writeError(w, r, sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given CDS version value"))
return
}

if err := wk.client.QueueJobSetVersion(ctx, wk.currentJob.wJob.ID, sdk.WorkflowRunVersion{
Value: req.Value,
}); err != nil {
writeError(w, r, err)
return
}

// Override cds.version value in params to allow usage of this value in others steps
for i := range wk.currentJob.params {
if wk.currentJob.params[i].Name == "cds.version" {
wk.currentJob.params[i].Value = req.Value
break
}
}
}
}
Loading

0 comments on commit d933943

Please sign in to comment.