Skip to content

Commit

Permalink
fix(cdn): use run-result instead of artifact (#5740)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 5, 2021
1 parent 7494519 commit 6e2b3e7
Show file tree
Hide file tree
Showing 32 changed files with 227 additions and 174 deletions.
2 changes: 1 addition & 1 deletion cli/cdsctl/workflow_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func workflowArtifactDownloadRun(v cli.Values) error {
return err
}
fmt.Printf("Downloading %s...\n", artifactData.Name)
r, err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, artifactData.CDNRefHash, sdk.CDNTypeItemArtifact)
r, err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ func (api *API) InitRouter() {
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/stop", Scope(sdk.AuthConsumerScopeRun), r.POSTEXECUTE(api.stopWorkflowRunHandler, MaintenanceAware()))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/vcs/resync", Scope(sdk.AuthConsumerScopeRun), r.POSTEXECUTE(api.postResyncVCSWorkflowRunHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/artifacts", Scope(sdk.AuthConsumerScopeRun), r.GET(api.getWorkflowRunArtifactsHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/artifacts/check", Scope(sdk.AuthConsumerScopeRunExecution), r.POST(api.workflowRunArtifactCheckUploadHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/artifacts/links", Scope(sdk.AuthConsumerScopeRun), r.GET(api.getWorkflowRunArtifactLinksHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/results", Scope(sdk.AuthConsumerScopeRunExecution), r.GET(api.getWorkflowRunResultsHandler), r.POST(api.postWorkflowRunResultsHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/results/check", Scope(sdk.AuthConsumerScopeRunExecution), r.POST(api.workflowRunResultCheckUploadHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/nodes/{nodeRunID}", Scope(sdk.AuthConsumerScopeRun), r.GET(api.getWorkflowNodeRunHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/nodes/{nodeRunID}/results", Scope(sdk.AuthConsumerScopeRunExecution), r.GET(api.getWorkflowNodeRunResultsHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/nodes/{nodeRunID}/stop", Scope(sdk.AuthConsumerScopeRun), r.POSTEXECUTE(api.stopWorkflowNodeRunHandler, MaintenanceAware()))
Expand Down
56 changes: 34 additions & 22 deletions engine/api/workflow/workflow_run_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ var (
KeyResult = cache.Key("run", "result")
)

func GetArtifactResultKey(runID int64, artifactName string) string {
return cache.Key(KeyResult, string(sdk.WorkflowRunResultTypeArtifact), strconv.Itoa(int(runID)), artifactName)
func GetRunResultKey(runID int64, t sdk.WorkflowRunResultType, fileName string) string {
return cache.Key(KeyResult, string(t), strconv.Itoa(int(runID)), fileName)
}

func CanUploadArtifact(ctx context.Context, db *gorp.DbMap, store cache.Store, wr sdk.WorkflowRun, artifactRef sdk.CDNArtifactAPIRef) (bool, error) {
func CanUploadRunResult(ctx context.Context, db *gorp.DbMap, store cache.Store, wr sdk.WorkflowRun, apiRef sdk.CDNRunResultAPIRef) (bool, error) {
// Check run
if wr.ID != artifactRef.RunID {
if wr.ID != apiRef.RunID {
return false, sdk.WrapError(sdk.ErrInvalidData, "unable to upload and artifact for this run")
}
if sdk.StatusIsTerminated(wr.Status) {
Expand All @@ -36,9 +36,9 @@ func CanUploadArtifact(ctx context.Context, db *gorp.DbMap, store cache.Store, w
if len(nodeRuns) < 1 {
continue
}
// get last noderun
// Get last noderun
nodeRun := nodeRuns[0]
if nodeRun.ID != artifactRef.RunNodeID {
if nodeRun.ID != apiRef.RunNodeID {
continue
}
nrs = nodeRuns
Expand All @@ -47,36 +47,48 @@ func CanUploadArtifact(ctx context.Context, db *gorp.DbMap, store cache.Store, w
}
}
if len(nrs) == 0 {
return false, sdk.WrapError(sdk.ErrNotFound, "unable to find node run: %d", artifactRef.RunNodeID)
return false, sdk.WrapError(sdk.ErrNotFound, "unable to find node run: %d", apiRef.RunNodeID)
}

// Check job data
nodeRunJob, err := LoadNodeJobRun(ctx, db, store, artifactRef.RunJobID)
nodeRunJob, err := LoadNodeJobRun(ctx, db, store, apiRef.RunJobID)
if err != nil {
return false, err
}
if nodeRunJob.WorkflowNodeRunID != artifactRef.RunNodeID {
return false, sdk.WrapError(sdk.ErrInvalidData, "invalid node run %d", artifactRef.RunNodeID)
if nodeRunJob.WorkflowNodeRunID != apiRef.RunNodeID {
return false, sdk.WrapError(sdk.ErrInvalidData, "invalid node run %d", apiRef.RunNodeID)
}
if sdk.StatusIsTerminated(nodeRunJob.Status) {
return false, sdk.WrapError(sdk.ErrInvalidData, "unable to upload artifact on a terminated job")
}

// Check artifact name
runResults, err := LoadRunResultsByRunIDAndType(ctx, db, artifactRef.RunID, sdk.WorkflowRunResultTypeArtifact)
// Check File Name
runResults, err := LoadRunResultsByRunIDAndType(ctx, db, apiRef.RunID, apiRef.RunResultType)
if err != nil {
return false, sdk.WrapError(err, "unable to load run results for run %d", artifactRef.RunID)
return false, sdk.WrapError(err, "unable to load run results for run %d", apiRef.RunID)
}
for _, result := range runResults {
refArt, err := result.GetArtifact()
if err != nil {
return false, err
var fileName string
switch apiRef.RunResultType {
case sdk.WorkflowRunResultTypeArtifact:
refArt, err := result.GetArtifact()
if err != nil {
return false, err
}
fileName = refArt.Name
case sdk.WorkflowRunResultTypeCoverage:
refCov, err := result.GetCoverage()
if err != nil {
return false, err
}
fileName = refCov.Name
}
if refArt.Name != artifactRef.ToFilename() {

if fileName != apiRef.ToFilename() {
continue
}

// find artifact in node run history
// If we find a run result with same check, check subnumber
var previousNodeRunUpload *sdk.WorkflowNodeRun
for _, nr := range nrs {
if nr.ID != result.WorkflowNodeRunID {
Expand All @@ -86,15 +98,15 @@ func CanUploadArtifact(ctx context.Context, db *gorp.DbMap, store cache.Store, w
break
}
if previousNodeRunUpload == nil {
return false, sdk.WrapError(sdk.ErrConflictData, "artifact %s has already been uploaded from another pipeline", artifactRef.ArtifactName)
return false, sdk.WrapError(sdk.ErrConflictData, "artifact %s has already been uploaded from another pipeline", apiRef.ArtifactName)
}

// Check Sub num
if result.SubNum == nrs[0].SubNumber {
return false, sdk.WrapError(sdk.ErrConflictData, "artifact %s has already been uploaded", artifactRef.ArtifactName)
return false, sdk.WrapError(sdk.ErrConflictData, "artifact %s has already been uploaded", apiRef.ArtifactName)
}
if result.SubNum > nrs[0].SubNumber {
return false, sdk.WrapError(sdk.ErrConflictData, "artifact %s cannot be uploaded into a previous run", artifactRef.ArtifactName)
return false, sdk.WrapError(sdk.ErrConflictData, "artifact %s cannot be uploaded into a previous run", apiRef.ArtifactName)
}
}
return true, nil
Expand Down Expand Up @@ -137,7 +149,7 @@ func verifyAddResultArtifact(store cache.Store, runResult *sdk.WorkflowRunResult
return "", err
}

cacheKey := GetArtifactResultKey(runResult.WorkflowRunID, artifactRunResult.Name)
cacheKey := GetRunResultKey(runResult.WorkflowRunID, runResult.Type, artifactRunResult.Name)
b, err := store.Exist(cacheKey)
if err != nil {
return cacheKey, err
Expand Down
56 changes: 29 additions & 27 deletions engine/api/workflow/workflow_run_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestCanUploadArtifactTerminatedJob(t *testing.T) {

proj, wk, workflowRun, nodeRun, jobRun := createRunNodeRunAndJob(t, db, store)

artifactRef := sdk.CDNArtifactAPIRef{
artifactRef := sdk.CDNRunResultAPIRef{
ProjectKey: proj.Name,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
Expand All @@ -80,7 +80,7 @@ func TestCanUploadArtifactTerminatedJob(t *testing.T) {
jobRun.Status = sdk.StatusSuccess
require.NoError(t, workflow.UpdateNodeJobRun(ctx, db, &jobRun))

_, err := workflow.CanUploadArtifact(ctx, db.DbMap, store, workflowRun, artifactRef)
_, err := workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, artifactRef)
require.True(t, sdk.ErrorIs(err, sdk.ErrInvalidData))
require.Contains(t, err.Error(), "unable to upload artifact on a terminated job")
}
Expand All @@ -91,7 +91,7 @@ func TestCanUploadArtifactWrongNodeRun(t *testing.T) {

proj, wk, workflowRun, nodeRun, jobRun := createRunNodeRunAndJob(t, db, store)

artifactRef := sdk.CDNArtifactAPIRef{
artifactRef := sdk.CDNRunResultAPIRef{
ProjectKey: proj.Key,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
Expand All @@ -101,7 +101,7 @@ func TestCanUploadArtifactWrongNodeRun(t *testing.T) {
ArtifactName: "myartifact",
}

_, err := workflow.CanUploadArtifact(ctx, db.DbMap, store, workflowRun, artifactRef)
_, err := workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, artifactRef)
require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound))
require.Contains(t, err.Error(), "unable to find node run")
}
Expand All @@ -112,14 +112,15 @@ func TestCanUploadArtifactAlreadyExist(t *testing.T) {

proj, wk, workflowRun, nodeRun, jobRun := createRunNodeRunAndJob(t, db, store)

artifactRef := sdk.CDNArtifactAPIRef{
ProjectKey: proj.Key,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
RunJobID: jobRun.ID,
RunNodeID: nodeRun.ID,
RunID: workflowRun.ID,
ArtifactName: "myartifact",
artifactRef := sdk.CDNRunResultAPIRef{
ProjectKey: proj.Key,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
RunJobID: jobRun.ID,
RunNodeID: nodeRun.ID,
RunID: workflowRun.ID,
ArtifactName: "myartifact",
RunResultType: sdk.WorkflowRunResultTypeArtifact,
}

result := sdk.WorkflowRunResult{
Expand All @@ -141,14 +142,14 @@ func TestCanUploadArtifactAlreadyExist(t *testing.T) {
bts, err := json.Marshal(artiData)
result.DataRaw = bts

cacheKey := workflow.GetArtifactResultKey(result.WorkflowRunID, artiData.Name)
cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeArtifact, artiData.Name)
require.NoError(t, store.SetWithTTL(cacheKey, true, 60))
require.NoError(t, workflow.AddResult(db.DbMap, store, &result))
b, err := store.Exist(cacheKey)
require.NoError(t, err)
require.False(t, b)

_, err = workflow.CanUploadArtifact(ctx, db.DbMap, store, workflowRun, artifactRef)
_, err = workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, artifactRef)
require.True(t, sdk.ErrorIs(err, sdk.ErrConflictData))
require.Contains(t, err.Error(), "artifact myartifact has already been uploaded")
}
Expand All @@ -159,14 +160,15 @@ func TestCanUploadArtifactAlreadyExistInMoreRecentSubNum(t *testing.T) {

proj, wk, workflowRun, nodeRun, jobRun := createRunNodeRunAndJob(t, db, store)

artifactRef := sdk.CDNArtifactAPIRef{
ProjectKey: proj.Key,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
RunJobID: jobRun.ID,
RunNodeID: nodeRun.ID,
RunID: workflowRun.ID,
ArtifactName: "myartifact",
artifactRef := sdk.CDNRunResultAPIRef{
ProjectKey: proj.Key,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
RunJobID: jobRun.ID,
RunNodeID: nodeRun.ID,
RunID: workflowRun.ID,
ArtifactName: "myartifact",
RunResultType: sdk.WorkflowRunResultTypeArtifact,
}

result := sdk.WorkflowRunResult{
Expand All @@ -188,14 +190,14 @@ func TestCanUploadArtifactAlreadyExistInMoreRecentSubNum(t *testing.T) {
bts, err := json.Marshal(artiData)
result.DataRaw = bts

cacheKey := workflow.GetArtifactResultKey(result.WorkflowRunID, artiData.Name)
cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeArtifact, artiData.Name)
require.NoError(t, store.SetWithTTL(cacheKey, true, 60))
require.NoError(t, workflow.AddResult(db.DbMap, store, &result))
b, err := store.Exist(cacheKey)
require.NoError(t, err)
require.False(t, b)

_, err = workflow.CanUploadArtifact(ctx, db.DbMap, store, workflowRun, artifactRef)
_, err = workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, artifactRef)
require.True(t, sdk.ErrorIs(err, sdk.ErrConflictData))
require.Contains(t, err.Error(), "artifact myartifact cannot be uploaded into a previous run")
}
Expand All @@ -220,7 +222,7 @@ func TestCanUploadArtifactAlreadyExistInAPreviousSubNum(t *testing.T) {
require.NoError(t, err)
workflowRun = *run2

artifactRef := sdk.CDNArtifactAPIRef{
artifactRef := sdk.CDNRunResultAPIRef{
ProjectKey: proj.Key,
WorkflowName: wk.Name,
WorkflowID: wk.ID,
Expand Down Expand Up @@ -249,13 +251,13 @@ func TestCanUploadArtifactAlreadyExistInAPreviousSubNum(t *testing.T) {
bts, err := json.Marshal(artiData)
result.DataRaw = bts

cacheKey := workflow.GetArtifactResultKey(result.WorkflowRunID, artiData.Name)
cacheKey := workflow.GetRunResultKey(result.WorkflowRunID, sdk.WorkflowRunResultTypeArtifact, artiData.Name)
require.NoError(t, store.SetWithTTL(cacheKey, true, 60))
require.NoError(t, workflow.AddResult(db.DbMap, store, &result))
b, err := store.Exist(cacheKey)
require.NoError(t, err)
require.False(t, b)

_, err = workflow.CanUploadArtifact(ctx, db.DbMap, store, workflowRun, artifactRef)
_, err = workflow.CanUploadRunResult(ctx, db.DbMap, store, workflowRun, artifactRef)
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion engine/api/workflow_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
var lastErr error
for {
attempt++
reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemArtifact)
reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func (api *API) postWorkflowRunResultsHandler() service.Handler {
}
}

func (api *API) workflowRunArtifactCheckUploadHandler() service.Handler {
func (api *API) workflowRunResultCheckUploadHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
key := vars["key"]
Expand All @@ -1500,21 +1500,21 @@ func (api *API) workflowRunArtifactCheckUploadHandler() service.Handler {
return sdk.WrapError(sdk.ErrForbidden, "only CDN can call this route")
}

var apiRef sdk.CDNArtifactAPIRef
var apiRef sdk.CDNRunResultAPIRef
if err := service.UnmarshalBody(r, &apiRef); err != nil {
return sdk.WithStack(err)
}

if apiRef.ToFilename() == "" {
return sdk.WrapError(sdk.ErrInvalidData, "unable to read artifact api ref")
return sdk.WrapError(sdk.ErrInvalidData, "unable to read run result api ref")
}

wr, err := workflow.LoadRun(ctx, api.mustDB(), key, name, number, workflow.LoadRunOptions{DisableDetailledNodeRun: true})
if err != nil {
return err
}

b, err := workflow.CanUploadArtifact(ctx, api.mustDBWithCtx(ctx), api.Cache, *wr, apiRef)
b, err := workflow.CanUploadRunResult(ctx, api.mustDBWithCtx(ctx), api.Cache, *wr, apiRef)
if err != nil {
return err
}
Expand All @@ -1523,7 +1523,7 @@ func (api *API) workflowRunArtifactCheckUploadHandler() service.Handler {
}

// Save check
if err := api.Cache.SetWithTTL(workflow.GetArtifactResultKey(apiRef.RunID, apiRef.ArtifactName), true, 600); err != nil {
if err := api.Cache.SetWithTTL(workflow.GetRunResultKey(apiRef.RunID, apiRef.RunResultType, apiRef.ArtifactName), true, 600); err != nil {
return sdk.WrapError(err, "unable to cache result artifact check %s ", apiRef.ToFilename())
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_run_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (api *API) getWorkflowRunArtifactLinksHandler() service.Handler {
return err
}

result, err := cdn.ListItems(ctx, api.mustDB(), sdk.CDNTypeItemArtifact, map[string]string{cdn.ParamRunID: strconv.Itoa(int(wr.ID))})
result, err := cdn.ListItems(ctx, api.mustDB(), sdk.CDNTypeItemRunResult, map[string]string{cdn.ParamRunID: strconv.Itoa(int(wr.ID))})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_run_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (api *API) getWorkflowAccessHandler() service.Handler {
switch sdk.CDNItemType(itemType) {
case sdk.CDNTypeItemStepLog, sdk.CDNTypeItemServiceLog:
enabled = true
case sdk.CDNTypeItemArtifact:
case sdk.CDNTypeItemRunResult:
_, enabled = featureflipping.IsEnabled(ctx, gorpmapping.Mapper, api.mustDB(), sdk.FeatureCDNArtifact, map[string]string{
"project_key": projectKey,
})
Expand Down
Loading

0 comments on commit 6e2b3e7

Please sign in to comment.