Skip to content

Commit

Permalink
feat(cli,cdn): add command to migrate artifact into CDN (#5884)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Jul 15, 2021
1 parent b1a2379 commit 4b6bb08
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 53 deletions.
81 changes: 81 additions & 0 deletions cli/cdsctl/workflow_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"regexp"
Expand All @@ -11,6 +12,7 @@ import (

"github.com/ovh/cds/cli"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdn"
)

var workflowArtifactCmd = cli.Command{
Expand All @@ -23,9 +25,88 @@ func workflowArtifact() *cobra.Command {
return cli.NewCommand(workflowArtifactCmd, nil, []*cobra.Command{
cli.NewListCommand(workflowArtifactListCmd, workflowArtifactListRun, nil, withAllCommandModifiers()...),
cli.NewCommand(workflowArtifactDownloadCmd, workflowArtifactDownloadRun, nil, withAllCommandModifiers()...),
cli.NewCommand(workflowArtifactCDNMigrateCmd, workflowArtifactCDNMigrate, nil, withAllCommandModifiers()...),
})
}

var workflowArtifactCDNMigrateCmd = cli.Command{
Name: "cdn-migrate",
Short: "Migrate artifact into CDN",
Long: `Migrate artifact from CDS to CDN
cdsctl workflow artifact cdn-migrate <project_key> <workflow_name> <run_number>
CDN does not manage artifact with same name inside a run. They will be renamed <num>_<artifact_name>
Migrated artifacts will not be visible with cdsctl workflow result command
`,
Ctx: []cli.Arg{
{Name: _ProjectKey},
{Name: _WorkflowName},
},
Args: []cli.Arg{
{Name: "number"},
},
}

func workflowArtifactCDNMigrate(v cli.Values) error {
number, err := strconv.ParseInt(v.GetString("number"), 10, 64)
if err != nil {
return cli.NewError("number parameter have to be an integer")
}
projKey := v.GetString(_ProjectKey)
wName := v.GetString(_WorkflowName)

run, err := client.WorkflowRunGet(projKey, wName, number)
if err != nil {
return err
}

workflowArtifacts, err := client.WorkflowRunArtifacts(projKey, wName, number)
if err != nil {
return err
}

artName := make(map[string]int64)
for _, art := range workflowArtifacts {
prefix := ""
nb, has := artName[art.Name]
if has {
prefix = fmt.Sprintf("%d_", nb)
fmt.Printf("%s/%s will be renamed to %s%s\n", art.Name, art.Tag, prefix, art.Name)

}
artName[art.Name] = nb + 1
artName := prefix + art.Name
// Call cdn to migrate
sign := cdn.Signature{
ProjectKey: projKey,
WorkflowName: wName,
WorkflowID: run.WorkflowID,
RunID: art.WorkflowID,
RunNumber: run.Number,
NodeRunID: art.WorkflowNodeRunID,
JobName: "",
JobID: 0,
Worker: &cdn.SignatureWorker{
FileName: artName,
FilePerm: art.Perm,
RunResultType: string(sdk.CDNTypeItemRunResult),
},
}

// call cdn
url := fmt.Sprintf("/migrate/artifact/%s/%s/%d", projKey, wName, art.ID)
bts, _ := json.Marshal(sign)
if bts, err := client.ServiceCallPOST(sdk.TypeCDN, url, bts); err != nil {
fmt.Printf("unable to migrate %s: %s: %v\n", art.Name, string(bts), err)
continue
}
fmt.Printf("artifact %s migrated\n", artName)

}
fmt.Printf("Migration done.")
return nil
}

var workflowArtifactListCmd = cli.Command{
Name: "list",
Short: "List artifacts of one Workflow Run",
Expand Down
110 changes: 59 additions & 51 deletions engine/cdn/cdn_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import (
"github.com/ovh/cds/sdk/cdn"
)

func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.ReadCloser) error {
type StoreFileOptions struct {
DisableApiRunResult bool
}

func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.ReadCloser, storeFileOptions StoreFileOptions) error {
var itemType sdk.CDNItemType
switch {
case sig.Worker.FileName != "":
Expand Down Expand Up @@ -57,24 +61,26 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
Status: sdk.CDNStatusItemIncoming,
}

switch itemType {
case sdk.CDNTypeItemRunResult:
// Call CDS API to check if we can upload the run result
runResultApiRef, _ := it.GetCDNRunResultApiRef()
if !storeFileOptions.DisableApiRunResult {
switch itemType {
case sdk.CDNTypeItemRunResult:
// Call CDS API to check if we can upload the run result
runResultApiRef, _ := it.GetCDNRunResultApiRef()

runResultCheck := sdk.WorkflowRunResultCheck{
Name: runResultApiRef.ArtifactName,
ResultType: runResultApiRef.RunResultType,
RunID: runResultApiRef.RunID,
RunNodeID: runResultApiRef.RunNodeID,
RunJobID: runResultApiRef.RunJobID,
}
code, err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck)
if err != nil {
if code == http.StatusConflict {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to upload the same file twice")
runResultCheck := sdk.WorkflowRunResultCheck{
Name: runResultApiRef.ArtifactName,
ResultType: runResultApiRef.RunResultType,
RunID: runResultApiRef.RunID,
RunNodeID: runResultApiRef.RunNodeID,
RunJobID: runResultApiRef.RunJobID,
}
code, err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck)
if err != nil {
if code == http.StatusConflict {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to upload the same file twice")
}
return err
}
return err
}
}

Expand Down Expand Up @@ -138,42 +144,44 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
return err
}

runResultApiRef, _ := it.GetCDNRunResultApiRef()
switch itemType {
case sdk.CDNTypeItemRunResult:
var result interface{}
switch runResultApiRef.RunResultType {
case sdk.WorkflowRunResultTypeArtifact:
result = sdk.WorkflowRunResultArtifact{
Name: apiRef.ToFilename(),
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Perm: runResultApiRef.Perm,
}
case sdk.WorkflowRunResultTypeCoverage:
result = sdk.WorkflowRunResultCoverage{
Name: apiRef.ToFilename(),
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Perm: runResultApiRef.Perm,
if !storeFileOptions.DisableApiRunResult {
runResultApiRef, _ := it.GetCDNRunResultApiRef()
switch itemType {
case sdk.CDNTypeItemRunResult:
var result interface{}
switch runResultApiRef.RunResultType {
case sdk.WorkflowRunResultTypeArtifact:
result = sdk.WorkflowRunResultArtifact{
Name: apiRef.ToFilename(),
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Perm: runResultApiRef.Perm,
}
case sdk.WorkflowRunResultTypeCoverage:
result = sdk.WorkflowRunResultCoverage{
Name: apiRef.ToFilename(),
Size: it.Size,
MD5: it.MD5,
CDNRefHash: it.APIRefHash,
Perm: runResultApiRef.Perm,
}
}
}

bts, err := json.Marshal(result)
if err != nil {
return sdk.WithStack(err)
}
wrResult := sdk.WorkflowRunResult{
WorkflowRunID: sig.RunID,
WorkflowNodeRunID: sig.NodeRunID,
WorkflowRunJobID: sig.JobID,
Type: runResultApiRef.RunResultType,
DataRaw: json.RawMessage(bts),
}
if err := s.Client.QueueWorkflowRunResultsAdd(ctx, sig.JobID, wrResult); err != nil {
return err
bts, err := json.Marshal(result)
if err != nil {
return sdk.WithStack(err)
}
wrResult := sdk.WorkflowRunResult{
WorkflowRunID: sig.RunID,
WorkflowNodeRunID: sig.NodeRunID,
WorkflowRunJobID: sig.JobID,
Type: runResultApiRef.RunResultType,
DataRaw: json.RawMessage(bts),
}
if err := s.Client.QueueWorkflowRunResultsAdd(ctx, sig.JobID, wrResult); err != nil {
return err
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions engine/cdn/cdn_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (s *Service) initRouter(ctx context.Context) {
r.Handle("/item/{type}/{apiRef}/download/{unit}", nil, r.GET(s.getItemDownloadInUnitHandler, service.OverrideAuth(s.itemAccessMiddleware)))
r.Handle("/item/{type}/{apiRef}/lines", nil, r.GET(s.getItemLogsLinesHandler, service.OverrideAuth(s.itemAccessMiddleware)))

r.Handle("/migrate/artifact/{projectKey}/{workflowName}/{artifactID}", nil, r.POST(s.migrateArtifactInCDNHandler))

r.Handle("/unit", nil, r.GET(s.getUnitsHandler))
r.Handle("/unit/{id}", nil, r.DELETE(s.deleteUnitHandler))
r.Handle("/unit/{id}/item", nil, r.DELETE(s.markItemUnitAsDeleteHandler))
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestGetItemArtifactDownloadHandler(t *testing.T) {
},
}

require.NoError(t, s.storeFile(ctx, sig, f))
require.NoError(t, s.storeFile(ctx, sig, f, StoreFileOptions{}))

signer, err := authentication.NewSigner("cdn-test", test.SigningKey)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Service) postUploadHandler() service.Handler {
return sdk.WrapError(err, "worker key: %d", len(workerData.PrivateKey))
}

if err := s.storeFile(ctx, signature, r.Body); err != nil {
if err := s.storeFile(ctx, signature, r.Body, StoreFileOptions{}); err != nil {
return err
}
return nil
Expand Down
71 changes: 71 additions & 0 deletions engine/cdn/migrate_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package cdn

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strconv"

"github.com/gorilla/mux"

"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdn"
)

func (s *Service) migrateArtifactInCDNHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
projectKey := vars["projectKey"]
workflowName := vars["workflowName"]

artifactIDS := vars["artifactID"]
artifactID, err := strconv.ParseInt(artifactIDS, 10, 64)
if err != nil {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "wrong artifact id")
}

var sign cdn.Signature
if err := service.UnmarshalBody(r, &sign); err != nil {
return err
}

nodeRun, err := s.Client.WorkflowNodeRun(projectKey, workflowName, sign.RunNumber, sign.NodeRunID)
if err != nil {
return err
}

if sign.WorkflowID != nodeRun.WorkflowID || sign.NodeRunID != nodeRun.ID || sign.RunID != nodeRun.WorkflowRunID || nodeRun.Number != sign.RunNumber {
return sdk.NewErrorFrom(sdk.ErrInvalidData, "signature doesn't match request")
}

// Check if artifact exist
found := false
for _, a := range nodeRun.Artifacts {
if a.ID == artifactID {
found = true
break
}
}
if !found {
return sdk.NewErrorFrom(sdk.ErrNotFound, "unable to find artifact in the given run")
}

// Retrieve Artifact from CDS API
url := fmt.Sprintf("/project/%s/workflows/%s/artifact/%d", projectKey, workflowName, artifactID)
readcloser, _, code, err := s.Client.Stream(ctx, s.Client.HTTPNoTimeoutClient(), "GET", url, nil)
if err != nil {
return sdk.WithStack(err)
}
if code >= 400 {
var bodyBtes []byte
bodyBtes, errR := ioutil.ReadAll(readcloser)
if errR != nil {
return errR
}
return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to get artifact: %s", string(bodyBtes))
}
return s.storeFile(ctx, sign, readcloser, StoreFileOptions{DisableApiRunResult: true})
}
}
1 change: 1 addition & 0 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ type Raw interface {
DeleteJSON(ctx context.Context, path string, out interface{}, mods ...RequestModifier) (int, error)
RequestJSON(ctx context.Context, method, path string, in interface{}, out interface{}, mods ...RequestModifier) ([]byte, http.Header, int, error)
Request(ctx context.Context, method string, path string, body io.Reader, mods ...RequestModifier) ([]byte, http.Header, int, error)
Stream(ctx context.Context, httpClient HTTPClient, method string, path string, body io.Reader, mods ...RequestModifier) (io.ReadCloser, http.Header, int, error)
HTTPClient() *http.Client
HTTPNoTimeoutClient() *http.Client
HTTPWebsocketClient() *websocket.Dialer
Expand Down
Loading

0 comments on commit 4b6bb08

Please sign in to comment.