Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker): remove all download artifact code #6001

Merged
merged 5 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions engine/worker/internal/action/builtin_artifact_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
destPath := sdk.ParameterValue(a.Parameters, "path")
tag := sdk.ParameterValue(a.Parameters, "tag")

actionWorkflow := sdk.ParameterValue(a.Parameters, "workflow")
actionRunNumber := sdk.ParameterValue(a.Parameters, "number")

destinationWorkflow := workflow
if actionWorkflow != "" {
destinationWorkflow = actionWorkflow
}
destinationWorkflowNum := number
if actionRunNumber != "0" && actionRunNumber != "" {
destinationWorkflowNum = actionRunNumber
}

if destPath == "" {
destPath = "."
}

workdir, err := workerruntime.WorkingDirectory(ctx)
if err != nil {
res.Status = sdk.StatusFail
Expand All @@ -61,7 +72,7 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac

wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifacts from workflow into '%s'...", destPath))

n, err := strconv.ParseInt(number, 10, 64)
n, err := strconv.ParseInt(destinationWorkflowNum, 10, 64)
if err != nil {
res.Status = sdk.StatusFail
return res, fmt.Errorf("cds.run.number variable is not valid. aborting")
Expand All @@ -81,15 +92,15 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
// 1. Integration artifact manager on workflow
// 2. CDN activated or not
if pluginArtifactManagement != nil {
return GetArtifactFromIntegrationPlugin(ctx, wk, res, pattern, reg, destPath, pluginArtifactManagement, project, workflow, n)
return GetArtifactFromIntegrationPlugin(ctx, wk, res, pattern, reg, destPath, pluginArtifactManagement, project, destinationWorkflow, n)
}
// GET Artifact from CDS API
if !wk.FeatureEnabled(sdk.FeatureCDNArtifact) {
return GetArtifactFromAPI(ctx, wk, project, workflow, n, res, pattern, reg, tag, destPath, wkDirFS)
return GetArtifactFromAPI(ctx, wk, project, destinationWorkflow, n, res, pattern, reg, tag, destPath, wkDirFS)
}

// GET Artifact from CDS CDN
cdnItems, err := wk.Client().WorkflowRunArtifactsLinks(project, workflow, n)
cdnItems, err := wk.Client().WorkflowRunArtifactsLinks(project, destinationWorkflow, n)
if err != nil {
res.Status = sdk.StatusFail
return res, err
Expand All @@ -114,7 +125,7 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
go func(a sdk.CDNItem) {
defer wg.Done()
destFile := path.Join(destPath, a.APIRef.ToFilename())
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, project, workflow, n))
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, project, destinationWorkflow, n))

f, err := os.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm))
if err != nil {
Expand Down
235 changes: 43 additions & 192 deletions engine/worker/internal/handler_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@ import (
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"sync"
"time"

"github.com/spf13/afero"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/worker/internal/action"
"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
)
Expand All @@ -25,6 +20,7 @@ func downloadHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
ctx = workerruntime.SetStepOrder(ctx, wk.currentJob.currentStepIndex)
ctx = workerruntime.SetStepName(ctx, wk.currentJob.currentStepName)

// Get body
data, errRead := io.ReadAll(r.Body)
if errRead != nil {
newError := sdk.NewError(sdk.ErrWrongRequest, errRead)
Expand All @@ -40,199 +36,54 @@ func downloadHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
return
}

currentProject := sdk.ParameterValue(wk.currentJob.params, "cds.project")
currentWorkflow := sdk.ParameterValue(wk.currentJob.params, "cds.workflow")
if reqArgs.Workflow == "" {
reqArgs.Workflow = currentWorkflow
}

// If the reqArgs.Number is empty and if the reqArgs.Workflow is the current workflow, take the current build number
if reqArgs.Number == 0 {
if reqArgs.Workflow == currentWorkflow {
var errN error
buildNumberString := sdk.ParameterValue(wk.currentJob.params, "cds.run.number")
reqArgs.Number, errN = strconv.ParseInt(buildNumberString, 10, 64)
if errN != nil {
newError := sdk.NewError(sdk.ErrWrongRequest, fmt.Errorf("cannot parse '%s' as run number: %s", buildNumberString, errN))
writeError(w, r, newError)
return
}
} else { // If this is another workflow, check the latest run
runs, err := wk.client.WorkflowRunList(currentProject, reqArgs.Workflow, 0, 0)
if err != nil {
writeError(w, r, sdk.WrapError(err, "cannot search run for project %s and workflow: %s", currentProject, reqArgs.Workflow))
return
}
if len(runs) < 1 {
writeError(w, r, fmt.Errorf("workflow run not found"))
return
}
reqArgs.Number = runs[0].Number
}
}

projectKey := sdk.ParameterValue(wk.currentJob.params, "cds.project")

// GET Artifact from CDS API
if !wk.FeatureEnabled(sdk.FeatureCDNArtifact) {
if err := GetArtifactFromAPI(ctx, wk, projectKey, reqArgs); err != nil {
writeError(w, r, err)
return
}
return
}

// GET Artifact from CDS CDN
reg, err := regexp.Compile(reqArgs.Pattern)
a := sdk.Action{
Parameters: []sdk.Parameter{
{
Name: "path",
Type: sdk.StringParameter,
Value: reqArgs.Destination,
},
{
Name: "pattern",
Type: sdk.StringParameter,
Value: reqArgs.Pattern,
},
{
Name: "tag",
Type: sdk.StringParameter,
Value: reqArgs.Tag,
},
{
Name: "workflow",
Type: sdk.StringParameter,
Value: reqArgs.Workflow,
},
{
Name: "number",
Type: sdk.NumberParameter,
Value: strconv.Itoa(int(reqArgs.Number)),
},
},
}

workingDir, err := workerruntime.WorkingDirectory(wk.currentJob.context)
if err != nil {
newError := sdk.NewError(sdk.ErrInvalidData, fmt.Errorf("unable to compile pattern: %v", err))
writeError(w, r, newError)
log.Error(ctx, "Artifact download failed: No working directory: %v", err)
writeError(w, r, err)
return
}
if reqArgs.Destination == "" {
reqArgs.Destination = "."
}
ctx = workerruntime.SetWorkingDirectory(ctx, workingDir)

workdir, err := workerruntime.WorkingDirectory(wk.currentJob.context)
result, err := action.RunArtifactDownload(ctx, wk, a, wk.currentJob.secrets)
if err != nil {
newError := sdk.NewError(sdk.ErrInvalidData, fmt.Errorf("unable to get working directory: %v", err))
writeError(w, r, newError)
log.Error(ctx, "unable to download artifacts: %v", err)
writeError(w, r, err)
return
}
ctx = workerruntime.SetWorkingDirectory(ctx, workdir)

var abs string
if x, ok := wk.BaseDir().(*afero.BasePathFs); ok {
abs, _ = x.RealPath(workdir.Name())
} else {
abs = workdir.Name()
}

if !sdk.PathIsAbs(reqArgs.Destination) {
reqArgs.Destination = filepath.Join(abs, reqArgs.Destination)
}
wkDirFS := afero.NewOsFs()
if err := wkDirFS.MkdirAll(reqArgs.Destination, os.FileMode(0744)); err != nil {
newError := sdk.NewError(sdk.ErrInvalidData, fmt.Errorf("unable to create destination directory: %v", err))
writeError(w, r, newError)
if result.Status != sdk.StatusSuccess {
log.Error(ctx, "Artifact download failed: %v", result)
writeError(w, r, fmt.Errorf("artifact download failed: %s", result.Reason))
return
}

cdnItems, err := wk.Client().WorkflowRunArtifactsLinks(projectKey, reqArgs.Workflow, reqArgs.Number)
if err != nil {
newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("unable to list artifacts: %v", err))
writeError(w, r, newError)
return
}

wg := new(sync.WaitGroup)
wg.Add(len(cdnItems.Items))
for i := range cdnItems.Items {
item := cdnItems.Items[i]
apiRef, is := item.GetCDNRunResultApiRef()
if !is {
newError := sdk.NewError(sdk.ErrInvalidData, fmt.Errorf("item is not an artifact: %v", err))
writeError(w, r, newError)
return
}
if reqArgs.Pattern != "" && !reg.MatchString(apiRef.ToFilename()) {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("%s does not match pattern %s - skipped", apiRef.ArtifactName, reqArgs.Pattern))
wg.Done()
continue
}

go func(a sdk.CDNItem) {
defer wg.Done()
destFile := path.Join(reqArgs.Destination, a.APIRef.ToFilename())
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, projectKey, reqArgs.Workflow, reqArgs.Number))

