From e825a5c6a34f2493268aff3edf71235cef4c57c1 Mon Sep 17 00:00:00 2001 From: Guiheux Steven Date: Mon, 21 Jun 2021 09:43:58 +0200 Subject: [PATCH] fix(engine): check md5 on artifact download + fix error printing (#5846) --- cli/cdsctl/workflow_artifact.go | 27 +- cli/cdsctl/workflow_run_result.go | 22 +- engine/api/workflow_application.go | 4 +- engine/cdn/cdn_file.go | 9 +- engine/cdn/cdn_sync_test.go | 5 +- .../action/builtin_artifact_download.go | 25 +- .../action/builtin_artifact_upload.go | 15 +- engine/worker/internal/handler_cache.go | 289 ++++++++++-------- engine/worker/internal/handler_download.go | 35 +-- engine/worker/internal/handler_run_result.go | 7 +- engine/worker/internal/handler_upload.go | 3 - sdk/cdsclient/client_cdn.go | 62 +++- sdk/cdsclient/client_queue.go | 8 +- sdk/cdsclient/http.go | 123 +++++++- sdk/cdsclient/interface.go | 5 +- .../mock_cdsclient/interface_mock.go | 119 ++++++-- sdk/common.go | 2 +- 17 files changed, 488 insertions(+), 272 deletions(-) diff --git a/cli/cdsctl/workflow_artifact.go b/cli/cdsctl/workflow_artifact.go index a3da655b28..f123d54743 100644 --- a/cli/cdsctl/workflow_artifact.go +++ b/cli/cdsctl/workflow_artifact.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "io" "os" "regexp" "strconv" @@ -180,33 +179,19 @@ func workflowArtifactDownloadRun(v cli.Values) error { } if toDownload { - var err error - f, err = os.OpenFile(artifactData.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(artifactData.Perm)) - if err != nil { - return err - } fmt.Printf("Downloading %s...\n", artifactData.Name) - r, err := client.CDNItemDownload(context.Background(), cdnURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult) + f, err := os.OpenFile(artifactData.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(artifactData.Perm)) if err != nil { - return err + return sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", artifactData.Name, err)) } - if _, err := io.Copy(f, r); err != nil { - return cli.WrapError(err, "unable to write file") + if err := client.CDNItemDownload(context.Background(), cdnURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult, artifactData.MD5, f); err != nil { + _ = f.Close() + return err } if err := f.Close(); err != nil { - return cli.WrapError(err, "unable to close file") + return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", artifactData.Name, err) } } - - md5Sum, err := sdk.FileMd5sum(artifactData.Name) - if err != nil { - return err - } - - if md5Sum != artifactData.MD5 { - return cli.NewError("Invalid md5Sum \ndownloaded file:%s\n%s:%s", md5Sum, f.Name(), artifactData.MD5) - } - if toDownload { fmt.Printf("File %s created, checksum OK\n", f.Name()) } else { diff --git a/cli/cdsctl/workflow_run_result.go b/cli/cdsctl/workflow_run_result.go index a1edc390d9..3025086c92 100644 --- a/cli/cdsctl/workflow_run_result.go +++ b/cli/cdsctl/workflow_run_result.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "io" "os" "regexp" @@ -129,30 +128,17 @@ func workflowRunResultGet(v cli.Values) error { var err error f, err = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(perm)) if err != nil { - return err + return sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", fileName, err)) } fmt.Printf("Downloading %s...\n", fileName) - r, err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, cdnHash, sdk.CDNTypeItemRunResult) - if err != nil { + if err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, cdnHash, sdk.CDNTypeItemRunResult, md5, f); err != nil { + _ = f.Close() return err } - if _, err := io.Copy(f, r); err != nil { - return cli.WrapError(err, "unable to write file") - } if err := f.Close(); err != nil { - return cli.WrapError(err, "unable to close file") + return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", fileName, err) } } - - md5Sum, err := sdk.FileMd5sum(fileName) - if err != nil { - return err - } - - if md5Sum != md5 { - return cli.NewError("Invalid md5Sum \ndownloaded file:%s\n%s:%s", md5Sum, f.Name(), md5) - } - if toDownload { fmt.Printf("File %s created, checksum OK\n", f.Name()) } else { diff --git a/engine/api/workflow_application.go b/engine/api/workflow_application.go index f258856f60..b277f31716 100644 --- a/engine/api/workflow_application.go +++ b/engine/api/workflow_application.go @@ -173,7 +173,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler { var lastErr error for { attempt++ - reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult) + reader, err := api.Client.CDNItemStream(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult) if err != nil { return err } @@ -189,9 +189,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler { if lastErr != nil { return err } - } - return nil } } diff --git a/engine/cdn/cdn_file.go b/engine/cdn/cdn_file.go index 204a1d5f2f..281ccdbb19 100644 --- a/engine/cdn/cdn_file.go +++ b/engine/cdn/cdn_file.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "encoding/json" "io" + "net/http" "os" "github.com/ovh/cds/engine/cdn/item" @@ -42,7 +43,7 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re // Check Item unicity _, err = item.LoadByAPIRefHashAndType(ctx, s.Mapper, s.mustDBWithCtx(ctx), hashRef, itemType) if err == nil { - return sdk.WrapError(sdk.ErrConflictData, "cannot upload the same file twice") + return sdk.NewErrorFrom(sdk.ErrInvalidData, "cannot upload the same file twice") } if !sdk.ErrorIs(err, sdk.ErrNotFound) { return err @@ -67,7 +68,11 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re RunNodeID: runResultApiRef.RunNodeID, RunJobID: runResultApiRef.RunJobID, } - if err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck); err != nil { + code, err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck) + if err != nil { + if code == http.StatusConflict { + return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to upload the same file twice") + } return err } } diff --git a/engine/cdn/cdn_sync_test.go b/engine/cdn/cdn_sync_test.go index ce2bd9dcbe..1486810377 100644 --- a/engine/cdn/cdn_sync_test.go +++ b/engine/cdn/cdn_sync_test.go @@ -141,8 +141,9 @@ func TestSyncLog(t *testing.T) { var cdsStorage *cds.CDS for _, sto := range s.Units.Storages { - cdsStorage = sto.(*cds.CDS) - if cdsStorage != nil { + var is bool + cdsStorage, is = sto.(*cds.CDS) + if is { break } } diff --git a/engine/worker/internal/action/builtin_artifact_download.go b/engine/worker/internal/action/builtin_artifact_download.go index b670881677..27f13961e2 100644 --- a/engine/worker/internal/action/builtin_artifact_download.go +++ b/engine/worker/internal/action/builtin_artifact_download.go @@ -3,7 +3,6 @@ package action import ( "context" "fmt" - "io" "os" "path" "path/filepath" @@ -115,34 +114,28 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac go func(a sdk.CDNItem) { defer wg.Done() destFile := path.Join(destPath, a.APIRef.ToFilename()) - f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm)) - if err != nil { - res.Status = sdk.StatusFail - res.Reason = err.Error() - log.Warn(ctx, "Cannot download artifact (OpenFile) %s: %s", destFile, err) - wk.SendLog(ctx, workerruntime.LevelError, res.Reason) - return - } wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, project, workflow, n)) - r, err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult) + f, err := os.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm)) if err != nil { res.Status = sdk.StatusFail - res.Reason = err.Error() - log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err) + res.Reason = sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", destFile, err)).Error() + log.Warn(ctx, "%s", res.Reason) wk.SendLog(ctx, workerruntime.LevelError, res.Reason) return } - if _, err := io.Copy(f, r); err != nil { + if err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult, a.MD5, f); err != nil { + _ = f.Close() res.Status = sdk.StatusFail res.Reason = err.Error() - log.Warn(ctx, "cannot download artifact %s: %s", destFile, sdk.WithStack(err)) + log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err) wk.SendLog(ctx, workerruntime.LevelError, res.Reason) + return } if err := f.Close(); err != nil { res.Status = sdk.StatusFail - res.Reason = err.Error() - log.Warn(ctx, "Cannot close file %s: %s", destFile, err) + res.Reason = sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", destFile, err).Error() + log.Warn(ctx, "%s", res.Reason) wk.SendLog(ctx, workerruntime.LevelError, res.Reason) return } diff --git a/engine/worker/internal/action/builtin_artifact_upload.go b/engine/worker/internal/action/builtin_artifact_upload.go index 4db401621b..f6e2e8f4ed 100644 --- a/engine/worker/internal/action/builtin_artifact_upload.go +++ b/engine/worker/internal/action/builtin_artifact_upload.go @@ -95,7 +95,6 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti // 3. CDN activated or not if integrationName != sdk.DefaultStorageIntegrationName { if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag); err != nil { - log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err) chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path) wgErrors.Add(1) } @@ -107,14 +106,12 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti } } else if !cdnArtifactEnabled { if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag); err != nil { - log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err) chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path) wgErrors.Add(1) } return } else { if err := uploadArtifactIntoCDN(path, ctx, wk); err != nil { - log.Error(ctx, "unable to upload artifact into cdn %q: %v", path, err) chanError <- err wgErrors.Add(1) } @@ -133,7 +130,7 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti wgErrors.Wait() if !globalError.IsEmpty() { - return res, sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("error: fail to upload artifact")) + return res, fmt.Errorf("%s", globalError.Error()) } return res, nil @@ -143,7 +140,11 @@ func uploadArtifactByIntegrationPlugin(path string, ctx context.Context, wk work _, fileName := filepath.Split(path) // Check run result - if err := checkArtifactUpload(ctx, wk, fileName, sdk.WorkflowRunResultTypeArtifactManager); err != nil { + code, err := checkArtifactUpload(ctx, wk, fileName, sdk.WorkflowRunResultTypeArtifactManager) + if err != nil { + if code == 409 { + return fmt.Errorf("unable to upload the same file twice: %s", fileName) + } return fmt.Errorf("unable to check artifact upload authorization: %v", err) } @@ -233,7 +234,7 @@ func uploadArtifactIntoCDN(path string, ctx context.Context, wk workerruntime.Ru duration, err := wk.Client().CDNItemUpload(ctx, wk.CDNHttpURL(), signature, afero.NewOsFs(), path) if err != nil { - return sdk.WrapError(err, "Error while uploading artifact %s", path) + return err } wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS CDN", path, duration.Seconds())) return nil @@ -252,7 +253,7 @@ func uploadArtifactByApiCall(path string, wk workerruntime.Runtime, ctx context. return nil } -func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) error { +func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) (int, error) { runID, runNodeID, runJobID := wk.GetJobIdentifiers() runResultCheck := sdk.WorkflowRunResultCheck{ RunJobID: runJobID, diff --git a/engine/worker/internal/handler_cache.go b/engine/worker/internal/handler_cache.go index d1e819841d..e189f14273 100644 --- a/engine/worker/internal/handler_cache.go +++ b/engine/worker/internal/handler_cache.go @@ -160,10 +160,6 @@ func cachePushHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { } duration, err := wk.client.CDNItemUpload(ctx, wk.cdnHttpAddr, sig, wk.BaseDir(), tarF.Name()) if err != nil { - err := sdk.Error{ - Message: "worker cache push > Cannot upload cache: %v" + err.Error(), - Status: http.StatusInternalServerError, - } log.Error(ctx, "%v", err) writeError(w, r, err) return @@ -185,150 +181,195 @@ func cachePullHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { params := wk.currentJob.wJob.Parameters projectKey := sdk.ParameterValue(params, "cds.project") - var r io.Reader - if cdnArtifact { - // Get cache link - items, err := wk.client.QueueWorkerCacheLink(ctx, wk.currentJob.wJob.ID, vars["ref"]) - if err != nil { - err = sdk.Error{ - Message: "worker cache pull > Cannot get cache links: " + err.Error(), - Status: http.StatusNotFound, - } - log.Error(ctx, "%v", err) - writeError(w, req, err) - return + if !cdnArtifact { + getWorkerCacheFromAPI(w, req, wk, projectKey, vars, ctx, path) + return + } + + // Get cache link + items, err := wk.client.QueueWorkerCacheLink(ctx, wk.currentJob.wJob.ID, vars["ref"]) + if err != nil { + err = sdk.Error{ + Message: "worker cache pull > Cannot get cache links: " + err.Error(), + Status: http.StatusNotFound, } - if len(items.Items) != 1 { - err := sdk.Error{ - Message: "worker cache pull > No unique link found", - Status: http.StatusNotFound, - } - log.Error(ctx, "%v", err) - writeError(w, req, err) - return + log.Error(ctx, "%v", err) + writeError(w, req, err) + return + } + if len(items.Items) != 1 { + err := sdk.Error{ + Message: "worker cache pull > No unique link found", + Status: http.StatusNotFound, } - // Download cache - r, err = wk.client.CDNItemDownload(ctx, wk.cdnHttpAddr, items.Items[0].APIRefHash, sdk.CDNTypeItemWorkerCache) - if err != nil { - err = sdk.Error{ - Message: "Cannot pull cache: " + err.Error(), - Status: http.StatusNotFound, - } - log.Error(ctx, "%v", err) - writeError(w, req, err) - return + log.Error(ctx, "%v", err) + writeError(w, req, err) + return + } + // Download cache + wkDirFS := afero.NewOsFs() + if err := wkDirFS.MkdirAll(path, os.FileMode(0744)); err != nil { + newError := sdk.NewError(sdk.ErrInvalidData, fmt.Errorf("unable to create destination directory: %v", err)) + writeError(w, req, newError) + return + } + + dest := filepath.Join(path, "workercache.tar") + f, err := os.OpenFile(dest, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(0755)) + if err != nil { + err = sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", dest, err)) + log.Error(ctx, "%v", err) + writeError(w, req, err) + return + } + if err := wk.client.CDNItemDownload(ctx, wk.cdnHttpAddr, items.Items[0].APIRefHash, sdk.CDNTypeItemWorkerCache, items.Items[0].MD5, f); err != nil { + _ = f.Close() + err = sdk.Error{ + Message: "worker cache pull > Cannot pull cache: " + err.Error(), + Status: http.StatusNotFound, } - } else { - integrationName := sdk.DefaultIfEmptyStorage(req.FormValue("integration")) - var err error - r, err = wk.client.WorkflowCachePull(projectKey, integrationName, vars["ref"]) - if err != nil { - err = sdk.Error{ - Message: "worker cache pull > Cannot pull cache: " + err.Error(), - Status: http.StatusNotFound, - } - log.Error(ctx, "%v", err) - writeError(w, req, err) - return + log.Error(ctx, "%v", err) + writeError(w, req, err) + return + } + if err := f.Close(); err != nil { + err = sdk.Error{ + Message: fmt.Sprintf("worker cache pull > unable to close file %s: %v", dest, err), + Status: http.StatusInternalServerError, } + log.Error(ctx, "%v", err) + writeError(w, req, err) + return } - log.Debug(ctx, "cachePullHandler> Start read cache tar") - - tr := tar.NewReader(r) - for { - header, errH := tr.Next() - if errH == io.EOF { - break + // Open tar file + log.Info(ctx, "extracting worker cache %s / %s", dest, vars["ref"]) + archive, err := wkDirFS.Open(dest) + if err != nil { + e := sdk.Error{ + Message: "worker cache pull > unable to open archive: " + err.Error(), + Status: http.StatusInternalServerError, } - - if errH != nil { - errH = sdk.Error{ - Message: "worker cache pull > Unable to read tar file: " + errH.Error(), - Status: http.StatusBadRequest, - } - log.Error(ctx, "%v", errH) - writeJSON(w, errH, http.StatusBadRequest) - return + log.Error(ctx, "%v", e) + writeError(w, req, e) + return + } + if err := extractArchive(ctx, archive, path); err != nil { + log.Error(ctx, "%v", err) + writeError(w, req, err) + return + } + if err := wkDirFS.Remove(dest); err != nil { + e := sdk.Error{ + Message: "unable to remove worker cache archive: " + err.Error(), + Status: http.StatusInternalServerError, } + log.Error(ctx, "%v", e) + writeError(w, req, e) + return + } + return + } +} + +func getWorkerCacheFromAPI(w http.ResponseWriter, req *http.Request, wk *CurrentWorker, projectKey string, vars map[string]string, ctx context.Context, path string) { + integrationName := sdk.DefaultIfEmptyStorage(req.FormValue("integration")) + var err error + reader, err := wk.client.WorkflowCachePull(projectKey, integrationName, vars["ref"]) + if err != nil { + err = sdk.Error{ + Message: "worker cache pull > Cannot pull cache: " + err.Error(), + Status: http.StatusNotFound, + } + log.Error(ctx, "%v", err) + writeError(w, req, err) + return + } + log.Debug(ctx, "cachePullHandler> Start read cache tar") - if header == nil { - continue + if err := extractArchive(ctx, reader, path); err != nil { + log.Error(ctx, "%s", err.Message) + writeJSON(w, err, err.Status) + return + } + return +} + +func extractArchive(ctx context.Context, r io.Reader, path string) *sdk.Error { + tr := tar.NewReader(r) + for { + header, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return &sdk.Error{ + Message: "worker cache pull > Unable to read tar file: " + err.Error(), + Status: http.StatusBadRequest, } + } - log.Debug(ctx, "cachePullHandler> Tar contains file %s", header.Name) - - // the target location where the dir/file should be created - target := filepath.Join(path, header.Name) - - // check the file type - switch header.Typeflag { - // if its a dir and it doesn't exist create it - case tar.TypeDir: - if _, err := os.Stat(target); err != nil { - if err := os.MkdirAll(target, 0755); err != nil { - err = sdk.Error{ - Message: "worker cache pull > Unable to mkdir all files : " + err.Error(), - Status: http.StatusInternalServerError, - } - log.Error(ctx, "%v", err) - writeJSON(w, err, http.StatusInternalServerError) - return - } - } - case tar.TypeSymlink: - if err := os.Symlink(header.Linkname, target); err != nil { - err = sdk.Error{ - Message: "worker cache pull > Unable to create symlink: " + err.Error(), - Status: http.StatusInternalServerError, - } - log.Error(ctx, "%v", err) - writeJSON(w, err, http.StatusInternalServerError) - return - } + if header == nil { + continue + } - // if it's a file create it - case tar.TypeReg, tar.TypeLink: - // if directory of file does not exist, create it before - if _, err := os.Stat(filepath.Dir(target)); err != nil { - if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { - err = sdk.Error{ - Message: "worker cache pull > Unable to mkdir all files : " + err.Error(), - Status: http.StatusInternalServerError, - } - log.Error(ctx, "%v", err) - writeJSON(w, err, http.StatusInternalServerError) - return - } - } + log.Debug(ctx, "cachePullHandler> Tar contains file %s", header.Name) - log.Debug(ctx, "cachePullHandler> Create file at %s", target) + // the target location where the dir/file should be created + target := filepath.Join(path, header.Name) - f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY, os.FileMode(header.Mode)) - if err != nil { - sdkErr := sdk.Error{ - Message: "worker cache pull > Unable to open file: " + err.Error(), + // check the file type + switch header.Typeflag { + // if its a dir and it doesn't exist create it + case tar.TypeDir: + if _, err := os.Stat(target); err != nil { + if err := os.MkdirAll(target, 0755); err != nil { + return &sdk.Error{ + Message: "worker cache pull > Unable to mkdir all files : " + err.Error(), Status: http.StatusInternalServerError, } - log.Error(ctx, "%v", err) - writeJSON(w, sdkErr, sdkErr.Status) - return } + } + case tar.TypeSymlink: + if err := os.Symlink(header.Linkname, target); err != nil { + return &sdk.Error{ + Message: "worker cache pull > Unable to create symlink: " + err.Error(), + Status: http.StatusInternalServerError, + } + } - // copy over contents - if _, err := io.Copy(f, tr); err != nil { - _ = f.Close() - sdkErr := sdk.Error{ - Message: "worker cache pull > Cannot copy content file: " + err.Error(), + // if it's a file create it + case tar.TypeReg, tar.TypeLink: + // if directory of file does not exist, create it before + if _, err := os.Stat(filepath.Dir(target)); err != nil { + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { + return &sdk.Error{ + Message: "worker cache pull > Unable to mkdir all files : " + err.Error(), Status: http.StatusInternalServerError, } - log.Error(ctx, "%v", err) - writeJSON(w, sdkErr, sdkErr.Status) - return } + } + log.Debug(ctx, "cachePullHandler> Create file at %s", target) + + f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY, os.FileMode(header.Mode)) + if err != nil { + return &sdk.Error{ + Message: "worker cache pull > Unable to open file: " + err.Error(), + Status: http.StatusInternalServerError, + } + } + + // copy over contents + if _, err := io.Copy(f, tr); err != nil { _ = f.Close() + return &sdk.Error{ + Message: "worker cache pull > Cannot copy content file: " + err.Error(), + Status: http.StatusInternalServerError, + } } + _ = f.Close() } } + return nil } diff --git a/engine/worker/internal/handler_download.go b/engine/worker/internal/handler_download.go index 95200daae4..b2e7124d17 100644 --- a/engine/worker/internal/handler_download.go +++ b/engine/worker/internal/handler_download.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "io/ioutil" "net/http" "os" @@ -55,7 +54,7 @@ func downloadHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { buildNumberString := sdk.ParameterValue(wk.currentJob.params, "cds.run.number") reqArgs.Number, errN = strconv.ParseInt(buildNumberString, 10, 64) if errN != nil { - newError := sdk.NewError(sdk.ErrWrongRequest, fmt.Errorf("Cannot parse '%s' as run number: %s", buildNumberString, errN)) + newError := sdk.NewError(sdk.ErrWrongRequest, fmt.Errorf("cannot parse '%s' as run number: %s", buildNumberString, errN)) writeError(w, r, newError) return } @@ -146,27 +145,17 @@ func downloadHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { go func(a sdk.CDNItem) { defer wg.Done() destFile := path.Join(reqArgs.Destination, a.APIRef.ToFilename()) - f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm)) - if err != nil { - newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot download artifact (OpenFile) %s: %s", destFile, err)) - writeError(w, r, newError) - return - } wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, projectKey, reqArgs.Workflow, reqArgs.Number)) - reader, err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult) + f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm)) if err != nil { - newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot download artifact %s: %s", destFile, err)) + newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", destFile, err)) writeError(w, r, newError) return } - if _, err := io.Copy(f, reader); err != nil { - newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot download artifact (Copy) %s: %s", destFile, err)) - writeError(w, r, newError) - return - } - if err := f.Close(); err != nil { - newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot close file%s: %s", destFile, err)) + defer f.Close() //nolint + if err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult, a.MD5, f); err != nil { + newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot download artifact %s: %s", destFile, err)) writeError(w, r, newError) return } @@ -185,7 +174,7 @@ func GetArtifactFromAPI(ctx context.Context, wk *CurrentWorker, projectKey strin return newError } - regexp, err := regexp.Compile(reqArgs.Pattern) + reg, err := regexp.Compile(reqArgs.Pattern) if err != nil { newError := sdk.NewError(sdk.ErrWrongRequest, fmt.Errorf("invalid pattern %s : %v", reqArgs.Pattern, err)) return newError @@ -199,7 +188,7 @@ func GetArtifactFromAPI(ctx context.Context, wk *CurrentWorker, projectKey strin for i := range artifacts { a := &artifacts[i] - if reqArgs.Pattern != "" && !regexp.MatchString(a.Name) { + if reqArgs.Pattern != "" && !reg.MatchString(a.Name) { wg.Done() continue } @@ -212,14 +201,14 @@ func GetArtifactFromAPI(ctx context.Context, wk *CurrentWorker, projectKey strin go func(a *sdk.WorkflowNodeRunArtifact) { defer wg.Done() - path := path.Join(reqArgs.Destination, a.Name) - f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(a.Perm)) + filePath := path.Join(reqArgs.Destination, a.Name) + f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(a.Perm)) if err != nil { wk.SendLog(ctx, workerruntime.LevelError, fmt.Sprintf("Cannot download artifact (OpenFile) %s: %s", a.Name, err)) isInError = true return } - wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("downloading artifact %s with tag %s from workflow %s/%s on run %d (%s)...", a.Name, a.Tag, projectKey, reqArgs.Workflow, reqArgs.Number, path)) + wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("downloading artifact %s with tag %s from workflow %s/%s on run %d (%s)...", a.Name, a.Tag, projectKey, reqArgs.Workflow, reqArgs.Number, filePath)) if err := wk.client.WorkflowNodeRunArtifactDownload(projectKey, reqArgs.Workflow, *a, f); err != nil { wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Cannot download artifact %s: %s", a.Name, err)) isInError = true @@ -243,7 +232,7 @@ func GetArtifactFromAPI(ctx context.Context, wk *CurrentWorker, projectKey strin wg.Wait() if isInError { - newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("Error while downloading artifacts - see previous logs")) + newError := sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("error while downloading artifacts - see previous logs")) return newError } return nil diff --git a/engine/worker/internal/handler_run_result.go b/engine/worker/internal/handler_run_result.go index 979d153eb7..7a5ec9bec3 100644 --- a/engine/worker/internal/handler_run_result.go +++ b/engine/worker/internal/handler_run_result.go @@ -65,7 +65,12 @@ func addRunResulthandler(ctx context.Context, wk *CurrentWorker) http.HandlerFun Name: reqArgs.Name, ResultType: sdk.WorkflowRunResultTypeArtifactManager, } - if err := wk.Client().QueueWorkflowRunResultCheck(ctx, runJobID, runResultCheck); err != nil { + code, err := wk.Client().QueueWorkflowRunResultCheck(ctx, runJobID, runResultCheck) + if err != nil { + if code == 409 { + writeError(w, r, sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to upload the same file twice")) + return + } writeError(w, r, sdk.WrapError(err, "unable to check run result %s", reqArgs.Name)) return diff --git a/engine/worker/internal/handler_upload.go b/engine/worker/internal/handler_upload.go index 520cb410f1..e95df05a51 100644 --- a/engine/worker/internal/handler_upload.go +++ b/engine/worker/internal/handler_upload.go @@ -61,7 +61,6 @@ func uploadHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { workingDir, err := workerruntime.WorkingDirectory(wk.currentJob.context) if err != nil { - wk.SendLog(ctx, workerruntime.LevelError, fmt.Sprintf("Artifact upload failed: %v", err)) log.Error(ctx, "Artifact upload failed: No working directory: %v", err) writeError(w, r, err) return @@ -70,13 +69,11 @@ func uploadHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { result, err := action.RunArtifactUpload(ctx, wk, a, wk.currentJob.secrets) if err != nil { - wk.SendLog(ctx, workerruntime.LevelError, fmt.Sprintf("Artifact upload failed: %v", err)) log.Error(ctx, "unable to upload artifacts: %v", err) writeError(w, r, err) return } if result.Status != sdk.StatusSuccess { - wk.SendLog(ctx, workerruntime.LevelError, fmt.Sprintf("Artifact upload failed: %s", result.Reason)) log.Error(ctx, "Artifact upload failed: %v", result) writeError(w, r, fmt.Errorf("artifact upload failed: %s", result.Reason)) return diff --git a/sdk/cdsclient/client_cdn.go b/sdk/cdsclient/client_cdn.go index 909324432b..ac85ea78e6 100644 --- a/sdk/cdsclient/client_cdn.go +++ b/sdk/cdsclient/client_cdn.go @@ -2,6 +2,8 @@ package cdsclient import ( "context" + "crypto/md5" + "encoding/hex" "encoding/json" "fmt" "io" @@ -9,12 +11,55 @@ import ( "net/http" "time" + "github.com/rockbears/log" "github.com/spf13/afero" "github.com/ovh/cds/sdk" ) -func (c *client) CDNItemDownload(ctx context.Context, cdnAddr string, hash string, itemType sdk.CDNItemType) (io.Reader, error) { +func (c *client) CDNItemDownload(ctx context.Context, cdnAddr string, hash string, itemType sdk.CDNItemType, md5Sum string, writer io.WriteSeeker) error { + currentRetry := 0 + var lastError error + for i := 0; i <= c.config.Retry; i++ { + currentRetry++ + if _, err := writer.Seek(0, io.SeekStart); err != nil { + return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to reset writer: %v", err) + } + + reader, _, code, err := c.StreamNoRetry(ctx, c.HTTPNoTimeoutClient(), http.MethodGet, fmt.Sprintf("%s/item/%s/%s/download", cdnAddr, itemType, hash), nil, func(req *http.Request) { + auth := "Bearer " + c.config.SessionToken + req.Header.Add("Authorization", auth) + }) + if code >= 500 { + lastError = err + continue + } + + if err != nil { + return err + } + + md5Hash := md5.New() + multiWriter := io.MultiWriter(md5Hash, writer) + + if _, err := io.Copy(multiWriter, reader); err != nil { + lastError = fmt.Errorf("unable to read cdn response: %v", err) + log.Error(ctx, "%v", lastError) + continue + } + + md5S := hex.EncodeToString(md5Hash.Sum(nil)) + if md5S != md5Sum { + lastError = fmt.Errorf("ms5 doesn't match: Want %s Got %s", md5Sum, md5S) + log.Error(ctx, "%v", lastError) + continue + } + return nil + } + return fmt.Errorf("unable to get data after %d retries: %v", currentRetry, lastError) +} + +func (c *client) CDNItemStream(ctx context.Context, cdnAddr string, hash string, itemType sdk.CDNItemType) (io.Reader, error) { reader, _, code, err := c.Stream(ctx, c.HTTPNoTimeoutClient(), http.MethodGet, fmt.Sprintf("%s/item/%s/%s/download", cdnAddr, itemType, hash), nil, func(req *http.Request) { auth := "Bearer " + c.config.SessionToken req.Header.Add("Authorization", auth) @@ -49,11 +94,22 @@ func (c *client) CDNItemUpload(ctx context.Context, cdnAddr string, signature st if err != nil { return time.Since(t0), err } - if _, _, _, err := c.Stream(ctx, c.HTTPNoTimeoutClient(), http.MethodPost, fmt.Sprintf("%s/item/upload", cdnAddr), f, SetHeader("X-CDS-WORKER-SIGNATURE", signature)); err != nil { - savedError = newAPIError(fmt.Errorf("unable to upload file, try %d: %v", i+1, err)) + body, _, code, err := c.Stream(ctx, c.HTTPNoTimeoutClient(), http.MethodPost, fmt.Sprintf("%s/item/upload", cdnAddr), f, SetHeader("X-CDS-WORKER-SIGNATURE", signature)) + if err != nil { + savedError = err time.Sleep(1 * time.Second) continue } + if code >= 400 { + bts, err := ioutil.ReadAll(body) + if err != nil { + return time.Since(t0), err + } + if err := sdk.DecodeError(bts); err != nil { + return time.Since(t0), err + } + return time.Since(t0), newAPIError(fmt.Errorf("HTTP %d", code)) + } savedError = nil break } diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 24ca2d6c0c..c2bace8154 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -233,12 +233,10 @@ func (c *client) QueueWorkflowRunResultsAdd(ctx context.Context, jobID int64, ad return nil } -func (c *client) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) error { +func (c *client) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) (int, error) { uri := fmt.Sprintf("/queue/workflows/%d/run/results/check", jobID) - if _, err := c.PostJSON(ctx, uri, runResultCheck, nil); err != nil { - return err - } - return nil + code, err := c.PostJSON(ctx, uri, runResultCheck, nil) + return code, err } // QueueJobRelease release a job for a worker diff --git a/sdk/cdsclient/http.go b/sdk/cdsclient/http.go index d93d35382e..1d29ebc6fa 100644 --- a/sdk/cdsclient/http.go +++ b/sdk/cdsclient/http.go @@ -112,13 +112,6 @@ func (c *client) RequestJSON(ctx context.Context, method, path string, in interf return nil, nil, code, err } - if code >= 400 { - if err := sdk.DecodeError(res); err != nil { - return res, nil, code, err - } - return res, nil, code, newAPIError(fmt.Errorf("HTTP %d", code)) - } - if code == 204 { return res, header, code, nil } @@ -136,7 +129,7 @@ func (c *client) RequestJSON(ctx context.Context, method, path string, in interf func (c *client) Request(ctx context.Context, method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error) { respBody, respHeader, code, err := c.Stream(ctx, c.httpClient, method, path, body, mods...) if err != nil { - return nil, nil, 0, err + return nil, nil, code, err } defer func() { // Drain and close the body to let the Transport reuse the connection @@ -158,7 +151,7 @@ func (c *client) Request(ctx context.Context, method string, path string, body i if code >= 400 { if err := sdk.DecodeError(bodyBtes); err != nil { - return bodyBtes, nil, code, newAPIError(err) + return bodyBtes, nil, code, err } return bodyBtes, nil, code, newAPIError(fmt.Errorf("HTTP %d", code)) } @@ -174,11 +167,113 @@ func extractBodyErrorFromResponse(r *http.Response) error { body, _ := ioutil.ReadAll(r.Body) r.Body.Close() // nolint if err := sdk.DecodeError(body); err != nil { - return newAPIError(err) + return err } return newAPIError(fmt.Errorf("HTTP %d", r.StatusCode)) } +func (c *client) StreamNoRetry(ctx context.Context, httpClient HTTPClient, method string, path string, body io.Reader, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error) { + // Checks that current session_token is still valid + // If not, challenge a new one against the authenticationToken + var checkToken = !strings.Contains(path, "/auth/consumer/builtin/signin") && + !strings.Contains(path, "/auth/consumer/local/signin") && + !strings.Contains(path, "/auth/consumer/local/signup") && + !strings.Contains(path, "/auth/consumer/local/verify") && + !strings.Contains(path, "/auth/consumer/worker/signin") + + if checkToken && !c.config.HasValidSessionToken() && c.config.BuitinConsumerAuthenticationToken != "" { + if c.config.Verbose { + log.Printf("session token invalid: (%s). Relogin...\n", c.config.SessionToken) + } + resp, err := c.AuthConsumerSignin(sdk.ConsumerBuiltin, sdk.AuthConsumerSigninRequest{"token": c.config.BuitinConsumerAuthenticationToken}) + if err != nil { + return nil, nil, -1, err + } + if c.config.Verbose { + log.Println("jwt: ", sdk.StringFirstN(resp.Token, 12)) + } + c.config.SessionToken = resp.Token + } + labels := pprof.Labels("path", path, "method", method) + ctx = pprof.WithLabels(ctx, labels) + pprof.SetGoroutineLabels(ctx) + + var url string + if strings.HasPrefix(path, "http") { + url = path + } else { + url = c.config.Host + path + } + + req, err := http.NewRequest(method, url, body) + if err != nil { + return nil, nil, 0, sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to create request: %v", err) + } + req = req.WithContext(ctx) + date := sdk.FormatDateRFC5322(time.Now()) + req.Header.Set("Date", date) + req.Header.Set("X-CDS-RemoteTime", date) + + if c.config.Verbose { + log.Printf("Stream > context> %s\n", telemetry.DumpContext(ctx)) + } + spanCtx, ok := telemetry.ContextToSpanContext(ctx) + if ok { + telemetry.DefaultFormat.SpanContextToRequest(spanCtx, req) + } + + for i := range mods { + if mods[i] != nil { + mods[i](req) + } + } + + if req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/json") + } + + req.Header.Set("Connection", "close") + + //No auth on signing routes or on url that is not cds configured in config.Host + if strings.HasPrefix(url, c.config.Host) && !signinRouteRegexp.MatchString(path) { + if _, _, err := new(jwt.Parser).ParseUnverified(c.config.SessionToken, &sdk.AuthSessionJWTClaims{}); err == nil { + if c.config.Verbose { + log.Println("JWT recognized") + } + auth := "Bearer " + c.config.SessionToken + req.Header.Add("Authorization", auth) + } + } + + if c.config.Verbose { + log.Println(cli.Green("********REQUEST**********")) + dmp, _ := httputil.DumpRequestOut(req, true) + log.Printf("%s", string(dmp)) + log.Println(cli.Green("**************************")) + } + + resp, err := httpClient.Do(req) + if err != nil { + return nil, nil, 500, sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to execute request: %v", err) + } + if c.config.Verbose { + log.Println(cli.Yellow("********RESPONSE**********")) + dmp, _ := httputil.DumpResponse(resp, true) + log.Printf("%s", string(dmp)) + log.Println(cli.Yellow("**************************")) + } + + if resp.StatusCode == 401 { + c.config.SessionToken = "" + } + + if resp.StatusCode >= 400 { + err := extractBodyErrorFromResponse(resp) + return nil, nil, resp.StatusCode, err + } + return resp.Body, resp.Header, resp.StatusCode, nil +} + // Stream makes an authenticated http request and return io.ReadCloser func (c *client) Stream(ctx context.Context, httpClient HTTPClient, method string, path string, body io.Reader, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error) { // Checks that current session_token is still valid @@ -225,6 +320,7 @@ func (c *client) Stream(ctx context.Context, httpClient HTTPClient, method strin } var savederror error + var savedCodeError int for i := 0; i <= c.config.Retry; i++ { var req *http.Request var requestError error @@ -290,6 +386,8 @@ func (c *client) Stream(ctx context.Context, httpClient HTTPClient, method strin continue } + savedCodeError = resp.StatusCode + if c.config.Verbose { log.Println(cli.Yellow("********RESPONSE**********")) dmp, _ := httputil.DumpResponse(resp, true) @@ -309,7 +407,10 @@ func (c *client) Stream(ctx context.Context, httpClient HTTPClient, method strin return resp.Body, resp.Header, resp.StatusCode, nil } - return nil, nil, 0, newError(fmt.Errorf("request failed after %d retries: %v", c.config.Retry, savederror)) + if savedCodeError == 409 { + return nil, nil, savedCodeError, savederror + } + return nil, nil, savedCodeError, newError(fmt.Errorf("request failed after %d retries: %v", c.config.Retry, savederror)) } // UploadMultiPart upload multipart diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index 96eb3756dc..90a0c6d807 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -270,7 +270,7 @@ type QueueClient interface { QueueJobSetVersion(ctx context.Context, jobID int64, version sdk.WorkflowRunVersion) error QueueWorkerCacheLink(ctx context.Context, jobID int64, tag string) (sdk.CDNItemLinks, error) QueueWorkflowRunResultsAdd(ctx context.Context, jobID int64, addRequest sdk.WorkflowRunResult) error - QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) error + QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) (int, error) } // UserClient exposes users functions @@ -305,7 +305,8 @@ type WorkerClient interface { type CDNClient interface { CDNItemUpload(ctx context.Context, cdnAddr string, signature string, fs afero.Fs, path string) (time.Duration, error) - CDNItemDownload(ctx context.Context, cdnAddr string, hash string, itemType sdk.CDNItemType) (io.Reader, error) + CDNItemDownload(ctx context.Context, cdnAddr string, hash string, itemType sdk.CDNItemType, md5 string, writer io.WriteSeeker) error + CDNItemStream(ctx context.Context, cdnAddr string, hash string, itemType sdk.CDNItemType) (io.Reader, error) } // HookClient exposes functions used for hooks services diff --git a/sdk/cdsclient/mock_cdsclient/interface_mock.go b/sdk/cdsclient/mock_cdsclient/interface_mock.go index 9fb2942ece..4f4cdd29b7 100644 --- a/sdk/cdsclient/mock_cdsclient/interface_mock.go +++ b/sdk/cdsclient/mock_cdsclient/interface_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: sdk/cdsclient/interface.go +// Source: interface.go // Package mock_cdsclient is a generated GoMock package. package mock_cdsclient @@ -3306,11 +3306,12 @@ func (mr *MockQueueClientMockRecorder) QueueWorkflowNodeJobRun(status ...interfa } // QueueWorkflowRunResultCheck mocks base method. -func (m *MockQueueClient) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) error { +func (m *MockQueueClient) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueueWorkflowRunResultCheck", ctx, jobID, runResultCheck) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 } // QueueWorkflowRunResultCheck indicates an expected call of QueueWorkflowRunResultCheck. @@ -3470,18 +3471,32 @@ func (m *MockWorkerClient) EXPECT() *MockWorkerClientMockRecorder { } // CDNItemDownload mocks base method. -func (m *MockWorkerClient) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { +func (m *MockWorkerClient) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType, md5 string, writer io.WriteSeeker) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType, md5, writer) + ret0, _ := ret[0].(error) + return ret0 +} + +// CDNItemDownload indicates an expected call of CDNItemDownload. +func (mr *MockWorkerClientMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType, md5, writer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockWorkerClient)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType, md5, writer) +} + +// CDNItemStream mocks base method. +func (m *MockWorkerClient) CDNItemStream(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType) + ret := m.ctrl.Call(m, "CDNItemStream", ctx, cdnAddr, hash, itemType) ret0, _ := ret[0].(io.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } -// CDNItemDownload indicates an expected call of CDNItemDownload. -func (mr *MockWorkerClientMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { +// CDNItemStream indicates an expected call of CDNItemStream. +func (mr *MockWorkerClientMockRecorder) CDNItemStream(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockWorkerClient)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemStream", reflect.TypeOf((*MockWorkerClient)(nil).CDNItemStream), ctx, cdnAddr, hash, itemType) } // CDNItemUpload mocks base method. @@ -3747,18 +3762,32 @@ func (m *MockCDNClient) EXPECT() *MockCDNClientMockRecorder { } // CDNItemDownload mocks base method. -func (m *MockCDNClient) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { +func (m *MockCDNClient) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType, md5 string, writer io.WriteSeeker) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType, md5, writer) + ret0, _ := ret[0].(error) + return ret0 +} + +// CDNItemDownload indicates an expected call of CDNItemDownload. +func (mr *MockCDNClientMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType, md5, writer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockCDNClient)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType, md5, writer) +} + +// CDNItemStream mocks base method. +func (m *MockCDNClient) CDNItemStream(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType) + ret := m.ctrl.Call(m, "CDNItemStream", ctx, cdnAddr, hash, itemType) ret0, _ := ret[0].(io.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } -// CDNItemDownload indicates an expected call of CDNItemDownload. -func (mr *MockCDNClientMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { +// CDNItemStream indicates an expected call of CDNItemStream. +func (mr *MockCDNClientMockRecorder) CDNItemStream(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockCDNClient)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemStream", reflect.TypeOf((*MockCDNClient)(nil).CDNItemStream), ctx, cdnAddr, hash, itemType) } // CDNItemUpload mocks base method. @@ -5522,18 +5551,32 @@ func (mr *MockInterfaceMockRecorder) Broadcasts() *gomock.Call { } // CDNItemDownload mocks base method. -func (m *MockInterface) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { +func (m *MockInterface) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType, md5 string, writer io.WriteSeeker) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType) + ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType, md5, writer) + ret0, _ := ret[0].(error) + return ret0 +} + +// CDNItemDownload indicates an expected call of CDNItemDownload. +func (mr *MockInterfaceMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType, md5, writer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockInterface)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType, md5, writer) +} + +// CDNItemStream mocks base method. +func (m *MockInterface) CDNItemStream(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CDNItemStream", ctx, cdnAddr, hash, itemType) ret0, _ := ret[0].(io.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } -// CDNItemDownload indicates an expected call of CDNItemDownload. -func (mr *MockInterfaceMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { +// CDNItemStream indicates an expected call of CDNItemStream. +func (mr *MockInterfaceMockRecorder) CDNItemStream(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockInterface)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemStream", reflect.TypeOf((*MockInterface)(nil).CDNItemStream), ctx, cdnAddr, hash, itemType) } // CDNItemUpload mocks base method. @@ -7230,11 +7273,12 @@ func (mr *MockInterfaceMockRecorder) QueueWorkflowNodeJobRun(status ...interface } // QueueWorkflowRunResultCheck mocks base method. -func (m *MockInterface) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) error { +func (m *MockInterface) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueueWorkflowRunResultCheck", ctx, jobID, runResultCheck) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 } // QueueWorkflowRunResultCheck indicates an expected call of QueueWorkflowRunResultCheck. @@ -8809,18 +8853,32 @@ func (m *MockWorkerInterface) EXPECT() *MockWorkerInterfaceMockRecorder { } // CDNItemDownload mocks base method. -func (m *MockWorkerInterface) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { +func (m *MockWorkerInterface) CDNItemDownload(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType, md5 string, writer io.WriteSeeker) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType) + ret := m.ctrl.Call(m, "CDNItemDownload", ctx, cdnAddr, hash, itemType, md5, writer) + ret0, _ := ret[0].(error) + return ret0 +} + +// CDNItemDownload indicates an expected call of CDNItemDownload. +func (mr *MockWorkerInterfaceMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType, md5, writer interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockWorkerInterface)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType, md5, writer) +} + +// CDNItemStream mocks base method. +func (m *MockWorkerInterface) CDNItemStream(ctx context.Context, cdnAddr, hash string, itemType sdk.CDNItemType) (io.Reader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CDNItemStream", ctx, cdnAddr, hash, itemType) ret0, _ := ret[0].(io.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } -// CDNItemDownload indicates an expected call of CDNItemDownload. -func (mr *MockWorkerInterfaceMockRecorder) CDNItemDownload(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { +// CDNItemStream indicates an expected call of CDNItemStream. +func (mr *MockWorkerInterfaceMockRecorder) CDNItemStream(ctx, cdnAddr, hash, itemType interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemDownload", reflect.TypeOf((*MockWorkerInterface)(nil).CDNItemDownload), ctx, cdnAddr, hash, itemType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CDNItemStream", reflect.TypeOf((*MockWorkerInterface)(nil).CDNItemStream), ctx, cdnAddr, hash, itemType) } // CDNItemUpload mocks base method. @@ -9278,11 +9336,12 @@ func (mr *MockWorkerInterfaceMockRecorder) QueueWorkflowNodeJobRun(status ...int } // QueueWorkflowRunResultCheck mocks base method. -func (m *MockWorkerInterface) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) error { +func (m *MockWorkerInterface) QueueWorkflowRunResultCheck(ctx context.Context, jobID int64, runResultCheck sdk.WorkflowRunResultCheck) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueueWorkflowRunResultCheck", ctx, jobID, runResultCheck) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 } // QueueWorkflowRunResultCheck indicates an expected call of QueueWorkflowRunResultCheck. diff --git a/sdk/common.go b/sdk/common.go index 59b914ad3c..c9759aab1d 100644 --- a/sdk/common.go +++ b/sdk/common.go @@ -127,7 +127,7 @@ func JSONWithoutHTMLEncode(t interface{}) ([]byte, error) { return buffer.Bytes(), err } -// FileMd5sum returns the md5sum ofr a file +// FileMd5sum returns the md5sum of a file func FileMd5sum(filePath string) (string, error) { file, errop := os.Open(filePath) if errop != nil {