Skip to content

Commit

Permalink
Support multipart uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Oct 1, 2024
1 parent 8d4b552 commit b52ee5a
Show file tree
Hide file tree
Showing 16 changed files with 667 additions and 335 deletions.
39 changes: 20 additions & 19 deletions agent/agent_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,24 @@ type AgentConfiguration struct {
VerificationJWKS any // The set of keys to verify jobs with
VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`)

ANSITimestamps bool
TimestampLines bool
HealthCheckAddr string
DisconnectAfterJob bool
DisconnectAfterIdleTimeout int
CancelGracePeriod int
SignalGracePeriod time.Duration
EnableJobLogTmpfile bool
JobLogPath string
WriteJobLogsToStdout bool
LogFormat string
Shell string
Profile string
RedactedVars []string
AcquireJob string
TracingBackend string
TracingServiceName string
TraceContextEncoding string
DisableWarningsFor []string
ANSITimestamps bool
TimestampLines bool
HealthCheckAddr string
DisconnectAfterJob bool
DisconnectAfterIdleTimeout int
CancelGracePeriod int
SignalGracePeriod time.Duration
EnableJobLogTmpfile bool
JobLogPath string
WriteJobLogsToStdout bool
LogFormat string
Shell string
Profile string
RedactedVars []string
AcquireJob string
TracingBackend string
TracingServiceName string
TraceContextEncoding string
DisableWarningsFor []string
AllowMultipartArtifactUpload bool
}
2 changes: 1 addition & 1 deletion agent/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) {
env["BUILDKITE_KUBERNETES_EXEC"] = "true"
}

if !r.conf.AgentConfiguration.AllowMultipartArtifactUpload {
env["BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD"] = "true"
}

// propagate CancelSignal to bootstrap, unless it's the default SIGTERM
if r.conf.CancelSignal != process.SIGTERM {
env["BUILDKITE_CANCEL_SIGNAL"] = r.conf.CancelSignal.String()
Expand Down
88 changes: 56 additions & 32 deletions api/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,47 @@ type Artifact struct {
}

type ArtifactBatch struct {
ID string `json:"id"`
Artifacts []*Artifact `json:"artifacts"`
UploadDestination string `json:"upload_destination"`
ID string `json:"id"`
Artifacts []*Artifact `json:"artifacts"`
UploadDestination string `json:"upload_destination"`
MultipartSupported bool `json:"multipart_supported,omitempty"`
}

// ArtifactUploadInstructions describes how to upload an artifact to Buildkite
// artifact storage.
type ArtifactUploadInstructions struct {
Data map[string]string `json:"data"`
// Used for a single-part upload.
Action ArtifactUploadAction `json:"action"`

// Used for a multi-part upload.
Actions []ArtifactUploadAction `json:"actions"`

// Contains other data necessary for interpreting instructions.
Data map[string]string `json:"data"`
}

// ArtifactUploadAction describes one action needed to upload an artifact or
// part of an artifact to Buildkite artifact storage.
type ArtifactUploadAction struct {
URL string `json:"url,omitempty"`
Method string `json:"method"`
Path string `json:"path"`
FileInput string `json:"file_input"`
URL string `json:"url,omitempty"`
Method string `json:"method"`
Path string `json:"path"`
FileInput string `json:"file_input"`
PartNumber int `json:"part_number,omitempty"`
}

type ArtifactBatchCreateResponse struct {
ID string `json:"id"`
ArtifactIDs []string `json:"artifact_ids"`
UploadInstructions *ArtifactUploadInstructions `json:"upload_instructions"`
ID string `json:"id"`
ArtifactIDs []string `json:"artifact_ids"`

// These instructions apply to all artifacts. The template contains
// variable interpolations such as ${artifact:path}.
InstructionsTemplate *ArtifactUploadInstructions `json:"upload_instructions"`

// These instructions apply to specific artifacts, necessary for multipart
// uploads. They override InstructionsTemplate and should not contain
// interpolations. Map: artifact ID -> instructions for that artifact.
PerArtifactInstructions map[string]*ArtifactUploadInstructions `json:"per_artifact_instructions"`
}

// ArtifactSearchOptions specifies the optional parameters to the
Expand All @@ -84,18 +104,29 @@ type ArtifactSearchOptions struct {
IncludeDuplicates bool `url:"include_duplicates,omitempty"`
}

type ArtifactBatchUpdateArtifact struct {
ID string `json:"id"`
State string `json:"state"`
// ArtifactState represents the state of a single artifact, when calling UpdateArtifacts.
type ArtifactState struct {
ID string `json:"id"`
State string `json:"state"`
Multipart bool `json:"multipart,omitempty"`
// If this artifact was a multipart upload and is complete, we need the
// ETag from each uploaded part so that they can be joined together.
MultipartETags []ArtifactPartETag `json:"multipart_etags,omitempty"`
}

// ArtifactPartETag associates an ETag with a part number for a multipart upload.
// Each value describes one uploaded part and is serialised as a JSON object
// with the keys "part_number" and "etag".
type ArtifactPartETag struct {
// PartNumber identifies which part of the multipart upload the ETag belongs to.
PartNumber int `json:"part_number"`
// ETag is the ETag recorded for that uploaded part.
ETag string `json:"etag"`
}

type ArtifactBatchUpdateRequest struct {
Artifacts []*ArtifactBatchUpdateArtifact `json:"artifacts"`
Artifacts []ArtifactState `json:"artifacts"`
}

// CreateArtifacts takes a slice of artifacts, and creates them on Buildkite as a batch.
func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
func (c *Client) CreateArtifacts(ctx context.Context, jobID string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))

req, err := c.newRequest(ctx, "POST", u, batch)
if err != nil {
Expand All @@ -111,31 +142,24 @@ func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *Artif
return createResponse, resp, err
}

// Updates a particular artifact
func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStates map[string]string) (*Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
payload := ArtifactBatchUpdateRequest{}

for id, state := range artifactStates {
payload.Artifacts = append(payload.Artifacts, &ArtifactBatchUpdateArtifact{id, state})
// UpdateArtifacts updates Buildkite with one or more artifact states.
func (c *Client) UpdateArtifacts(ctx context.Context, jobID string, artifactStates []ArtifactState) (*Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))
payload := ArtifactBatchUpdateRequest{
Artifacts: artifactStates,
}

req, err := c.newRequest(ctx, "PUT", u, payload)
if err != nil {
return nil, err
}

resp, err := c.doRequest(req, nil)
if err != nil {
return resp, err
}

return resp, err
return c.doRequest(req, nil)
}

// SearchArtifacts searches Buildkite for a set of artifacts
func (c *Client) SearchArtifacts(ctx context.Context, buildId string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) {
u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildId))
func (c *Client) SearchArtifacts(ctx context.Context, buildID string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) {
u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildID))
u, err := addOptions(u, opt)
if err != nil {
return nil, nil, err
Expand Down
23 changes: 13 additions & 10 deletions clicommand/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ type AgentStartConfig struct {
TracingServiceName string `cli:"tracing-service-name"`

// Global flags
Debug bool `cli:"debug"`
LogLevel string `cli:"log-level"`
NoColor bool `cli:"no-color"`
Experiments []string `cli:"experiment" normalize:"list"`
Profile string `cli:"profile"`
StrictSingleHooks bool `cli:"strict-single-hooks"`
KubernetesExec bool `cli:"kubernetes-exec"`
TraceContextEncoding string `cli:"trace-context-encoding"`
Debug bool `cli:"debug"`
LogLevel string `cli:"log-level"`
NoColor bool `cli:"no-color"`
Experiments []string `cli:"experiment" normalize:"list"`
Profile string `cli:"profile"`
StrictSingleHooks bool `cli:"strict-single-hooks"`
KubernetesExec bool `cli:"kubernetes-exec"`
TraceContextEncoding string `cli:"trace-context-encoding"`
NoMultipartArtifactUpload bool `cli:"no-multipart-artifact-upload"`

// API config
DebugHTTP bool `cli:"debug-http"`
Expand Down Expand Up @@ -704,6 +705,7 @@ var AgentStartCommand = cli.Command{
StrictSingleHooksFlag,
KubernetesExecFlag,
TraceContextEncodingFlag,
NoMultipartArtifactUploadFlag,

// Deprecated flags which will be removed in v4
cli.StringSliceFlag{
Expand Down Expand Up @@ -994,15 +996,16 @@ var AgentStartCommand = cli.Command{
TracingBackend: cfg.TracingBackend,
TracingServiceName: cfg.TracingServiceName,
TraceContextEncoding: cfg.TraceContextEncoding,
VerificationFailureBehaviour: cfg.VerificationFailureBehavior,
AllowMultipartArtifactUpload: !cfg.NoMultipartArtifactUpload,
KubernetesExec: cfg.KubernetesExec,

SigningJWKSFile: cfg.SigningJWKSFile,
SigningJWKSKeyID: cfg.SigningJWKSKeyID,
SigningAWSKMSKey: cfg.SigningAWSKMSKey,
DebugSigning: cfg.DebugSigning,

VerificationJWKS: verificationJWKS,
VerificationJWKS: verificationJWKS,
VerificationFailureBehaviour: cfg.VerificationFailureBehavior,

DisableWarningsFor: cfg.DisableWarningsFor,
}
Expand Down
4 changes: 4 additions & 0 deletions clicommand/artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ArtifactUploadConfig struct {
// Uploader flags
GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"`
UploadSkipSymlinks bool `cli:"upload-skip-symlinks"`
NoMultipartUpload bool `cli:"no-multipart-artifact-upload"`

// deprecated
FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"`
Expand Down Expand Up @@ -138,6 +139,7 @@ var ArtifactUploadCommand = cli.Command{
LogLevelFlag,
ExperimentsFlag,
ProfileFlag,
NoMultipartArtifactUploadFlag,
},
Action: func(c *cli.Context) error {
ctx := context.Background()
Expand All @@ -155,6 +157,8 @@ var ArtifactUploadCommand = cli.Command{
ContentType: cfg.ContentType,
DebugHTTP: cfg.DebugHTTP,

AllowMultipart: !cfg.NoMultipartUpload,

// If the deprecated flag was set to true, pretend its replacement was set to true too.
// This works as long as the user only sets one of the two flags.
GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks),
Expand Down
6 changes: 6 additions & 0 deletions clicommand/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ var (
EnvVar: "BUILDKITE_KUBERNETES_EXEC",
}

// NoMultipartArtifactUploadFlag is the boolean --no-multipart-artifact-upload
// flag. It can also be set through the BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD
// environment variable. Per its usage text, it only concerns Buildkite-hosted
// artifacts and has no effect on uploads to other destinations.
NoMultipartArtifactUploadFlag = cli.BoolFlag{
Name: "no-multipart-artifact-upload",
Usage: "For Buildkite-hosted artifacts, disables the use of multipart uploads. Has no effect on uploads to other destinations such as custom cloud buckets",
EnvVar: "BUILDKITE_NO_MULTIPART_ARTIFACT_UPLOAD",
}

ExperimentsFlag = cli.StringSliceFlag{
Name: "experiment",
Value: &cli.StringSlice{},
Expand Down
2 changes: 1 addition & 1 deletion internal/artifact/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import (
type APIClient interface {
CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error)
SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error)
UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error)
UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error)
}
16 changes: 8 additions & 8 deletions internal/artifact/artifactory_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ func (u *artifactoryUploaderWork) Description() string {
return singleUnitDescription(u.artifact)
}

