Skip to content

Commit

Permalink
feat(cdn): worker upload artifact on CDN (#5646)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Jan 22, 2021
1 parent d1f6871 commit 28b6826
Show file tree
Hide file tree
Showing 16 changed files with 247 additions and 81 deletions.
62 changes: 17 additions & 45 deletions engine/api/test/assets/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,23 @@ func NewAuthentifiedRequest(t *testing.T, _ *sdk.AuthentifiedUser, pass, method,
return req
}

func NewUploadFileRequest(t *testing.T, method string, uri string, body io.Reader, headers map[string]string) *http.Request {
req, err := http.NewRequest(method, uri, body)
if err != nil {
t.Error(err)
t.FailNow()
}
if headers != nil {
for k, v := range headers {
req.Header.Set(k, v)
}
}
date := sdk.FormatDateRFC5322(time.Now())
req.Header.Set("Date", date)
req.Header.Set("X-CDS-RemoteTime", date)
return req
}

func NewRequest(t *testing.T, method, uri string, i interface{}) *http.Request {
var btes []byte
var err error
Expand Down Expand Up @@ -335,51 +352,6 @@ func GetBuiltinOrPluginActionByName(t *testing.T, db gorp.SqlExecutor, name stri
return a
}

func NewMultipartRequest(t *testing.T, method, uri string, path string, partName, fileName string, params map[string]string, additionalHeaders map[string]string) *http.Request {
file, err := os.Open(path)
if err != nil {
t.Fail()
}
defer file.Close()
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile(partName, filepath.Base(path))
if err != nil {
t.Fail()
}
if _, err := io.Copy(part, file); err != nil {
t.Fail()
}

for key, val := range params {
_ = writer.WriteField(key, val)
}

contextType := writer.FormDataContentType()

if err := writer.Close(); err != nil {
t.Fail()
}

req, err := http.NewRequest(method, uri, body)
if err != nil {
t.Fail()
}
req.Header.Set("Content-Type", contextType)
req.Header.Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", fileName))
req.Header.Set("ARTIFACT-FILENAME", fileName)

for k, v := range additionalHeaders {
req.Header.Set(k, v)
}

date := sdk.FormatDateRFC5322(time.Now())
req.Header.Set("Date", date)
req.Header.Set("X-CDS-RemoteTime", date)

return req
}

func NewJWTAuthentifiedMultipartRequest(t *testing.T, jwt string, method, uri string, path string, fileName string, params map[string]string) *http.Request {
file, err := os.Open(path)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/sguiheux/go-coverage"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/metrics"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/featureflipping"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/jws"
Expand Down Expand Up @@ -111,6 +113,13 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
return err
}

pbji.CDNHttpAddr, err = services.GetCDNPublicHTTPAdress(ctx, api.mustDB())

enabled := featureflipping.IsEnabled(ctx, gorpmapping.Mapper, api.mustDB(), sdk.FeatureCDNArtifact, map[string]string{"project_key": pbji.ProjectKey})

pbji.Features = make(map[string]bool, 1)
pbji.Features[sdk.FeatureCDNArtifact] = enabled

workflow.ResyncNodeRunsWithCommits(ctx, api.mustDB(), api.Cache, *p, report)
go api.WorkflowSendEvent(context.Background(), *p, report)

Expand Down
22 changes: 2 additions & 20 deletions engine/cdn/item_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cdn

import (
"context"
"io"
"net/http"

"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -33,25 +32,8 @@ func (s *Service) postUploadHandler() service.Handler {
return sdk.WrapError(err, "worker key: %d", len(workerData.PrivateKey))
}

reader, err := r.MultipartReader()
if err != nil {
return sdk.WithStack(err)
}
for {
part, err := reader.NextPart()
if err == io.EOF {
break
}
if err != nil {
return sdk.WithStack(err)
}
if part.FormName() != "file" {
continue
}
if err := s.storeFile(ctx, signature, part); err != nil {
return err
}
break
if err := s.storeFile(ctx, signature, r.Body); err != nil {
return err
}
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions engine/cdn/item_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,15 @@ func TestPostUploadHandler(t *testing.T) {
moreHeaders := map[string]string{
"X-CDS-WORKER-SIGNATURE": signature,
}
req := assets.NewMultipartRequest(t, "POST", uri, path.Join(os.TempDir(), "myartifact"), "file", "myartifact", nil, moreHeaders)
f, err = os.Open(path.Join(os.TempDir(), "myartifact"))
require.NoError(t, err)
req := assets.NewUploadFileRequest(t, "POST", uri, f, moreHeaders)
rec := httptest.NewRecorder()
s.Router.Mux.ServeHTTP(rec, req)
for _, r := range gock.Pending() {
t.Logf("PENDING: %s \n", r.Request().URLStruct.String())
}

f.Close()
require.Equal(t, 204, rec.Code)
require.True(t, gock.IsDone())

Expand Down
4 changes: 2 additions & 2 deletions engine/service/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ const (
ContextSessionID
)

func NoAuthMiddleware(ctx context.Context, w http.ResponseWriter, req *http.Request, rc *HandlerConfig) (context.Context, error) {
func NoAuthMiddleware(ctx context.Context, _ http.ResponseWriter, _ *http.Request, _ *HandlerConfig) (context.Context, error) {
return ctx, nil
}

func JWTMiddleware(ctx context.Context, w http.ResponseWriter, req *http.Request, rc *HandlerConfig, keyFunc jwt.Keyfunc) (context.Context, error) {
func JWTMiddleware(ctx context.Context, _ http.ResponseWriter, req *http.Request, _ *HandlerConfig, keyFunc jwt.Keyfunc) (context.Context, error) {
var jwtRaw string
var jwtFromCookie bool
// Try to get the jwt from the cookie firstly then from the authorization bearer header, a XSRF token with cookie
Expand Down
40 changes: 31 additions & 9 deletions engine/worker/internal/action/builtin_artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,48 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
}
}()

cdnArtifactEnabled := wk.FeatureEnabled(sdk.FeatureCDNArtifact)

integrationName := sdk.DefaultIfEmptyStorage(strings.TrimSpace(sdk.ParameterValue(a.Parameters, "destination")))
projectKey := sdk.ParameterValue(wk.Parameters(), "cds.project")

wg.Add(len(filesPath))
for _, p := range filesPath {
go func(path string) {
log.Debug(ctx, "worker.RunArtifactUpload> Uploading %s projectKey:%v integrationName:%v job:%d", path, projectKey, integrationName, jobID)
log.Debug(ctx, "uploading %s projectKey:%v integrationName:%v job:%d", path, projectKey, integrationName, jobID)
defer wg.Done()
throughTempURL, duration, err := wk.Client().QueueArtifactUpload(ctx, projectKey, integrationName, jobID, tag.Value, path)

if !cdnArtifactEnabled {
throughTempURL, duration, err := wk.Client().QueueArtifactUpload(ctx, projectKey, integrationName, jobID, tag.Value, path)
if err != nil {
log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err)
chanError <- sdk.WrapError(err, "Error while uploading artifact %s", path)
wgErrors.Add(1)
return
}
if throughTempURL {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to object store", path, duration.Seconds()))
} else {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS API", path, duration.Seconds()))
}
return
}

_, name := filepath.Split(path)
signature, err := wk.ArtifactSignature(name)
if err != nil {
log.Warn(ctx, "worker.RunArtifactUpload> QueueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err)
chanError <- sdk.WrapError(err, "Error while uploading artifact %s", path)
wgErrors.Add(1)
log.Error(ctx, "unable to sign artifact: %v", err)
return
}
if throughTempURL {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to object store", path, duration.Seconds()))
} else {
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS API", path, duration.Seconds()))

duration, err := wk.Client().CDNArtifactUpdload(ctx, wk.CDNHttpURL(), signature, path)
if err != nil {
log.Error(ctx, "upable to upload artifact %q: %v", path, err)
chanError <- sdk.WrapError(err, "Error while uploading artifact %s", path)
wgErrors.Add(1)
}
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS CDN", path, duration.Seconds()))

}(p)
if len(filesPath) > 1 {
//Wait 3 second to get the object storage to set up all the things
Expand Down
12 changes: 12 additions & 0 deletions engine/worker/internal/action/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ type TestWorker struct {
logBuffer bytes.Buffer
}

func (w *TestWorker) ArtifactSignature(artifactName string) (string, error) {
return "mysignature", nil
}

func (w *TestWorker) FeatureEnabled(featureName string) bool {
return false
}

func (w *TestWorker) CDNHttpURL() string {
return "http://cdn.me"
}

func (w *TestWorker) WorkingDirectory() *afero.BasePathFile {
return w.workingDirectory
}
Expand Down
3 changes: 3 additions & 0 deletions engine/worker/internal/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er
w.currentJob.workflowID = info.WorkflowID
w.currentJob.runID = info.RunID
w.currentJob.nodeRunName = info.NodeRunName
w.currentJob.features = info.Features

// Reset build variables
w.currentJob.newVariables = nil

w.cdnHttpAddr = info.CDNHttpAddr

secretKey := make([]byte, 32)
if _, err := base64.StdEncoding.Decode(secretKey, []byte(info.SigningKey)); err != nil {
return sdk.WithStack(err)
Expand Down
35 changes: 35 additions & 0 deletions engine/worker/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type CurrentWorker struct {
basedir afero.Fs
manualExit bool
gelfLogger *logger
cdnHttpAddr string
stepLogLine int64
httpPort int32
register struct {
Expand All @@ -60,6 +61,7 @@ type CurrentWorker struct {
workflowID int64
runID int64
nodeRunName string
features map[string]bool
}
status struct {
Name string `json:"name"`
Expand Down Expand Up @@ -99,6 +101,14 @@ func (wk *CurrentWorker) Parameters() []sdk.Parameter {
return wk.currentJob.params
}

func (wk *CurrentWorker) FeatureEnabled(name string) bool {
b, has := wk.currentJob.features[name]
if !has {
return false
}
return b
}

func (wk *CurrentWorker) SendTerminatedStepLog(ctx context.Context, level workerruntime.Level, logLine string) {
msg, sign, err := wk.prepareLog(ctx, level, logLine)
if err != nil {
Expand All @@ -113,6 +123,27 @@ func (wk *CurrentWorker) SendTerminatedStepLog(ctx context.Context, level worker
wk.stepLogLine++
}

func (wk *CurrentWorker) ArtifactSignature(artifactName string) (string, error) {
sig := cdn.Signature{
ProjectKey: wk.currentJob.projectKey,
JobID: wk.currentJob.wJob.ID,
NodeRunID: wk.currentJob.wJob.WorkflowNodeRunID,
Timestamp: time.Now().UnixNano(),
WorkflowID: wk.currentJob.workflowID,
WorkflowName: wk.currentJob.workflowName,
NodeRunName: wk.currentJob.nodeRunName,
RunID: wk.currentJob.runID,
JobName: wk.currentJob.wJob.Job.Action.Name,
Worker: &cdn.SignatureWorker{
WorkerID: wk.id,
WorkerName: wk.Name(),
ArtifactName: artifactName,
},
}
signature, err := jws.Sign(wk.currentJob.signer, sig)
return signature, sdk.WrapError(err, "cannot sign log message")
}

func (wk *CurrentWorker) SendLog(ctx context.Context, level workerruntime.Level, logLine string) {
msg, sign, err := wk.prepareLog(ctx, level, logLine)
if err != nil {
Expand All @@ -127,6 +158,10 @@ func (wk *CurrentWorker) SendLog(ctx context.Context, level workerruntime.Level,
wk.stepLogLine++
}

func (wk *CurrentWorker) CDNHttpURL() string {
return wk.cdnHttpAddr
}

func (wk *CurrentWorker) prepareLog(ctx context.Context, level workerruntime.Level, s string) (cdslog.Message, string, error) {
var res cdslog.Message

Expand Down
3 changes: 3 additions & 0 deletions engine/worker/pkg/workerruntime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type Runtime interface {
Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error
ProcessJob(job sdk.WorkflowNodeJobRunData) sdk.Result
SendLog(ctx context.Context, level Level, format string)
ArtifactSignature(artifactName string) (string, error)
FeatureEnabled(featureName string) bool
CDNHttpURL() string
InstallKey(key sdk.Variable) (*KeyResponse, error)
InstallKeyTo(key sdk.Variable, destinationPath string) (*KeyResponse, error)
Unregister(ctx context.Context) error
Expand Down
33 changes: 33 additions & 0 deletions sdk/cdsclient/client_cdn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package cdsclient

import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/ovh/cds/sdk"
)

func (c *client) CDNArtifactUpdload(ctx context.Context, cdnAddr string, signature string, path string) (time.Duration, error) {
t0 := time.Now()

var savedError error
// as *File implement io.ReadSeeker, retry in c.Stream will be skipped
for i := 0; i < c.config.Retry; i++ {
f, err := os.Open(path)
if err != nil {
return time.Since(t0), sdk.WithStack(err)
}
_, _, _, err = c.Request(ctx, http.MethodPost, fmt.Sprintf("%s/item/upload", cdnAddr), f, SetHeader("X-CDS-WORKER-SIGNATURE", signature))
if err != nil {
savedError = sdk.WrapError(err, "unable to upload file, try %d", i+1)
time.Sleep(1 * time.Second)
continue
}
savedError = nil
break
}
return time.Since(t0), sdk.WithStack(savedError)
}
3 changes: 1 addition & 2 deletions sdk/cdsclient/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
var requestError error
if rs, ok := body.(io.ReadSeeker); ok {
if _, err := rs.Seek(0, 0); err != nil {
savederror = sdk.WithStack(err)
continue
return nil, nil, 0, sdk.WrapError(err, "request failed after %d retries", i)
}
req, requestError = http.NewRequest(method, url, body)
} else {
Expand Down
Loading

0 comments on commit 28b6826

Please sign in to comment.