Skip to content

Commit

Permalink
refactor(worker): delete old artifacts engine (#6115)
Browse files Browse the repository at this point in the history
* refactor(worker): delete old artifacts engine

Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault authored Mar 25, 2022
1 parent 5a33878 commit 881ed4b
Show file tree
Hide file tree
Showing 23 changed files with 201 additions and 358 deletions.
66 changes: 1 addition & 65 deletions engine/worker/internal/action/builtin_artifact_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
pattern := sdk.ParameterValue(a.Parameters, "pattern")

destPath := sdk.ParameterValue(a.Parameters, "path")
tag := sdk.ParameterValue(a.Parameters, "tag")

actionWorkflow := sdk.ParameterValue(a.Parameters, "workflow")
actionRunNumber := sdk.ParameterValue(a.Parameters, "number")
Expand Down Expand Up @@ -90,14 +89,10 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac

// Priority:
// 1. Integration artifact manager on workflow
// 2. CDN activated or not
// 2. CDN
if pluginArtifactManagement != nil {
return GetArtifactFromIntegrationPlugin(ctx, wk, res, pattern, reg, destPath, pluginArtifactManagement, project, destinationWorkflow, n)
}
// GET Artifact from CDS API
if !wk.FeatureEnabled(sdk.FeatureCDNArtifact) {
return GetArtifactFromAPI(ctx, wk, project, destinationWorkflow, n, res, pattern, reg, tag, destPath, wkDirFS)
}

// GET Artifact from CDS CDN
cdnItems, err := wk.Client().WorkflowRunArtifactsLinks(project, destinationWorkflow, n)
Expand Down Expand Up @@ -275,62 +270,3 @@ func runGRPCIntegrationPlugin(ctx context.Context, wk workerruntime.Runtime, bin
}
return nil
}

func GetArtifactFromAPI(ctx context.Context, wk workerruntime.Runtime, project string, workflow string, n int64, res sdk.Result, pattern string, regexp *regexp.Regexp, tag string, destPath string, wkDirFS afero.Fs) (sdk.Result, error) {
wg := new(sync.WaitGroup)
artifacts, err := wk.Client().WorkflowRunArtifacts(project, workflow, n)
if err != nil {
return res, err
}
wg.Add(len(artifacts))
for i := range artifacts {
a := &artifacts[i]

if pattern != "" && !regexp.MatchString(a.Name) {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("%s does not match pattern %s - skipped", a.Name, pattern))
wg.Done()
continue
}

if tag != "" && a.Tag != tag {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("%s does not match tag %s - skipped", a.Name, tag))
wg.Done()
continue
}

