Skip to content

Commit

Permalink
fix(api): include repo type for run result unicity (#6184)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Jun 1, 2022
1 parent a070b16 commit dc7444b
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (e *artifactoryReleasePlugin) Run(ctx context.Context, opts *integrationplu
return fail("unable to list run results: %v", err)
}

fmt.Printf("Found %d run results\n", len(runResult))

log.SetLogger(log.NewLogger(log.INFO, os.Stdout))
if distributionURL == "" {
fmt.Printf("Using %s to release\n", artifactoryURL)
Expand Down Expand Up @@ -137,8 +139,15 @@ func (e *artifactoryReleasePlugin) Run(ctx context.Context, opts *integrationplu
break
}
}
name, err := r.ComputeName()
if err != nil {
return fail("unable to read result %s: %v", r.ID, err)
}
if skip {
fmt.Printf("Result %q skipped\n", name)
continue
} else {
fmt.Printf("Result %q to promote\n", name)
}
switch rData.RepoType {
case "docker":
Expand Down
38 changes: 17 additions & 21 deletions engine/api/workflow/workflow_run_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,42 +354,38 @@ func insertResult(tx gorpmapper.SqlExecutorWithTx, runResult *sdk.WorkflowRunRes
return nil
}

func getAll(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) ([]sdk.WorkflowRunResult, error) {
func getAll(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) (sdk.WorkflowRunResults, error) {
var dbResults []dbRunResult
if err := gorpmapping.GetAll(ctx, db, query, &dbResults); err != nil {
return nil, err
}
results := make([]sdk.WorkflowRunResult, 0, len(dbResults))
results := make(sdk.WorkflowRunResults, 0, len(dbResults))
for _, r := range dbResults {
results = append(results, sdk.WorkflowRunResult(r))
}
return results, nil
}

func LoadRunResultsByRunID(ctx context.Context, db gorp.SqlExecutor, runID int64) ([]sdk.WorkflowRunResult, error) {
dbQuery := `
WITH allResults AS (
SELECT data->>'name' AS name, sub_num, id
FROM workflow_run_result
WHERE workflow_run_id = $1
),
deduplication AS (
SELECT distinct on (name) *
FROM allResults
ORDER BY name, sub_num DESC
)
SELECT * FROM workflow_run_result WHERE id IN (SELECT id FROM deduplication);`
query := gorpmapping.NewQuery(dbQuery).Args(runID)
func LoadRunResultsByRunID(ctx context.Context, db gorp.SqlExecutor, runID int64) (sdk.WorkflowRunResults, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 ORDER BY sub_num DESC").Args(runID)
return getAll(ctx, db, query)
}

func LoadRunResultsByNodeRunID(ctx context.Context, db gorp.SqlExecutor, nodeRunID int64) ([]sdk.WorkflowRunResult, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result where workflow_node_run_id = $1").Args(nodeRunID)
return getAll(ctx, db, query)
func LoadRunResultsByRunIDUnique(ctx context.Context, db gorp.SqlExecutor, runID int64) (sdk.WorkflowRunResults, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 ORDER BY sub_num DESC").Args(runID)
rs, err := getAll(ctx, db, query)
if err != nil {
return nil, err
}
return rs.Unique()
}

func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runID int64, t sdk.WorkflowRunResultType) ([]sdk.WorkflowRunResult, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result where workflow_run_id = $1 AND type = $2").Args(runID, t)
func LoadRunResultsByNodeRunID(ctx context.Context, db gorp.SqlExecutor, nodeRunID int64) (sdk.WorkflowRunResults, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result WHERE workflow_node_run_id = $1").Args(nodeRunID)
return getAll(ctx, db, query)
}

func LoadRunResultsByRunIDAndType(ctx context.Context, db gorp.SqlExecutor, runID int64, t sdk.WorkflowRunResultType) (sdk.WorkflowRunResults, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 AND type = $2").Args(runID, t)
return getAll(ctx, db, query)
}
20 changes: 15 additions & 5 deletions engine/api/workflow/workflow_run_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func TestCanUploadArtifactAlreadyExist(t *testing.T) {
Type: sdk.WorkflowRunResultTypeArtifact,
}
artiData := sdk.WorkflowRunResultArtifact{
Name: "myartifact",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down Expand Up @@ -171,7 +173,9 @@ func TestCanUploadArtifactAlreadyExistInMoreRecentSubNum(t *testing.T) {
Type: sdk.WorkflowRunResultTypeArtifact,
}
artiData := sdk.WorkflowRunResultArtifact{
Name: "myartifact",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down Expand Up @@ -230,7 +234,9 @@ func TestCanUploadArtifactAlreadyExistInAPreviousSubNum(t *testing.T) {
Type: sdk.WorkflowRunResultTypeArtifact,
}
artiData := sdk.WorkflowRunResultArtifact{
Name: "myartifact",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down Expand Up @@ -275,7 +281,9 @@ func TestCanUploadStaticFile(t *testing.T) {
Type: sdk.WorkflowRunResultTypeStaticFile,
}
artiData := sdk.WorkflowRunResultStaticFile{
Name: "my title static file",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "my title static file",
},
RemoteURL: "https://foo/bar",
}
bts, err := json.Marshal(artiData)
Expand Down Expand Up @@ -309,7 +317,9 @@ func TestCanUploadStaticFileInvalid(t *testing.T) {
Type: sdk.WorkflowRunResultTypeStaticFile,
}
artiData := sdk.WorkflowRunResultStaticFile{
Name: "my title static file",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "my title static file",
},
RemoteURL: "",
}
bts, err := json.Marshal(artiData)
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow_purge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,10 @@ func Test_Purge_DeleteArtifactsFromRepositoryManager(t *testing.T) {
require.NoError(t, err)

data := sdk.WorkflowRunResultArtifactManager{
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "foo",
},
Path: "path/to/foo",
Name: "foo",
RepoName: "repository",
}
rawData, _ := json.Marshal(data)
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,10 +1556,12 @@ func Test_workflowRunResultsAdd(t *testing.T) {
}

artiData := sdk.WorkflowRunResultArtifact{
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myartifact",
},
Size: 1,
MD5: "AA",
CDNRefHash: "AA",
Name: "myartifact",
Perm: 0777,
}
bts, err := json.Marshal(artiData)
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ func (api *API) getWorkflowRunResultsHandler() service.Handler {
return sdk.WrapError(err, "unable to load workflow run for workflow %s and number %d", name, number)
}

results, err := workflow.LoadRunResultsByRunID(ctx, api.mustDB(), wr.ID)
results, err := workflow.LoadRunResultsByRunIDUnique(ctx, api.mustDB(), wr.ID)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion engine/api/workflow_run_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func Test_getWorkflowRunAndNodeRunResults(t *testing.T) {
require.NoError(t, err)

artiData := sdk.WorkflowRunResultArtifact{
Name: "myarti",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "myarti",
},
CDNRefHash: "123",
MD5: "123",
Size: 1,
Expand Down
8 changes: 6 additions & 2 deletions engine/cdn/cdn_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,19 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
switch runResultApiRef.RunResultType {
case sdk.WorkflowRunResultTypeArtifact:
result = sdk.WorkflowRunResultArtifact{
Name: apiRef.ToFilename(),
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: apiRef.ToFilename(),
},
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Perm: runResultApiRef.Perm,
}
case sdk.WorkflowRunResultTypeCoverage:
result = sdk.WorkflowRunResultCoverage{
Name: apiRef.ToFilename(),
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: apiRef.ToFilename(),
},
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Expand Down
8 changes: 6 additions & 2 deletions engine/worker/cmd_run_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func addStaticFileRunResultCmd() func(cmd *cobra.Command, args []string) {
}

payload := sdk.WorkflowRunResultStaticFile{
Name: name,
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: name,
},
RemoteURL: remoteURL.String(),
}
data, _ := json.Marshal(payload)
Expand Down Expand Up @@ -109,7 +111,9 @@ func addArtifactManagerRunResultCmd() func(cmd *cobra.Command, args []string) {
}

payload := sdk.WorkflowRunResultArtifactManager{
Name: fileName,
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: fileName,
},
Perm: 0,
Path: filePath,
RepoName: repositoryName,
Expand Down
4 changes: 3 additions & 1 deletion engine/worker/internal/action/builtin_artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ func addWorkflowRunResult(ctx context.Context, wk workerruntime.Runtime, filePat
}

data := sdk.WorkflowRunResultArtifactManager{
Name: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathFileName],
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathFileName],
},
Perm: uint32(perm),
RepoName: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathRepoName],
Path: uploadResult.Outputs[sdk.ArtifactUploadPluginOutputPathFilePath],
Expand Down
8 changes: 6 additions & 2 deletions engine/worker/internal/handler_run_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func Test_addRunResultStaticFileHandler(t *testing.T) {
).Times(1)

v := sdk.WorkflowRunResultStaticFile{
Name: "foo",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "foo",
},
RemoteURL: "http://locat.local/static/foo.html",
}
buf, err := json.Marshal(v)
Expand Down Expand Up @@ -107,7 +109,9 @@ func Test_addRunResultArtifactManagerHandler(t *testing.T) {
).Times(1)

v := sdk.WorkflowRunResultArtifactManager{
Name: "foo",
WorkflowRunResultArtifactCommon: sdk.WorkflowRunResultArtifactCommon{
Name: "foo",
},
RepoName: "my-repo",
}
buf, err := json.Marshal(v)
Expand Down
73 changes: 68 additions & 5 deletions sdk/workflow_run_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sdk

import (
"encoding/json"
"fmt"
"sort"
"time"
)

Expand All @@ -15,6 +17,28 @@ const (
type WorkflowRunResultType string
type WorkflowRunResultDataKey string

type WorkflowRunResults []WorkflowRunResult

// Unique returns the last version of each results
func (w WorkflowRunResults) Unique() (WorkflowRunResults, error) {
m := make(map[string]WorkflowRunResult, len(w))
for i := range w {
key, err := w[i].ComputeUniqueKey()
if err != nil {
return nil, err
}
if v, ok := m[key]; !ok || v.SubNum < w[i].SubNum {
m[key] = w[i]
}
}
filtered := make(WorkflowRunResults, 0, len(m))
for _, v := range m {
filtered = append(filtered, v)
}
sort.Slice(filtered, func(i, j int) bool { return filtered[i].Created.Before(filtered[j].Created) })
return filtered, nil
}

type WorkflowRunResult struct {
ID string `json:"id" db:"id"`
Created time.Time `json:"created" db:"created"`
Expand All @@ -26,12 +50,47 @@ type WorkflowRunResult struct {
DataRaw json.RawMessage `json:"data" db:"data"`
}

func (r WorkflowRunResult) ComputeUniqueKey() (string, error) {
key := fmt.Sprintf("%d-%s", r.WorkflowRunID, r.Type)
switch r.Type {
case WorkflowRunResultTypeArtifactManager:
var data WorkflowRunResultArtifactManager
if err := json.Unmarshal(r.DataRaw, &data); err != nil {
return "", WithStack(err)
}
key = key + "-" + data.Name + "-" + data.RepoType
default:
var data WorkflowRunResultArtifactCommon
if err := json.Unmarshal(r.DataRaw, &data); err != nil {
return "", WithStack(err)
}
key = key + "-" + data.Name
}
return key, nil
}

func (r WorkflowRunResult) ComputeName() (string, error) {
switch r.Type {
case WorkflowRunResultTypeArtifactManager:
var data WorkflowRunResultArtifactManager
if err := json.Unmarshal(r.DataRaw, &data); err != nil {
return "", WithStack(err)
}
return fmt.Sprintf("%s (%s: %s)", data.Name, r.Type, data.RepoType), nil
default:
var data WorkflowRunResultArtifactCommon
if err := json.Unmarshal(r.DataRaw, &data); err != nil {
return "", WithStack(err)
}
return fmt.Sprintf("%s (%s)", data.Name, r.Type), nil
}
}

func (r *WorkflowRunResult) GetArtifact() (WorkflowRunResultArtifact, error) {
var data WorkflowRunResultArtifact
if err := JSONUnmarshal(r.DataRaw, &data); err != nil {
return data, WithStack(err)
}

return data, nil
}

Expand Down Expand Up @@ -70,8 +129,12 @@ type WorkflowRunResultCheck struct {
ResultType WorkflowRunResultType `json:"result_type"`
}

type WorkflowRunResultArtifactCommon struct {
Name string `json:"name"`
}

type WorkflowRunResultArtifactManager struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
Size int64 `json:"size"`
MD5 string `json:"md5"`
Path string `json:"path"`
Expand All @@ -98,7 +161,7 @@ func (a *WorkflowRunResultArtifactManager) IsValid() error {
}

type WorkflowRunResultStaticFile struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
RemoteURL string `json:"remote_url"`
}

Expand All @@ -113,7 +176,7 @@ func (a *WorkflowRunResultStaticFile) IsValid() error {
}

type WorkflowRunResultArtifact struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
Size int64 `json:"size"`
MD5 string `json:"md5"`
CDNRefHash string `json:"cdn_hash"`
Expand All @@ -138,7 +201,7 @@ func (a *WorkflowRunResultArtifact) IsValid() error {
}

type WorkflowRunResultCoverage struct {
Name string `json:"name"`
WorkflowRunResultArtifactCommon
Size int64 `json:"size"`
MD5 string `json:"md5"`
CDNRefHash string `json:"cdn_hash"`
Expand Down
Loading

0 comments on commit dc7444b

Please sign in to comment.