Skip to content

Commit

Permalink
fix(api): upload artifact for vcs release (#5717)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 1, 2021
1 parent ad3d4c2 commit 90cc51b
Show file tree
Hide file tree
Showing 18 changed files with 145 additions and 108 deletions.
4 changes: 2 additions & 2 deletions engine/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func selectDeleteAdminServiceCallHandler(api *API, method string) service.Handle
}

query := r.FormValue("query")
btes, _, code, err := services.DoRequest(ctx, api.mustDB(), srvs, method, query, nil)
btes, _, code, err := services.DoRequest(ctx, srvs, method, query, nil)
if err != nil {
return sdk.NewError(sdk.Error{
Status: code,
Expand Down Expand Up @@ -169,7 +169,7 @@ func putPostAdminServiceCallHandler(api *API, method string) service.Handler {
}
defer r.Body.Close()

btes, _, code, err := services.DoRequest(ctx, api.mustDB(), srvs, method, query, body)
btes, _, code, err := services.DoRequest(ctx, srvs, method, query, body)
if err != nil {
return sdk.NewError(sdk.Error{
Status: code,
Expand Down
2 changes: 1 addition & 1 deletion engine/api/cdn/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ListItems(ctx context.Context, db gorp.SqlExecutor, itemtype sdk.CDNItemTyp
}

}
btes, _, _, err := services.DoRequest(ctx, db, srvs, http.MethodGet, path, nil)
btes, _, _, err := services.DoRequest(ctx, srvs, http.MethodGet, path, nil)
if err != nil {
return result, err
}
Expand Down
20 changes: 7 additions & 13 deletions engine/api/repositoriesmanager/repositories_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -247,10 +246,12 @@ func (c *vcsClient) doJSONRequest(ctx context.Context, method, path string, in i
return code, sdk.WithStack(err)
}

func (c *vcsClient) postMultipart(ctx context.Context, path string, fileContent []byte, out interface{}) (int, error) {
return services.PostMultipart(ctx, c.db, c.srvs, "POST", path, fileContent, out, func(req *http.Request) {
func (c *vcsClient) postBinary(ctx context.Context, path string, fileLength int, r io.Reader, out interface{}) (int, error) {
return services.PostBinary(ctx, c.srvs, path, r, out, func(req *http.Request) {
req.Header.Set(sdk.HeaderXAccessToken, base64.StdEncoding.EncodeToString([]byte(c.token)))
req.Header.Set(sdk.HeaderXAccessTokenSecret, base64.StdEncoding.EncodeToString([]byte(c.secret)))
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", strconv.Itoa(fileLength))
})
}

Expand Down Expand Up @@ -559,16 +560,9 @@ func (c *vcsClient) Release(ctx context.Context, fullname, tagName, releaseTitle
return &release, nil
}

func (c *vcsClient) UploadReleaseFile(ctx context.Context, fullname string, releaseName, uploadURL string, artifactName string, r io.ReadCloser) error {
path := fmt.Sprintf("/vcs/%s/repos/%s/releases/%s/artifacts/%s", c.name, fullname, releaseName, artifactName)
defer r.Close()

fileContent, err := ioutil.ReadAll(r)
if err != nil {
return sdk.WithStack(err)
}

if _, err := c.postMultipart(ctx, path, fileContent, nil); err != nil {
func (c *vcsClient) UploadReleaseFile(ctx context.Context, fullname string, releaseName, uploadURL string, artifactName string, r io.Reader, fileLength int) error {
path := fmt.Sprintf("/vcs/%s/repos/%s/releases/%s/artifacts/%s?upload_url=%s", c.name, fullname, releaseName, artifactName, url.QueryEscape(uploadURL))
if _, err := c.postBinary(ctx, path, fileLength, r, nil); err != nil {
return sdk.WithStack(err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion engine/api/services/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func ping(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s ExternalServic

log.Debug(ctx, "services.ping> Checking service %s (%v)", s.Name, u.String())

_, _, code, err := doRequestFromURL(context.Background(), db, "GET", u, nil)
_, _, code, err := doRequestFromURL(context.Background(), "GET", u, nil)
if err != nil || code >= 400 {
mon.Lines[0].Status = sdk.MonitoringStatusWarn
mon.Lines[0].Value = "Health: KO"
Expand Down
106 changes: 47 additions & 59 deletions engine/api/services/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"strconv"
"time"

"github.com/go-gorp/gorp"
Expand Down Expand Up @@ -59,11 +61,11 @@ func NewDefaultClient(db gorp.SqlExecutor, srvs []sdk.Service) Client {
}

func (s *defaultServiceClient) DoMultiPartRequest(ctx context.Context, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
return doMultiPartRequest(ctx, s.db, s.srvs, method, path, multiPartData, in, out, mods...)
return doMultiPartRequest(ctx, s.srvs, method, path, multiPartData, in, out, mods...)
}

// doMultiPartRequest performs an http request on a service with multipart tar file + json field
func doMultiPartRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
func doMultiPartRequest(ctx context.Context, srvs []sdk.Service, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
ctx, end := telemetry.Span(ctx, "services.doMultiPartRequest",
telemetry.Tag("http.method", method),
telemetry.Tag("http.path", path),
Expand Down Expand Up @@ -108,7 +110,7 @@ func doMultiPartRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Ser
attempt++
for i := range srvs {
srv := &srvs[i]
res, _, code, err := doRequest(ctx, db, srv, method, path, bodyToSend, mods...)
res, _, code, err := doRequest(ctx, srv, method, path, bodyToSend, mods...)
if err != nil {
lastErr = err
lastCode = code
Expand Down Expand Up @@ -137,17 +139,17 @@ func (s *defaultServiceClient) DoJSONRequest(ctx context.Context, method, path s
)
defer end()

return doJSONRequest(ctx, s.db, s.srvs, method, path, in, out, mods...)
return doJSONRequest(ctx, s.srvs, method, path, in, out, mods...)
}

// doJSONRequest performs an http request on a service
func doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
func doJSONRequest(ctx context.Context, srvs []sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
var lastErr = sdk.WithStack(errors.New("unable to call service: service not found"))
var lastCode int
for attempt := 0; attempt < 5; attempt++ {
for i := range srvs {
srv := &srvs[i]
headers, code, err := _doJSONRequest(ctx, db, srv, method, path, in, out, mods...)
headers, code, err := _doJSONRequest(ctx, srv, method, path, in, out, mods...)
if err != nil {
lastErr = err
lastCode = code
Expand All @@ -165,7 +167,7 @@ func doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service,
}

// _doJSONRequest is a low level function that performs an http request on service
func _doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srv *sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
func _doJSONRequest(ctx context.Context, srv *sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
var b = []byte{}
var err error

Expand All @@ -177,7 +179,7 @@ func _doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srv *sdk.Service,
}

mods = append(mods, cdsclient.SetHeader("Content-Type", "application/json"))
res, headers, code, err := doRequest(ctx, db, srv, method, path, b, mods...)
res, headers, code, err := doRequest(ctx, srv, method, path, b, mods...)
if err != nil {
return headers, code, sdk.ErrorWithFallback(err, sdk.ErrUnknownError, "unable to perform request on service %s (%s)", srv.Name, srv.Type)
}
Expand All @@ -191,61 +193,38 @@ func _doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srv *sdk.Service,
return headers, code, nil
}

// PostMultipart post a file content through multipart upload
func PostMultipart(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, path string, filename string, fileContents []byte, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
body := new(bytes.Buffer)
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile("file", filename)
// PostBinary
func PostBinary(ctx context.Context, srvs []sdk.Service, path string, r io.Reader, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
// No retry because if http call failed, reader is closed
seed := rand.NewSource(time.Now().UnixNano())
ra := rand.New(seed)
srv := &srvs[ra.Intn(len(srvs))]
callURL, err := url.ParseRequestURI(srv.HTTPURL + path)
if err != nil {
return 0, sdk.WithStack(err)
}
if _, err := part.Write(fileContents); err != nil {
return 0, sdk.WithStack(err)
}
if err := writer.Close(); err != nil {
return 0, sdk.WithStack(err)
res, _, code, err := doRequestFromURL(ctx, "POST", callURL, r, mods...)
if err != nil {
return code, err
}

mods = append(mods, cdsclient.SetHeader("Content-Type", "multipart/form-data"))

var lastErr error
var lastCode int
var attempt int
for {
attempt++
for i := range srvs {
srv := &srvs[i]
res, _, code, err := doRequest(ctx, db, srv, "POST", path, body.Bytes(), mods...)
lastCode = code
lastErr = err

if err == nil {
return code, nil
}

if out != nil {
if err := json.Unmarshal(res, out); err != nil {
return code, err
}
}
}
if lastErr == nil {
break
if out != nil {
if err := json.Unmarshal(res, out); err != nil {
return code, sdk.WithStack(err)
}
}
return lastCode, lastErr
return code, nil
}

// DoRequest performs an http request on a service
func DoRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, args []byte, mods ...cdsclient.RequestModifier) ([]byte, http.Header, int, error) {
func DoRequest(ctx context.Context, srvs []sdk.Service, method, path string, args []byte, mods ...cdsclient.RequestModifier) ([]byte, http.Header, int, error) {
var lastErr error
var lastCode int
var attempt int
for {
attempt++
for i := range srvs {
srv := &srvs[i]
btes, headers, code, err := doRequest(ctx, db, srv, method, path, args, mods...)
btes, headers, code, err := doRequest(ctx, srv, method, path, args, mods...)
if err == nil {
return btes, headers, code, nil
}
Expand All @@ -260,15 +239,19 @@ func DoRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, met
}

// doRequest performs an http request on service
func doRequest(ctx context.Context, db gorp.SqlExecutor, srv *sdk.Service, method, path string, args []byte, mods ...cdsclient.RequestModifier) ([]byte, http.Header, int, error) {
func doRequest(ctx context.Context, srv *sdk.Service, method, path string, args []byte, mods ...cdsclient.RequestModifier) ([]byte, http.Header, int, error) {
callURL, err := url.ParseRequestURI(srv.HTTPURL + path)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, sdk.WithStack(err)
}
var r io.Reader
if args != nil {
r = bytes.NewReader(args)
}
return doRequestFromURL(ctx, db, method, callURL, args, mods...)
return doRequestFromURL(ctx, method, callURL, r, mods...)
}

func doRequestFromURL(ctx context.Context, db gorp.SqlExecutor, method string, callURL *url.URL, args []byte, mods ...cdsclient.RequestModifier) ([]byte, http.Header, int, error) {
func doRequestFromURL(ctx context.Context, method string, callURL *url.URL, reader io.Reader, mods ...cdsclient.RequestModifier) ([]byte, http.Header, int, error) {
if HTTPClient == nil {
HTTPClient = &http.Client{
Timeout: 60 * time.Second,
Expand All @@ -279,15 +262,9 @@ func doRequestFromURL(ctx context.Context, db gorp.SqlExecutor, method string, c
HTTPSigner = httpsig.NewRSASHA256Signer(authentication.GetIssuerName(), authentication.GetSigningKey(), []string{"(request-target)", "host", "date"})
}

var requestError error
var req *http.Request
if args != nil {
req, requestError = http.NewRequest(method, callURL.String(), bytes.NewReader(args))
} else {
req, requestError = http.NewRequest(method, callURL.String(), nil)
}
req, requestError := http.NewRequest(method, callURL.String(), reader)
if requestError != nil {
return nil, nil, 0, requestError
return nil, nil, 0, sdk.WithStack(requestError)
}

req = req.WithContext(ctx)
Expand All @@ -309,6 +286,17 @@ func doRequestFromURL(ctx context.Context, db gorp.SqlExecutor, method string, c
req.Header.Set(cdslog.HeaderRequestID, requestID)
}

// If body is not *bytes.Buffer, *bytes.Reader or *strings.Reader Content-Length is not set. (
// Here we force Content-Length.
// cf net/http/request.go NewRequestWithContext
if req.Header.Get("Content-Length") != "" {
s, err := strconv.Atoi(req.Header.Get("Content-Length"))
if err != nil {
return nil, nil, 0, sdk.WithStack(err)
}
req.ContentLength = int64(s)
}

// Sign the http request with API private RSA Key
if err := HTTPSigner.Sign(req); err != nil {
return nil, nil, 0, sdk.WrapError(err, "request signature failed")
Expand Down
53 changes: 41 additions & 12 deletions engine/api/workflow_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"sort"
Expand Down Expand Up @@ -112,14 +111,29 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
}

for _, a := range artifactToUpload {
f, err := api.SharedStorage.Fetch(ctx, &a)
if err != nil {
return sdk.WrapError(err, "Cannot fetch artifact")
}
// Do manual retry because if http call failed, reader is closed
attempt := 0
var lastErr error
for {
attempt++
f, err := api.SharedStorage.Fetch(ctx, &a)
if err != nil {
return sdk.WrapError(err, "Cannot fetch artifact")
}

if err := client.UploadReleaseFile(ctx, app.RepositoryFullname, fmt.Sprintf("%d", release.ID), release.UploadURL, a.Name, f); err != nil {
return sdk.WrapError(err, "releaseApplicationWorkflowHandler")
if err := client.UploadReleaseFile(ctx, app.RepositoryFullname, fmt.Sprintf("%d", release.ID), release.UploadURL, a.Name, f, int(a.Size)); err != nil {
lastErr = err
if attempt >= 5 {
break
}
continue
}
break
}
if lastErr != nil {
return err
}

}

results, err := workflow.LoadRunResultsByRunIDAndType(ctx, api.mustDB(), workflowRun.ID, sdk.WorkflowRunResultTypeArtifact)
Expand Down Expand Up @@ -154,13 +168,28 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
return err
}
for _, r := range resultToUpload {
reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemArtifact)
if err != nil {
return err
// Do manual retry because if http call failed, reader is closed
attempt := 0
var lastErr error
for {
attempt++
reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemArtifact)
if err != nil {
return err
}
if err := client.UploadReleaseFile(ctx, app.RepositoryFullname, fmt.Sprintf("%d", release.ID), release.UploadURL, r.Name, reader, int(r.Size)); err != nil {
lastErr = err
if attempt >= 5 {
break
}
continue
}
break
}
if err := client.UploadReleaseFile(ctx, app.RepositoryFullname, fmt.Sprintf("%d", release.ID), release.UploadURL, r.Name, ioutil.NopCloser(reader)); err != nil {
return sdk.WrapError(err, "releaseApplicationWorkflowHandler")
if lastErr != nil {
return err
}

}

return nil
Expand Down
2 changes: 1 addition & 1 deletion engine/vcs/bitbucketcloud/client_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ func (client *bitbucketcloudClient) Release(ctx context.Context, fullname string
}

// UploadReleaseFile Attach a file into the release
func (client *bitbucketcloudClient) UploadReleaseFile(ctx context.Context, repo string, releaseName string, uploadURL string, artifactName string, r io.ReadCloser) error {
func (client *bitbucketcloudClient) UploadReleaseFile(ctx context.Context, repo string, releaseName string, uploadURL string, artifactName string, r io.Reader, fileLength int) error {
return sdk.WithStack(sdk.ErrNotImplemented)
}
2 changes: 1 addition & 1 deletion engine/vcs/bitbucketserver/client_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ import (
func (b *bitbucketClient) Release(ctx context.Context, repo, tagName, releaseTitle, releaseDescription string) (*sdk.VCSRelease, error) {
return nil, fmt.Errorf("Not yet implemented")
}
func (b *bitbucketClient) UploadReleaseFile(ctx context.Context, repo string, releaseName string, uploadURL string, artifactName string, r io.ReadCloser) error {
func (b *bitbucketClient) UploadReleaseFile(ctx context.Context, repo string, releaseName string, uploadURL string, artifactName string, r io.Reader, fileLength int) error {
return fmt.Errorf("Not yet implemented")
}
2 changes: 1 addition & 1 deletion engine/vcs/gerrit/client_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ func (c *gerritClient) Release(ctx context.Context, repo string, tagName string,
}

// UploadReleaseFile upload a release file project
func (c *gerritClient) UploadReleaseFile(ctx context.Context, repo string, releaseName string, uploadURL string, artifactName string, r io.ReadCloser) error {
func (c *gerritClient) UploadReleaseFile(ctx context.Context, repo string, releaseName string, uploadURL string, artifactName string, r io.Reader, fileLength int) error {
return fmt.Errorf("not implemented")
}
Loading

0 comments on commit 90cc51b

Please sign in to comment.