Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(engine): check md5 on artifact download + fix error printing #5846

Merged
merged 6 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions cli/cdsctl/workflow_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"io"
"os"
"regexp"
"strconv"
Expand Down Expand Up @@ -170,33 +169,19 @@ func workflowArtifactDownloadRun(v cli.Values) error {
}

if toDownload {
var err error
f, err = os.OpenFile(artifactData.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(artifactData.Perm))
if err != nil {
return err
}
fmt.Printf("Downloading %s...\n", artifactData.Name)
r, err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult)
f, err := os.OpenFile(artifactData.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(artifactData.Perm))
if err != nil {
return err
return sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", artifactData.Name, err))
}
if _, err := io.Copy(f, r); err != nil {
return cli.WrapError(err, "unable to write file")
if err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult, artifactData.MD5, f); err != nil {
_ = f.Close()
return err
}
if err := f.Close(); err != nil {
return cli.WrapError(err, "unable to close file")
return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", artifactData.Name, err)
}
}

md5Sum, err := sdk.FileMd5sum(artifactData.Name)
if err != nil {
return err
}

if md5Sum != artifactData.MD5 {
return cli.NewError("Invalid md5Sum \ndownloaded file:%s\n%s:%s", md5Sum, f.Name(), artifactData.MD5)
}

if toDownload {
fmt.Printf("File %s created, checksum OK\n", f.Name())
} else {
Expand Down
22 changes: 4 additions & 18 deletions cli/cdsctl/workflow_run_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"io"
"os"
"regexp"

Expand Down Expand Up @@ -129,30 +128,17 @@ func workflowRunResultGet(v cli.Values) error {
var err error
f, err = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(perm))
if err != nil {
return err
return sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", fileName, err))
}
fmt.Printf("Downloading %s...\n", fileName)
r, err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, cdnHash, sdk.CDNTypeItemRunResult)
if err != nil {
if err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, cdnHash, sdk.CDNTypeItemRunResult, md5, f); err != nil {
_ = f.Close()
return err
}
if _, err := io.Copy(f, r); err != nil {
return cli.WrapError(err, "unable to write file")
}
if err := f.Close(); err != nil {
return cli.WrapError(err, "unable to close file")
return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", fileName, err)
}
}

md5Sum, err := sdk.FileMd5sum(fileName)
if err != nil {
return err
}

if md5Sum != md5 {
return cli.NewError("Invalid md5Sum \ndownloaded file:%s\n%s:%s", md5Sum, f.Name(), md5)
}

if toDownload {
fmt.Printf("File %s created, checksum OK\n", f.Name())
} else {
Expand Down
4 changes: 1 addition & 3 deletions engine/api/workflow_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
var lastErr error
for {
attempt++
reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult)
reader, err := api.Client.CDNItemStream(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult)
if err != nil {
return err
}
Expand All @@ -189,9 +189,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
if lastErr != nil {
return err
}

}

return nil
}
}
9 changes: 7 additions & 2 deletions engine/cdn/cdn_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"encoding/json"
"io"
"net/http"
"os"

"github.com/ovh/cds/engine/cdn/item"
Expand Down Expand Up @@ -42,7 +43,7 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
// Check Item unicity
_, err = item.LoadByAPIRefHashAndType(ctx, s.Mapper, s.mustDBWithCtx(ctx), hashRef, itemType)
if err == nil {
return sdk.WrapError(sdk.ErrConflictData, "cannot upload the same file twice")
return sdk.NewErrorFrom(sdk.ErrInvalidData, "cannot upload the same file twice")
}
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
return err
Expand All @@ -67,7 +68,11 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
RunNodeID: runResultApiRef.RunNodeID,
RunJobID: runResultApiRef.RunJobID,
}
if err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck); err != nil {
code, err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck)
if err != nil {
if code == http.StatusConflict {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to upload the same file twice")
}
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions engine/cdn/cdn_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ func TestSyncLog(t *testing.T) {

var cdsStorage *cds.CDS
for _, sto := range s.Units.Storages {
cdsStorage = sto.(*cds.CDS)
if cdsStorage != nil {
var is bool
cdsStorage, is = sto.(*cds.CDS)
if is {
break
}
}
Expand Down
25 changes: 9 additions & 16 deletions engine/worker/internal/action/builtin_artifact_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package action
import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -115,34 +114,28 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
go func(a sdk.CDNItem) {
defer wg.Done()
destFile := path.Join(destPath, a.APIRef.ToFilename())
f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm))
if err != nil {
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "Cannot download artifact (OpenFile) %s: %s", destFile, err)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, project, workflow, n))

r, err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult)
f, err := os.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm))
if err != nil {
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err)
res.Reason = sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", destFile, err)).Error()
log.Warn(ctx, "%s", res.Reason)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
if _, err := io.Copy(f, r); err != nil {
if err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult, a.MD5, f); err != nil {
_ = f.Close()
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "cannot download artifact %s: %s", destFile, sdk.WithStack(err))
log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
if err := f.Close(); err != nil {
res.Status = sdk.StatusFail
res.Reason = err.Error()
log.Warn(ctx, "Cannot close file %s: %s", destFile, err)
res.Reason = sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", destFile, err).Error()
log.Warn(ctx, "%s", res.Reason)
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
return
}
Expand Down
15 changes: 8 additions & 7 deletions engine/worker/internal/action/builtin_artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
// 3. CDN activated or not
if integrationName != sdk.DefaultStorageIntegrationName {
if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag); err != nil {
log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err)
chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path)
wgErrors.Add(1)
}
Expand All @@ -107,14 +106,12 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
}
} else if !cdnArtifactEnabled {
if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag); err != nil {
log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err)
chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path)
wgErrors.Add(1)
}
return
} else {
if err := uploadArtifactIntoCDN(path, ctx, wk); err != nil {
log.Error(ctx, "unable to upload artifact into cdn %q: %v", path, err)
chanError <- err
wgErrors.Add(1)
}
Expand All @@ -133,7 +130,7 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
wgErrors.Wait()

if !globalError.IsEmpty() {
return res, sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("error: fail to upload artifact"))
return res, fmt.Errorf("%s", globalError.Error())
}

return res, nil
Expand All @@ -143,7 +140,11 @@ func uploadArtifactByIntegrationPlugin(path string, ctx context.Context, wk work
_, fileName := filepath.Split(path)

// Check run result
if err := checkArtifactUpload(ctx, wk, fileName, sdk.WorkflowRunResultTypeArtifactManager); err != nil {
code, err := checkArtifactUpload(ctx, wk, fileName, sdk.WorkflowRunResultTypeArtifactManager)
if err != nil {
if code == 409 {
return fmt.Errorf("unable to upload the same file twice: %s", fileName)
}
return fmt.Errorf("unable to check artifact upload authorization: %v", err)
}

Expand Down Expand Up @@ -233,7 +234,7 @@ func uploadArtifactIntoCDN(path string, ctx context.Context, wk workerruntime.Ru

duration, err := wk.Client().CDNItemUpload(ctx, wk.CDNHttpURL(), signature, afero.NewOsFs(), path)
if err != nil {
return sdk.WrapError(err, "Error while uploading artifact %s", path)
return err
}
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS CDN", path, duration.Seconds()))
return nil
Expand All @@ -252,7 +253,7 @@ func uploadArtifactByApiCall(path string, wk workerruntime.Runtime, ctx context.
return nil
}

func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) error {
func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) (int, error) {
runID, runNodeID, runJobID := wk.GetJobIdentifiers()
runResultCheck := sdk.WorkflowRunResultCheck{
RunJobID: runJobID,
Expand Down
Loading