Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multipart artifact upload #2991

Merged
merged 5 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions agent/agent_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,24 @@ type AgentConfiguration struct {
VerificationJWKS any // The set of keys to verify jobs with
VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`)

ANSITimestamps bool
TimestampLines bool
HealthCheckAddr string
DisconnectAfterJob bool
DisconnectAfterIdleTimeout int
CancelGracePeriod int
SignalGracePeriod time.Duration
EnableJobLogTmpfile bool
JobLogPath string
WriteJobLogsToStdout bool
LogFormat string
Shell string
Profile string
RedactedVars []string
AcquireJob string
TracingBackend string
TracingServiceName string
TraceContextEncoding string
DisableWarningsFor []string
ANSITimestamps bool
TimestampLines bool
HealthCheckAddr string
DisconnectAfterJob bool
DisconnectAfterIdleTimeout int
CancelGracePeriod int
SignalGracePeriod time.Duration
EnableJobLogTmpfile bool
JobLogPath string
WriteJobLogsToStdout bool
LogFormat string
Shell string
Profile string
RedactedVars []string
AcquireJob string
TracingBackend string
TracingServiceName string
TraceContextEncoding string
DisableWarningsFor []string
AllowMultipartArtifactUpload bool
}
2 changes: 1 addition & 1 deletion agent/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) {
env["BUILDKITE_KUBERNETES_EXEC"] = "true"
}

if !r.conf.AgentConfiguration.AllowMultipartArtifactUpload {
env["BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD"] = "true"
}

// propagate CancelSignal to bootstrap, unless it's the default SIGTERM
if r.conf.CancelSignal != process.SIGTERM {
env["BUILDKITE_CANCEL_SIGNAL"] = r.conf.CancelSignal.String()
Expand Down
94 changes: 60 additions & 34 deletions api/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,47 @@ type Artifact struct {
}

type ArtifactBatch struct {
ID string `json:"id"`
Artifacts []*Artifact `json:"artifacts"`
UploadDestination string `json:"upload_destination"`
ID string `json:"id"`
Artifacts []*Artifact `json:"artifacts"`
UploadDestination string `json:"upload_destination"`
MultipartSupported bool `json:"multipart_supported,omitempty"`
}

// ArtifactUploadInstructions describes how to upload an artifact to Buildkite
// artifact storage.
type ArtifactUploadInstructions struct {
Data map[string]string `json:"data"`
Action struct {
URL string `json:"url,omitempty"`
Method string `json:"method"`
Path string `json:"path"`
FileInput string `json:"file_input"`
}
// Used for a single-part upload.
Action ArtifactUploadAction `json:"action"`

// Used for a multi-part upload.
Actions []ArtifactUploadAction `json:"actions"`

// Contains other data necessary for interpreting instructions.
Data map[string]string `json:"data"`
}

// ArtifactUploadAction describes one action needed to upload an artifact or
// part of an artifact to Buildkite artifact storage.
//
// Each field is encoded to / decoded from JSON under the key given in its tag.
type ArtifactUploadAction struct {
// URL is left out of the encoded JSON when it is empty (omitempty).
URL string `json:"url,omitempty"`
// NOTE(review): presumably the HTTP method to use for this action — confirm
// against the uploader that consumes it.
Method string `json:"method"`
// NOTE(review): presumably the request path for this action — confirm.
Path string `json:"path"`
// NOTE(review): presumably the name of the form field that carries the file
// contents — confirm.
FileInput string `json:"file_input"`
// PartNumber is left out of the encoded JSON when it is zero (omitempty).
// NOTE(review): presumably only set for actions that belong to a multipart
// upload — confirm.
PartNumber int `json:"part_number,omitempty"`
}

type ArtifactBatchCreateResponse struct {
ID string `json:"id"`
ArtifactIDs []string `json:"artifact_ids"`
UploadInstructions *ArtifactUploadInstructions `json:"upload_instructions"`
ID string `json:"id"`
ArtifactIDs []string `json:"artifact_ids"`

// These instructions apply to all artifacts. The template contains
// variable interpolations such as ${artifact:path}.
InstructionsTemplate *ArtifactUploadInstructions `json:"upload_instructions"`

// These instructions apply to specific artifacts, necessary for multipart
// uploads. They override InstructionsTemplate and should not contain
// interpolations. Map: artifact ID -> instructions for that artifact.
PerArtifactInstructions map[string]*ArtifactUploadInstructions `json:"per_artifact_instructions"`
}

// ArtifactSearchOptions specifies the optional parameters to the
Expand All @@ -82,18 +104,29 @@ type ArtifactSearchOptions struct {
IncludeDuplicates bool `url:"include_duplicates,omitempty"`
}

type ArtifactBatchUpdateArtifact struct {
ID string `json:"id"`
State string `json:"state"`
// ArtifactState represents the state of a single artifact, when calling UpdateArtifacts.
//
// Multipart and MultipartETags are left out of the encoded JSON when they hold
// their zero values (omitempty).
type ArtifactState struct {
ID string `json:"id"`
State string `json:"state"`
Multipart bool `json:"multipart,omitempty"`
// If this artifact was a multipart upload and is complete, we need the
// ETag from each uploaded part so that they can be joined together.
MultipartETags []ArtifactPartETag `json:"multipart_etags,omitempty"`
}

// ArtifactPartETag associates an ETag to a part number for a multipart upload.
type ArtifactPartETag struct {
// PartNumber is the number of the part that ETag belongs to.
PartNumber int `json:"part_number"`
// ETag is the ETag associated with that part.
ETag string `json:"etag"`
}

type ArtifactBatchUpdateRequest struct {
Artifacts []*ArtifactBatchUpdateArtifact `json:"artifacts"`
Artifacts []ArtifactState `json:"artifacts"`
}

// CreateArtifacts takes a slice of artifacts, and creates them on Buildkite as a batch.
func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
func (c *Client) CreateArtifacts(ctx context.Context, jobID string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))

req, err := c.newRequest(ctx, "POST", u, batch)
if err != nil {
Expand All @@ -109,31 +142,24 @@ func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *Artif
return createResponse, resp, err
}

// Updates a particular artifact
func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStates map[string]string) (*Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
payload := ArtifactBatchUpdateRequest{}

for id, state := range artifactStates {
payload.Artifacts = append(payload.Artifacts, &ArtifactBatchUpdateArtifact{id, state})
// UpdateArtifacts updates Buildkite with one or more artifact states.
func (c *Client) UpdateArtifacts(ctx context.Context, jobID string, artifactStates []ArtifactState) (*Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))
payload := ArtifactBatchUpdateRequest{
Artifacts: artifactStates,
}

req, err := c.newRequest(ctx, "PUT", u, payload)
if err != nil {
return nil, err
}

resp, err := c.doRequest(req, nil)
if err != nil {
return resp, err
}

return resp, err
return c.doRequest(req, nil)
}

// SearchArtifacts searches Buildkite for a set of artifacts
func (c *Client) SearchArtifacts(ctx context.Context, buildId string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) {
u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildId))
func (c *Client) SearchArtifacts(ctx context.Context, buildID string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) {
u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildID))
u, err := addOptions(u, opt)
if err != nil {
return nil, nil, err
Expand Down
23 changes: 13 additions & 10 deletions clicommand/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ type AgentStartConfig struct {
TracingServiceName string `cli:"tracing-service-name"`

// Global flags
Debug bool `cli:"debug"`
LogLevel string `cli:"log-level"`
NoColor bool `cli:"no-color"`
Experiments []string `cli:"experiment" normalize:"list"`
Profile string `cli:"profile"`
StrictSingleHooks bool `cli:"strict-single-hooks"`
KubernetesExec bool `cli:"kubernetes-exec"`
TraceContextEncoding string `cli:"trace-context-encoding"`
Debug bool `cli:"debug"`
LogLevel string `cli:"log-level"`
NoColor bool `cli:"no-color"`
Experiments []string `cli:"experiment" normalize:"list"`
Profile string `cli:"profile"`
StrictSingleHooks bool `cli:"strict-single-hooks"`
KubernetesExec bool `cli:"kubernetes-exec"`
TraceContextEncoding string `cli:"trace-context-encoding"`
NoMultipartArtifactUpload bool `cli:"no-multipart-artifact-upload"`

// API config
DebugHTTP bool `cli:"debug-http"`
Expand Down Expand Up @@ -704,6 +705,7 @@ var AgentStartCommand = cli.Command{
StrictSingleHooksFlag,
KubernetesExecFlag,
TraceContextEncodingFlag,
NoMultipartArtifactUploadFlag,

// Deprecated flags which will be removed in v4
cli.StringSliceFlag{
Expand Down Expand Up @@ -994,15 +996,16 @@ var AgentStartCommand = cli.Command{
TracingBackend: cfg.TracingBackend,
TracingServiceName: cfg.TracingServiceName,
TraceContextEncoding: cfg.TraceContextEncoding,
VerificationFailureBehaviour: cfg.VerificationFailureBehavior,
AllowMultipartArtifactUpload: !cfg.NoMultipartArtifactUpload,
KubernetesExec: cfg.KubernetesExec,

SigningJWKSFile: cfg.SigningJWKSFile,
SigningJWKSKeyID: cfg.SigningJWKSKeyID,
SigningAWSKMSKey: cfg.SigningAWSKMSKey,
DebugSigning: cfg.DebugSigning,

VerificationJWKS: verificationJWKS,
VerificationJWKS: verificationJWKS,
VerificationFailureBehaviour: cfg.VerificationFailureBehavior,

DisableWarningsFor: cfg.DisableWarningsFor,
}
Expand Down
4 changes: 4 additions & 0 deletions clicommand/artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ArtifactUploadConfig struct {
// Uploader flags
GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"`
UploadSkipSymlinks bool `cli:"upload-skip-symlinks"`
NoMultipartUpload bool `cli:"no-multipart-artifact-upload"`

// deprecated
FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"`
Expand Down Expand Up @@ -138,6 +139,7 @@ var ArtifactUploadCommand = cli.Command{
LogLevelFlag,
ExperimentsFlag,
ProfileFlag,
NoMultipartArtifactUploadFlag,
},
Action: func(c *cli.Context) error {
ctx := context.Background()
Expand All @@ -155,6 +157,8 @@ var ArtifactUploadCommand = cli.Command{
ContentType: cfg.ContentType,
DebugHTTP: cfg.DebugHTTP,

AllowMultipart: !cfg.NoMultipartUpload,

// If the deprecated flag was set to true, pretend its replacement was set to true too.
// This works as long as the user only sets one of the two flags.
GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks),
Expand Down
6 changes: 6 additions & 0 deletions clicommand/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ var (
EnvVar: "BUILDKITE_KUBERNETES_EXEC",
}

// NoMultipartArtifactUploadFlag is the boolean flag named
// "no-multipart-artifact-upload". Its EnvVar field ties it to the
// BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD environment variable.
NoMultipartArtifactUploadFlag = cli.BoolFlag{
Name: "no-multipart-artifact-upload",
Usage: "For Buildkite-hosted artifacts, disables the use of multipart uploads. Has no effect on uploads to other destinations such as custom cloud buckets",
EnvVar: "BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD",
}

ExperimentsFlag = cli.StringSliceFlag{
Name: "experiment",
Value: &cli.StringSlice{},
Expand Down
2 changes: 1 addition & 1 deletion internal/artifact/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import (
type APIClient interface {
CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error)
SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error)
UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error)
UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error)
}
52 changes: 33 additions & 19 deletions internal/artifact/artifactory_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,50 +99,64 @@ func (u *ArtifactoryUploader) URL(artifact *api.Artifact) string {
return url.String()
}