go func(a *sdk.WorkflowNodeRunArtifact) {
defer wg.Done()

destFile := path.Join(destPath, a.Name)
f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(a.Perm))
if err != nil {
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "Cannot download artifact (OpenFile) %s: %s", destFile, err)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, project, workflow, n))
if err := wk.Client().WorkflowNodeRunArtifactDownload(project, workflow, *a, f); err != nil {
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
if err := f.Close(); err != nil {
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
}(a)
// TODO: write here a reason why we are waiting 3 seconds
if len(artifacts) > 1 {
time.Sleep(3 * time.Second)
}
}
wg.Wait()
return res, nil
}
74 changes: 47 additions & 27 deletions engine/worker/internal/action/builtin_artifact_download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,34 @@ func TestRunArtifactDownload(t *testing.T) {

wk, ctx := SetupTest(t)

as := []sdk.WorkflowNodeRunArtifact{
{
ID: 1,
Name: "myFile.txt",
Tag: "999",
},
{
ID: 2,
Name: "myFile.csv",
Tag: "999",
},
}

f1 := bytes.NewBufferString("contentfile")

gock.New("http://lolcat.host").Get("/project/projKey/workflows/workflowName/runs/999/artifacts").
Reply(200).JSON(as)
gock.New("http://lolcat.host").Get("/project/projKey/workflows/workflowName/artifact/1").
gock.New("http://cds-cdn.local").Get("/item/run-result/the-ref/download").
Reply(200).Body(f1)

ref := sdk.CDNRunResultAPIRef{
ProjectKey: "projKey",
WorkflowName: "WorkflowName",
ArtifactName: "myFile.txt",
RunResultType: sdk.WorkflowRunResultTypeArtifact,
}

it := sdk.CDNItemLinks{
CDNHttpURL: "http://lolcat.cdn.host",
Items: []sdk.CDNItem{
{
ID: "1",
Type: sdk.CDNTypeItemRunResult,
APIRef: &ref,
APIRefHash: "the-ref",
MD5: "d62dd48969b2bcf4023f51be7cc02c05",
},
},
}

gock.New("http://cds-api.local").Get("/project/projKey/workflows/workflowName/runs/999/artifacts/links").
Reply(200).JSON(it)

gock.InterceptClient(wk.Client().(cdsclient.Raw).HTTPClient())
gock.InterceptClient(wk.Client().(cdsclient.Raw).HTTPNoTimeoutClient())

Expand Down Expand Up @@ -88,21 +96,33 @@ func TestRunArtifactDownloadOutsideWorkspace(t *testing.T) {
wk, ctx := SetupTest(t)

fileName := sdk.RandomString(10)
f1 := bytes.NewBufferString("contentfile")

as := []sdk.WorkflowNodeRunArtifact{
{
ID: 1,
Name: fileName,
Tag: "999",
},
gock.New("http://cds-cdn.local").Get("/item/run-result/the-ref/download").
Reply(200).Body(f1)

ref := sdk.CDNRunResultAPIRef{
ProjectKey: "projKey",
WorkflowName: "WorkflowName",
ArtifactName: fileName,
RunResultType: sdk.WorkflowRunResultTypeArtifact,
}

f1 := bytes.NewBufferString("contentfile")
it := sdk.CDNItemLinks{
CDNHttpURL: "http://lolcat.cdn.host",
Items: []sdk.CDNItem{
{
ID: "1",
Type: sdk.CDNTypeItemRunResult,
APIRef: &ref,
APIRefHash: "the-ref",
MD5: "d62dd48969b2bcf4023f51be7cc02c05",
},
},
}

gock.New("http://lolcat.host").Get("/project/projKey/workflows/workflowName/runs/999/artifacts").
Reply(200).JSON(as)
gock.New("http://lolcat.host").Get("/project/projKey/workflows/workflowName/artifact/1").
Reply(200).Body(f1)
gock.New("http://cds-api.local").Get("/project/projKey/workflows/workflowName/runs/999/artifacts/links").
Reply(200).JSON(it)

gock.InterceptClient(wk.Client().(cdsclient.Raw).HTTPClient())
gock.InterceptClient(wk.Client().(cdsclient.Raw).HTTPNoTimeoutClient())
Expand Down
46 changes: 5 additions & 41 deletions engine/worker/internal/action/builtin_artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
artifactPath = filepath.Join(abs, artifactPath)
}

tagParam := sdk.ParameterFind(a.Parameters, "tag")
var tag string
if tagParam != nil {
tag = tagParam.Value
}

fileTypeParam := sdk.ParameterFind(a.Parameters, "type")
var fileType string
if fileTypeParam != nil {
Expand Down Expand Up @@ -84,47 +78,30 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
}
}()

cdnArtifactEnabled := wk.FeatureEnabled(sdk.FeatureCDNArtifact)

integrationName := sdk.DefaultIfEmptyStorage(strings.TrimSpace(sdk.ParameterValue(a.Parameters, "destination")))
projectKey := sdk.ParameterValue(wk.Parameters(), "cds.project")
pluginArtifactManagement := wk.GetPlugin(sdk.GRPCPluginUploadArtifact)

wg.Add(len(filesPath))
for _, p := range filesPath {
go func(path string) {
log.Debug(ctx, "uploading %s projectKey:%v integrationName:%v job:%d", path, projectKey, integrationName, jobID)
log.Debug(ctx, "uploading %s projectKey:%v job:%d", path, projectKey, jobID)
defer wg.Done()

// Priority:
// 1. Integration specified on artifact upload action ( advanced parameter )
// 2. Integration artifact manager on workflow
// 3. CDN activated or not
if integrationName != sdk.DefaultStorageIntegrationName {
if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag, fileType); err != nil {
chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path)
wgErrors.Add(1)
}
return
} else if pluginArtifactManagement != nil {
// 1. Integration artifact manager on workflow
// 2. CDN
if pluginArtifactManagement != nil {
if err := uploadArtifactByIntegrationPlugin(path, ctx, wk, pluginArtifactManagement, fileType); err != nil {
chanError <- sdk.WrapError(err, "Error while uploading artifact by plugin %s", path)
wgErrors.Add(1)
}
} else if !cdnArtifactEnabled {
if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag, fileType); err != nil {
chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path)
wgErrors.Add(1)
}
return
} else {
if err := uploadArtifactIntoCDN(path, ctx, wk); err != nil {
chanError <- err
wgErrors.Add(1)
}
return
}

}(p)
if len(filesPath) > 1 {
//Wait 3 second to get the object storage to set up all the things
Expand Down Expand Up @@ -215,7 +192,7 @@ func uploadArtifactByIntegrationPlugin(path string, ctx context.Context, wk work
return fmt.Errorf("error uploading artifact: %v", err)
}

if strings.ToUpper(res.Status) != strings.ToUpper(sdk.StatusSuccess) {
if !strings.EqualFold(res.Status, sdk.StatusSuccess) {
return fmt.Errorf("plugin execution failed %s: %s", res.Status, res.Details)
}
res.Outputs[sdk.ArtifactUploadPluginOutputFileType] = fileType
Expand Down Expand Up @@ -248,19 +225,6 @@ func uploadArtifactIntoCDN(path string, ctx context.Context, wk workerruntime.Ru
return nil
}

func uploadArtifactByApiCall(path string, wk workerruntime.Runtime, ctx context.Context, projectKey string, integrationName string, jobID int64, tag, fileType string) error {
throughTempURL, duration, err := wk.Client().QueueArtifactUpload(ctx, projectKey, integrationName, jobID, tag, path, fileType)
if err != nil {
return err
}
if throughTempURL {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to object store", path, duration.Seconds()))
} else {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS API", path, duration.Seconds()))
}
return nil
}

func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) (int, error) {
runID, runNodeID, runJobID := wk.GetJobIdentifiers()
runResultCheck := sdk.WorkflowRunResultCheck{
Expand Down
45 changes: 4 additions & 41 deletions engine/worker/internal/action/builtin_artifact_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package action
import (
"bytes"
"io"
"mime"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -34,11 +33,7 @@ func TestRunArtifactUpload_Absolute(t *testing.T) {

defer os.Remove("foo")

gock.New("http://lolcat.host").Get("/project/project/storage/shared.infra").
Reply(200)

gock.New("http://lolcat.host").Post("/project/project/storage/shared.infra/artifact/dGFn").
Reply(200)
gock.New("http://cds-cdn.local").Post("/item/upload").Reply(200)

var checkRequest gock.ObserverFunc = func(request *http.Request, mock gock.Mock) {
bodyContent, err := io.ReadAll(request.Body)
Expand All @@ -47,21 +42,7 @@ func TestRunArtifactUpload_Absolute(t *testing.T) {
if mock != nil {
t.Logf("%s %s - Body: %s", mock.Request().Method, mock.Request().URLStruct.String(), string(bodyContent))
}
switch mock.Request().URLStruct.String() {
case "http://lolcat.host/queue/workflows/666/coverage":
require.NoError(t, request.ParseMultipartForm(10000))

_, params, err := mime.ParseMediaType(request.Header.Get("Content-Disposition"))
assert.NoError(t, err)

fileName := params["filename"]
assert.Equal(t, "foo", fileName)

md5 := params["md4sum"]
assert.Equal(t, "32c0c0a755c70c6faef2eeb98b66c3aeee4c389d62bb9b639796c37abe3d", md5)

case "/project/project/storage/shared.infra/artifact":
}
assert.Equal(t, "http://cds-cdn.local/item/upload", mock.Request().URLStruct.String())
}

gock.Observe(checkRequest)
Expand Down Expand Up @@ -98,11 +79,7 @@ func TestRunArtifactUpload_Relative(t *testing.T) {
fname := filepath.Join(wk.workingDirectory.Name(), "foo")
assert.NoError(t, afero.WriteFile(wk.workspace, fname, []byte("something"), os.ModePerm))

gock.New("http://lolcat.host").Get("/project/project/storage/shared.infra").
Reply(200)

gock.New("http://lolcat.host").Post("/project/project/storage/shared.infra/artifact/dGFn").
Reply(200)
gock.New("http://cds-cdn.local").Post("/item/upload").Reply(200)

var checkRequest gock.ObserverFunc = func(request *http.Request, mock gock.Mock) {
bodyContent, err := io.ReadAll(request.Body)
Expand All @@ -111,21 +88,7 @@ func TestRunArtifactUpload_Relative(t *testing.T) {
if mock != nil {
t.Logf("%s %s - Body: %s", mock.Request().Method, mock.Request().URLStruct.String(), string(bodyContent))
}
switch mock.Request().URLStruct.String() {
case "http://lolcat.host/queue/workflows/666/coverage":
require.NoError(t, request.ParseMultipartForm(10000))

_, params, err := mime.ParseMediaType(request.Header.Get("Content-Disposition"))
assert.NoError(t, err)

fileName := params["filename"]
assert.Equal(t, "foo", fileName)

md5 := params["md4sum"]
assert.Equal(t, "32c0c0a755c70c6faef2eeb98b66c3aeee4c389d62bb9b639796c37abe3d", md5)

case "/project/project/storage/shared.infra/artifact":
}
assert.Equal(t, "http://cds-cdn.local/item/upload", mock.Request().URLStruct.String())
}

gock.Observe(checkRequest)
Expand Down
Loading

0 comments on commit 881ed4b

Please sign in to comment.