Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker,api): allow to override cds.version value #5365

Merged
merged 3 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cli/cobra.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,11 @@ func listItem(i interface{}, filters map[string]string, quiet bool, fields []str

// if not quiet mode add the key:value to result else if quiet add only the key
if !quiet {
res[tag] = fmt.Sprintf("%v", f.Interface())
if !f.IsValid() {
res[tag] = ""
} else {
res[tag] = fmt.Sprintf("%v", f.Interface())
}
} else if isKey {
res["key"] = fmt.Sprintf("%v", f.Interface())
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func (api *API) InitRouter() {
r.Handle("/queue/workflows/{permJobID}/test", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobTestsResultsHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/tag", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobTagsHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/step", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobStepStatusHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/version", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobSetVersionHandler, MaintenanceAware()))

r.Handle("/variable/type", ScopeNone(), r.GET(api.getVariableTypeHandler))
r.Handle("/parameter/type", ScopeNone(), r.GET(api.getParameterTypeHandler))
Expand Down
25 changes: 24 additions & 1 deletion engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/go-gorp/gorp"
"github.com/ovh/venom"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/repositoriesmanager"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
Expand Down Expand Up @@ -186,6 +186,29 @@ func LoadAndLockNodeRunByID(ctx context.Context, db gorp.SqlExecutor, id int64)
return fromDBNodeRun(rr, LoadRunOptions{})
}

//LoadAndLockNodeRunByJobID load and lock a specific node run on a workflow
func LoadAndLockNodeRunByJobID(ctx context.Context, db gorp.SqlExecutor, jobID int64) (*sdk.WorkflowNodeRun, error) {
var end func()
_, end = telemetry.Span(ctx, "workflow.LoadAndLockNodeRunByJobID")
defer end()

var rr = NodeRun{}

query := fmt.Sprintf(`
SELECT %s %s
FROM workflow_node_run
JOIN workflow_node_run_job on workflow_node_run.id = workflow_node_run_job.workflow_node_run_id
WHERE workflow_node_run_job.id = $1 FOR UPDATE SKIP LOCKED
`, nodeRunFields, nodeRunTestsField)
if err := db.SelectOne(&rr, query, jobID); err != nil {
if err == sql.ErrNoRows {
return nil, sdk.WithStack(sdk.ErrLocked)
}
return nil, sdk.WrapError(err, "unable to load workflow_node_run with jobID=%d", jobID)
}
return fromDBNodeRun(rr, LoadRunOptions{})
}

//LoadNodeRunByID load a specific node run on a workflow
func LoadNodeRunByID(db gorp.SqlExecutor, id int64, loadOpts LoadRunOptions) (*sdk.WorkflowNodeRun, error) {
var rr = NodeRun{}
Expand Down
3 changes: 2 additions & 1 deletion engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ workflow_run.status,
workflow_run.last_sub_num,
workflow_run.last_execution,
workflow_run.to_delete,
workflow_run.read_only
workflow_run.read_only,
workflow_run.version
`

// LoadRunOptions are options for loading a run (node or workflow)
Expand Down
6 changes: 5 additions & 1 deletion engine/api/workflow/process_parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func getNodeRunBuildParameters(ctx context.Context, proj sdk.Project, wr *sdk.Wo
errm := &sdk.MultiError{}
//override default parameters value
tmp := sdk.ParametersToMap(params)
tmp["cds.version"] = fmt.Sprintf("%d", run.Number)
if wr.Version != nil {
tmp["cds.version"] = *wr.Version
} else {
tmp["cds.version"] = fmt.Sprintf("%d", run.Number)
}
tmp["cds.run"] = fmt.Sprintf("%d.%d", run.Number, run.SubNumber)
tmp["cds.run.number"] = fmt.Sprintf("%d", run.Number)
tmp["cds.run.subnumber"] = fmt.Sprintf("%d", run.SubNumber)
Expand Down
66 changes: 65 additions & 1 deletion engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/sguiheux/go-coverage"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/metrics"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/jws"
Expand Down Expand Up @@ -1038,3 +1038,67 @@ func (api *API) postWorkflowJobTagsHandler() service.Handler {
return nil
}
}

func (api *API) postWorkflowJobSetVersionHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if isWorker := isWorker(ctx); !isWorker {
return sdk.WithStack(sdk.ErrForbidden)
}

id, err := requestVarInt(r, "permJobID")
if err != nil {
return err
}

var data sdk.WorkflowRunVersion
if err := service.UnmarshalBody(r, &data); err != nil {
return sdk.WithStack(err)
}
if err := data.IsValid(); err != nil {
return err
}

tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WrapError(err, "unable to start transaction")
}
defer tx.Rollback() // nolint

workflowRun, err := workflow.LoadAndLockRunByJobID(tx, id, workflow.LoadRunOptions{})
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return sdk.NewErrorFrom(sdk.ErrLocked, "workflow run is already locked")
}
return sdk.WrapError(err, "unable to load node run id %d", id)
}

if workflowRun.Version != nil && *workflowRun.Version != data.Value {
return sdk.NewErrorFrom(sdk.ErrForbidden, "cannot change existing workflow run version value")
}

workflowRun.Version = &data.Value
if err := workflow.UpdateWorkflowRun(ctx, tx, workflowRun); err != nil {
return err
}

nodeRun, err := workflow.LoadAndLockNodeRunByJobID(ctx, tx, id)
if err != nil {
return err
}
for i := range nodeRun.BuildParameters {
if nodeRun.BuildParameters[i].Name == "cds.version" {
nodeRun.BuildParameters[i].Value = data.Value
break
}
}
if err := workflow.UpdateNodeRunBuildParameters(tx, nodeRun.ID, nodeRun.BuildParameters); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "unable to commit transaction")
}

return nil
}
}
65 changes: 65 additions & 0 deletions engine/api/workflow_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,3 +1526,68 @@ func TestInsertNewCodeCoverageReport(t *testing.T) {

require.Equal(t, coverateReportDefaultBranch.Report.CoveredBranches, covDB.Trend.DefaultBranch.CoveredBranches)
}

func Test_postWorkflowJobSetVersionHandler(t *testing.T) {
api, db, router := newTestAPI(t)

s, _ := assets.InitCDNService(t, db)
defer func() {
_ = services.Delete(db, s)
}()

ctx := testRunWorkflow(t, api, db, router)
testGetWorkflowJobAsWorker(t, api, db, router, &ctx)
require.NotNil(t, ctx.job)

// Register the worker
testRegisterWorker(t, api, db, router, &ctx)

// Take the job
uri := router.GetRoute("POST", api.postTakeWorkflowJobHandler, map[string]string{
"id": fmt.Sprintf("%d", ctx.job.ID),
})
require.NotEmpty(t, uri)
req := assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, nil)
rec := httptest.NewRecorder()
router.Mux.ServeHTTP(rec, req)
require.Equal(t, 200, rec.Code)

// Check that version is not set
run, err := workflow.LoadRun(context.TODO(), db, ctx.project.Key, ctx.workflow.Name, ctx.run.Number, workflow.LoadRunOptions{})
require.NoError(t, err)
require.Empty(t, "", run.Version)
nodeRun, err := workflow.LoadNodeRunByID(db, ctx.job.WorkflowNodeRunID, workflow.LoadRunOptions{})
require.NoError(t, err)
require.Equal(t, "1", sdk.ParameterValue(nodeRun.BuildParameters, "cds.version"))

// Set version from worker
uri = router.GetRoute("POST", api.postWorkflowJobSetVersionHandler, map[string]string{
"permJobID": fmt.Sprintf("%d", ctx.job.ID),
})
require.NotEmpty(t, uri)
req = assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, sdk.WorkflowRunVersion{
Value: "1.2.3",
})
rec = httptest.NewRecorder()
router.Mux.ServeHTTP(rec, req)
require.Equal(t, 204, rec.Code)

run, err = workflow.LoadRun(context.TODO(), db, ctx.project.Key, ctx.workflow.Name, ctx.run.Number, workflow.LoadRunOptions{})
require.NoError(t, err)
require.NotNil(t, run.Version)
require.Equal(t, "1.2.3", *run.Version)
nodeRun, err = workflow.LoadNodeRunByID(db, ctx.job.WorkflowNodeRunID, workflow.LoadRunOptions{})
require.NoError(t, err)
require.Equal(t, "1.2.3", sdk.ParameterValue(nodeRun.BuildParameters, "cds.version"))

uri = router.GetRoute("POST", api.postWorkflowJobSetVersionHandler, map[string]string{
"permJobID": fmt.Sprintf("%d", ctx.job.ID),
})
require.NotEmpty(t, uri)
req = assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, sdk.WorkflowRunVersion{
Value: "3.2.1",
})
rec = httptest.NewRecorder()
router.Mux.ServeHTTP(rec, req)
require.Equal(t, 403, rec.Code)
}
7 changes: 7 additions & 0 deletions engine/sql/api/213_workflow_run_version.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE "workflow_run" ADD COLUMN IF NOT EXISTS "version" VARCHAR(256);
SELECT create_unique_index('workflow_run','IDX_WORKFLOW_RUN_WORKFLOW_ID_VERSION','workflow_id,version');

-- +migrate Down
DROP INDEX idx_workflow_run_workflow_id_version;
ALTER TABLE "workflow_run" DROP COLUMN IF EXISTS "version";
95 changes: 95 additions & 0 deletions engine/worker/cmd_cds_version_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"time"

"github.com/spf13/cobra"

"github.com/ovh/cds/engine/worker/internal"
"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
)

var (
cmdCDSSetVersionValue string
)

func cmdCDSVersionSet() *cobra.Command {
c := &cobra.Command{
Use: "set-version",
Short: "Override {{.cds.version}} value with given string. This value should be unique for the workflow and can't be changed when set.",
Run: cdsVersionSetCmd(),
}
return c
}

func cdsVersionSetCmd() func(cmd *cobra.Command, args []string) {
return func(cmd *cobra.Command, args []string) {
f := func() error {
if len(args) < 1 || args[0] == "" {
return fmt.Errorf("invalid given value for CDS version")
}

portS := os.Getenv(internal.WorkerServerPort)
if portS == "" {
return fmt.Errorf("%s not found, are you running inside a CDS worker job?", internal.WorkerServerPort)
}

port, err := strconv.Atoi(portS)
if err != nil {
return fmt.Errorf("cannot parse '%s' as a port number", portS)
}

a := workerruntime.CDSVersionSet{
Value: args[0],
}

data, err := json.Marshal(a)
if err != nil {
return sdk.WithStack(err)
}

req, err := http.NewRequest("POST", fmt.Sprintf("http://127.0.0.1:%d/version", port), bytes.NewReader(data))
if err != nil {
return fmt.Errorf("cannot post set version (Request): %s", err)
}

client := http.DefaultClient
client.Timeout = 5 * time.Minute

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("command failed: %v", err)
}

if resp.StatusCode >= 300 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("set version failed: unable to read body %v", err)
}
defer resp.Body.Close()
return sdk.DecodeError(body)
}

fmt.Printf("CDS version was set to %s\n", a.Value)

return nil
}

if err := f(); err != nil {
if sdk.IsErrorWithStack(err) {
httpErr := sdk.ExtractHTTPError(err, "")
sdk.Exit("%v", httpErr.Error())
} else {
sdk.Exit("%v", err)
}
}
}
}
48 changes: 48 additions & 0 deletions engine/worker/internal/handler_cds_version_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package internal

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"

"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
)

func setVersionHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err))
return
}
defer r.Body.Close()

var req workerruntime.CDSVersionSet
if err := json.Unmarshal(data, &req); err != nil {
writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err))
return
}

if req.Value == "" {
writeError(w, r, sdk.NewErrorFrom(sdk.ErrWrongRequest, "invalid given CDS version value"))
return
}

if err := wk.client.QueueJobSetVersion(ctx, wk.currentJob.wJob.ID, sdk.WorkflowRunVersion{
Value: req.Value,
}); err != nil {
writeError(w, r, err)
return
}

// Override cds.version value in params to allow usage of this value in others steps
for i := range wk.currentJob.params {
if wk.currentJob.params[i].Name == "cds.version" {
wk.currentJob.params[i].Value = req.Value
break
}
}
}
}
Loading