Skip to content

Commit

Permalink
feat: worker run-result add static-file (#5941)
Browse files Browse the repository at this point in the history
* feat: worker run-result add static-file

Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault authored Sep 30, 2021
1 parent 2997297 commit 48aa23d
Show file tree
Hide file tree
Showing 21 changed files with 457 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (e *artifactoryReleasePlugin) Run(_ context.Context, opts *integrationplugi

promotedArtifacts := make([]string, 0)
for _, r := range runResult {
// static-file type does not need to be released
if r.Type == sdk.WorkflowRunResultTypeStaticFile {
continue
}
rData, err := r.GetArtifactManager()
if err != nil {
return fail("unable to read result %s: %v", r.ID, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func (e *artifactoryUploadArtifactPlugin) Manifest(_ context.Context, _ *empty.E
}

func (e *artifactoryUploadArtifactPlugin) Run(_ context.Context, opts *integrationplugin.RunQuery) (*integrationplugin.RunResult, error) {
cdsRepo := opts.GetOptions()[fmt.Sprintf("cds.integration.artifact_manager.%s", sdk.ArtifactManagerConfigCdsRepository)]
artifactoryURL := opts.GetOptions()[fmt.Sprintf("cds.integration.artifact_manager.%s", sdk.ArtifactManagerConfigURL)]
token := opts.GetOptions()[fmt.Sprintf("cds.integration.artifact_manager.%s", sdk.ArtifactManagerConfigToken)]
pathToUpload := opts.GetOptions()["cds.integration.artifact_manager.upload.path"]
prefix := "cds.integration.artifact_manager"
cdsRepo := opts.GetOptions()[fmt.Sprintf("%s.%s", prefix, sdk.ArtifactManagerConfigCdsRepository)]
artifactoryURL := opts.GetOptions()[fmt.Sprintf("%s.%s", prefix, sdk.ArtifactManagerConfigURL)]
token := opts.GetOptions()[fmt.Sprintf("%s.%s", prefix, sdk.ArtifactManagerConfigToken)]
pathToUpload := opts.GetOptions()[fmt.Sprintf("%s.upload.path", prefix)]
projectKey := opts.GetOptions()["cds.project"]
workflowName := opts.GetOptions()["cds.workflow"]
version := opts.GetOptions()["cds.version"]

buildInfo := opts.GetOptions()[fmt.Sprintf("cds.integration.artifact_manager.%s", sdk.ArtifactManagerConfigBuildInfoPrefix)]
buildInfo := opts.GetOptions()[fmt.Sprintf("%s.%s", prefix, sdk.ArtifactManagerConfigBuildInfoPrefix)]

artiClient, err := art.CreateArtifactoryClient(artifactoryURL, token)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func PreviousNodeRunVCSInfos(ctx context.Context, db gorp.SqlExecutor, projectKe
argPrevious = append(argPrevious, envID)
queryPrevious += "AND w_node_context.environment_id = $5"
}
queryPrevious += fmt.Sprintf(" ORDER BY workflow_node_run.num DESC LIMIT 1")
queryPrevious += " ORDER BY workflow_node_run.num DESC LIMIT 1"

errPrev := db.QueryRow(queryPrevious, argPrevious...).Scan(&prevBranch, &prevTag, &prevHash, &prevRepository, &previousBuildNumber)
if errPrev == sql.ErrNoRows {
Expand Down
1 change: 1 addition & 0 deletions engine/api/workflow/workflow_run_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestResyncCommitStatusNotifDisabled(t *testing.T) {
}

allSrv, err := services.LoadAll(context.TODO(), db)
assert.NoError(t, err)
for _, s := range allSrv {
if err := services.Delete(db, &s); err != nil {
t.Fatalf("unable to delete service: %v", err)
Expand Down
34 changes: 33 additions & 1 deletion engine/api/workflow/workflow_run_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func CanUploadRunResult(ctx context.Context, db *gorp.DbMap, store cache.Store,
return false, err
}
fileName = refArt.Name
case sdk.WorkflowRunResultTypeStaticFile:
refArt, err := result.GetStaticFile()
if err != nil {
return false, err
}
fileName = refArt.Name
}

if fileName != runResultCheck.Name {
Expand Down Expand Up @@ -142,8 +148,14 @@ func AddResult(ctx context.Context, db *gorp.DbMap, store cache.Store, wr *sdk.W
if err != nil {
return err
}
case sdk.WorkflowRunResultTypeStaticFile:
var err error
cacheKey, err = verifyAddResultStaticFile(store, runResult)
if err != nil {
return err
}
default:
return sdk.WrapError(sdk.ErrInvalidData, "unkonwn result type %s", runResult.Type)
return sdk.WrapError(sdk.ErrInvalidData, "unknown result type %s", runResult.Type)
}

tx, err := db.Begin()
Expand Down Expand Up @@ -262,6 +274,26 @@ func verifyAddResultArtifact(store cache.Store, runResult *sdk.WorkflowRunResult
return cacheKey, nil
}

func verifyAddResultStaticFile(store cache.Store, runResult *sdk.WorkflowRunResult) (string, error) {
staticFileRunResult, err := runResult.GetStaticFile()
if err != nil {
return "", err
}
if err := staticFileRunResult.IsValid(); err != nil {
return "", err
}

cacheKey := GetRunResultKey(runResult.WorkflowRunID, runResult.Type, staticFileRunResult.Name)
b, err := store.Exist(cacheKey)
if err != nil {
return cacheKey, err
}
if !b {
return cacheKey, sdk.WrapError(sdk.ErrForbidden, "unable to upload an unchecked static-file")
}
return cacheKey, nil
}

func insertResult(tx gorpmapper.SqlExecutorWithTx, runResult *sdk.WorkflowRunResult) error {
runResult.ID = sdk.UUID()
runResult.Created = time.Now()
Expand Down
76 changes: 76 additions & 0 deletions engine/api/workflow/workflow_run_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func TestCanUploadArtifactAlreadyExist(t *testing.T) {
Perm: 0777,
}
bts, err := json.Marshal(artiData)
require.NoError(t, err)
result.DataRaw = bts

cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeArtifact, artiData.Name)
Expand Down Expand Up @@ -177,6 +178,7 @@ func TestCanUploadArtifactAlreadyExistInMoreRecentSubNum(t *testing.T) {
Perm: 0777,
}
bts, err := json.Marshal(artiData)
require.NoError(t, err)
result.DataRaw = bts

cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeArtifact, artiData.Name)
Expand Down Expand Up @@ -235,6 +237,7 @@ func TestCanUploadArtifactAlreadyExistInAPreviousSubNum(t *testing.T) {
Perm: 0777,
}
bts, err := json.Marshal(artiData)
require.NoError(t, err)
result.DataRaw = bts

cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeArtifact, artiData.Name)
Expand All @@ -247,3 +250,76 @@ func TestCanUploadArtifactAlreadyExistInAPreviousSubNum(t *testing.T) {
_, err = workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, artifactRef)
require.NoError(t, err)
}

func TestCanUploadStaticFile(t *testing.T) {
ctx := context.Background()
db, store := test.SetupPG(t)

_, _, workflowRun, nodeRun, jobRun := createRunNodeRunAndJob(t, db, store)

staticFileRef := sdk.WorkflowRunResultCheck{
RunJobID: jobRun.ID,
RunNodeID: nodeRun.ID,
RunID: workflowRun.ID,
Name: "my title static file",
ResultType: sdk.WorkflowRunResultTypeStaticFile,
}

result := sdk.WorkflowRunResult{
ID: sdk.UUID(),
Created: time.Now(),
WorkflowNodeRunID: nodeRun.ID,
WorkflowRunID: workflowRun.ID,
SubNum: 0,
WorkflowRunJobID: jobRun.ID + 1,
Type: sdk.WorkflowRunResultTypeStaticFile,
}
artiData := sdk.WorkflowRunResultStaticFile{
Name: "my title static file",
RemoteURL: "https://foo/bar",
}
bts, err := json.Marshal(artiData)
require.NoError(t, err)
result.DataRaw = bts

cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeStaticFile, artiData.Name)
require.NoError(t, store.SetWithTTL(cacheKey, true, 60))
require.NoError(t, workflow.AddResult(ctx, db.DbMap, store, &workflowRun, &result))
b, err := store.Exist(cacheKey)
require.NoError(t, err)
require.False(t, b)

_, err = workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, staticFileRef)
require.True(t, sdk.ErrorIs(err, sdk.ErrConflictData))
require.Contains(t, err.Error(), "artifact my title static file has already been uploaded")
}
func TestCanUploadStaticFileInvalid(t *testing.T) {
ctx := context.Background()
db, store := test.SetupPG(t)

_, _, workflowRun, nodeRun, jobRun := createRunNodeRunAndJob(t, db, store)

result := sdk.WorkflowRunResult{
ID: sdk.UUID(),
Created: time.Now(),
WorkflowNodeRunID: nodeRun.ID,
WorkflowRunID: workflowRun.ID,
SubNum: 0,
WorkflowRunJobID: jobRun.ID + 1,
Type: sdk.WorkflowRunResultTypeStaticFile,
}
artiData := sdk.WorkflowRunResultStaticFile{
Name: "my title static file",
RemoteURL: "",
}
bts, err := json.Marshal(artiData)
require.NoError(t, err)
result.DataRaw = bts

cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeStaticFile, artiData.Name)
require.NoError(t, store.SetWithTTL(cacheKey, true, 60))

err = workflow.AddResult(ctx, db.DbMap, store, &workflowRun, &result)
require.Contains(t, err.Error(), "missing remote url")
require.Error(t, err)
}
24 changes: 14 additions & 10 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,20 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor
}
workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, *p, report)

api.initWorkflowRunPurge(ctx, wf)

// Update parent
for i := range report.WorkflowRuns() {
run := &report.WorkflowRuns()[i]
if err := api.updateParentWorkflowRun(ctx, run); err != nil {
log.Error(ctx, "unable to update parent workflow run: %v", err)
}
}

return report
}