func (u *ArtifactoryUploader) Upload(_ context.Context, artifact *api.Artifact) error {
// CreateWork returns the work needed to upload the given artifact. For
// Artifactory this is always exactly one work unit, which carries the uploader
// itself together with the artifact. The returned error is always nil.
func (u *ArtifactoryUploader) CreateWork(artifact *api.Artifact) ([]workUnit, error) {
	unit := &artifactoryUploaderWork{
		ArtifactoryUploader: u,
		artifact:            artifact,
	}
	return []workUnit{unit}, nil
}

// artifactoryUploaderWork is the work unit produced by
// ArtifactoryUploader.CreateWork. It pairs the uploader with the single
// artifact that the unit uploads.
type artifactoryUploaderWork struct {
// The embedded uploader gives the work unit direct access to the uploader's
// fields and methods (DoWork uses its logger, client, credentials and URL).
*ArtifactoryUploader
// artifact is the artifact uploaded by this unit.
artifact *api.Artifact
}

// Artifact returns the artifact this work unit uploads.
func (u *artifactoryUploaderWork) Artifact() *api.Artifact {
	return u.artifact
}

// Description returns the description of this work unit, as produced by
// singleUnitDescription for the unit's artifact.
func (u *artifactoryUploaderWork) Description() string {
	artifact := u.artifact
	return singleUnitDescription(artifact)
}

// DoWork uploads the work unit's artifact to Artifactory using a single PUT
// request authenticated with the uploader's user and password. The request
// carries MD5, SHA-1 and SHA-256 checksums of the file in the X-Checksum-*
// headers.
//
// The request is bound to ctx, so cancelling ctx aborts the upload.
//
// The returned ETag is always nil. The error is non-nil if the file cannot be
// opened or checksummed, the request cannot be built or sent, or
// checkResponse rejects the response.
func (u *artifactoryUploaderWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
	// Open file from filesystem
	u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
	f, err := os.Open(u.artifact.AbsolutePath)
	if err != nil {
		return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
	}
	// The HTTP transport closes the request body once the request has been
	// handed to it, but every return before that point would otherwise leak
	// the descriptor. A second Close on an *os.File only returns an error,
	// which is deliberately ignored here.
	defer f.Close()

	// Upload the file to Artifactory.
	dest := u.URL(u.artifact)
	u.logger.Debug("Uploading %q to %q", u.artifact.Path, dest)

	req, err := http.NewRequestWithContext(ctx, "PUT", dest, f)
	if err != nil {
		return nil, err
	}
	// Only touch req after the error check: it is nil when construction fails.
	req.SetBasicAuth(u.user, u.password)

	md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath)
	if err != nil {
		return nil, err
	}
	req.Header.Add("X-Checksum-MD5", md5Checksum)

	sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath)
	if err != nil {
		return nil, err
	}
	req.Header.Add("X-Checksum-SHA1", sha1Checksum)

	sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath)
	if err != nil {
		return nil, err
	}
	req.Header.Add("X-Checksum-SHA256", sha256Checksum)

	res, err := u.client.Do(req)
	if err != nil {
		return nil, err
	}
	// Release the connection once checkResponse has finished with the response.
	defer res.Body.Close()

	return nil, checkResponse(res)
}

func checksumFile(hasher hash.Hash, path string) (string, error) {
Expand Down
Loading