func (u *artifactoryUploaderWork) DoWork(context.Context) error {
func (u *artifactoryUploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) {
// Open file from filesystem
u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
f, err := os.Open(u.artifact.AbsolutePath)
if err != nil {
return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
}

// Upload the file to Artifactory.
Expand All @@ -131,32 +131,32 @@ func (u *artifactoryUploaderWork) DoWork(context.Context) error {
req, err := http.NewRequest("PUT", u.URL(u.artifact), f)
req.SetBasicAuth(u.user, u.password)
if err != nil {
return err
return nil, err
}

md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath)
if err != nil {
return err
return nil, err
}
req.Header.Add("X-Checksum-MD5", md5Checksum)

sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath)
if err != nil {
return err
return nil, err
}
req.Header.Add("X-Checksum-SHA1", sha1Checksum)

sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath)
if err != nil {
return err
return nil, err
}
req.Header.Add("X-Checksum-SHA256", sha256Checksum)

res, err := u.client.Do(req)
if err != nil {
return err
return nil, err
}
return checkResponse(res)
return nil, checkResponse(res)
}

func checksumFile(hasher hash.Hash, path string) (string, error) {
Expand Down
6 changes: 3 additions & 3 deletions internal/artifact/azure_blob_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ func (u *azureBlobUploaderWork) Description() string {
}

// DoWork uploads an artifact file.
func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error {
func (u *azureBlobUploaderWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
f, err := os.Open(u.artifact.AbsolutePath)
if err != nil {
return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
}
defer f.Close()

Expand All @@ -121,5 +121,5 @@ func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error {

bbc := u.client.NewContainerClient(u.loc.ContainerName).NewBlockBlobClient(blobName)
_, err = bbc.UploadFile(ctx, f, nil)
return err
return nil, err
}
5 changes: 4 additions & 1 deletion internal/artifact/batch_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type BatchCreatorConfig struct {
// CreateArtifactsTimeout sets a context.WithTimeout around the CreateArtifacts API call.
// If it's zero, there's no context timeout and the default HTTP timeout will prevail.
CreateArtifactsTimeout time.Duration

// Whether to allow multipart uploads to the BK-hosted bucket.
AllowMultipart bool
}

type BatchCreator struct {
Expand Down Expand Up @@ -63,7 +66,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
ID: api.NewUUID(),
Artifacts: theseArtifacts,
UploadDestination: a.conf.UploadDestination,
MultipartSupported: true,
MultipartSupported: a.conf.AllowMultipart,
}

a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length)
Expand Down
Loading

0 comments on commit b52ee5a

Please sign in to comment.