func (api *API) initWorkflowRunPurge(ctx context.Context, wf *sdk.Workflow) {
_, enabled := featureflipping.IsEnabled(ctx, gorpmapping.Mapper, api.mustDB(), sdk.FeaturePurgeName, map[string]string{"project_key": wf.ProjectKey})
if !enabled {
// Purge workflow run
Expand All @@ -1142,16 +1156,6 @@ func (api *API) initWorkflowRun(ctx context.Context, projKey string, wf *sdk.Wor
workflow.CountWorkflowRunsMarkToDelete(ctx, api.mustDB(), api.Metrics.WorkflowRunsMarkToDelete)
})
}

// Update parent
for i := range report.WorkflowRuns() {
run := &report.WorkflowRuns()[i]
if err := api.updateParentWorkflowRun(ctx, run); err != nil {
log.Error(ctx, "unable to update parent workflow run: %v", err)
}
}

return report
}

func saveWorkflowRunSecrets(ctx context.Context, db *gorp.DbMap, projID int64, wr sdk.WorkflowRun, secrets *workflow.PushSecrets) error {
Expand Down
4 changes: 0 additions & 4 deletions engine/worker/cmd_cds_version_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import (
"github.com/ovh/cds/sdk"
)

var (
cmdCDSSetVersionValue string
)

func cmdCDSVersionSet() *cobra.Command {
c := &cobra.Command{
Use: "set-version",
Expand Down
Loading

0 comments on commit 48aa23d

Please sign in to comment.