f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm))
if err != nil {
newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", destFile, err))
writeError(w, r, newError)
return
}
defer f.Close() //nolint
if err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult, a.MD5, f); err != nil {
newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot download artifact %s: %s", destFile, err))
writeError(w, r, newError)
return
}
}(item)
}
wg.Wait()
return

}
}

func GetArtifactFromAPI(ctx context.Context, wk *CurrentWorker, projectKey string, reqArgs workerruntime.DownloadArtifact) error {
artifacts, err := wk.client.WorkflowRunArtifacts(projectKey, reqArgs.Workflow, reqArgs.Number)
if err != nil {
newError := sdk.NewError(sdk.ErrWrongRequest, fmt.Errorf("cannot download artifacts with worker download: %s", err))
return newError
}

reg, err := regexp.Compile(reqArgs.Pattern)
if err != nil {
newError := sdk.NewError(sdk.ErrWrongRequest, fmt.Errorf("invalid pattern %s : %v", reqArgs.Pattern, err))
return newError
}
wg := new(sync.WaitGroup)
wg.Add(len(artifacts))

wk.SendLog(ctx, workerruntime.LevelInfo, "Downloading artifacts into current directory")

var isInError bool
for i := range artifacts {
a := &artifacts[i]

if reqArgs.Pattern != "" && !reg.MatchString(a.Name) {
wg.Done()
continue
}

if reqArgs.Tag != "" && a.Tag != reqArgs.Tag {
wg.Done()
continue
}

go func(a *sdk.WorkflowNodeRunArtifact) {
defer wg.Done()

filePath := path.Join(reqArgs.Destination, a.Name)
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(a.Perm))
if err != nil {
wk.SendLog(ctx, workerruntime.LevelError, fmt.Sprintf("Cannot download artifact (OpenFile) %s: %s", a.Name, err))
isInError = true
return
}
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("downloading artifact %s with tag %s from workflow %s/%s on run %d (%s)...", a.Name, a.Tag, projectKey, reqArgs.Workflow, reqArgs.Number, filePath))
if err := wk.client.WorkflowNodeRunArtifactDownload(projectKey, reqArgs.Workflow, *a, f); err != nil {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Cannot download artifact %s: %s", a.Name, err))
isInError = true
return
}
if err := f.Close(); err != nil {
wk.SendLog(ctx, workerruntime.LevelError, fmt.Sprintf("Cannot download artifact %s: %s", a.Name, err))
isInError = true
return
}
}(a)

// there is one error, do not try to load all artifacts
if isInError {
break
}
if len(artifacts) > 1 {
time.Sleep(3 * time.Second)
}
}

wg.Wait()
if isInError {
newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("error while downloading artifacts - see previous logs"))
return newError
}
return nil
}