Skip to content

Commit

Permalink
fix(storage): propagate ctx from invoke to grpc upload reqs (#11475)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp authored Jan 27, 2025
1 parent 96dbb6c commit 9ad9d76
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
33 changes: 14 additions & 19 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,12 +1725,12 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage

var o *storagepb.Object
uploadBuff := func(ctx context.Context) error {
obj, err := gw.uploadBuffer(recvd, offset, doneReading)
obj, err := gw.uploadBuffer(ctx, recvd, offset, doneReading)
o = obj
return err
}

err = run(gw.ctx, uploadBuff, gw.settings.retry, s.idempotent)
err = run(bucketContext(gw.ctx, gw.bucket), uploadBuff, gw.settings.retry, s.idempotent)
if err != nil {
return err
}
Expand Down Expand Up @@ -2666,11 +2666,10 @@ type gRPCBidiWriteBufferSender interface {
// If flush is true, implementations must not return until the data in buf is
// stable. If finishWrite is true, implementations must return the object on
// success.
sendBuffer(buf []byte, offset int64, flush, finishWrite bool) (*storagepb.Object, error)
sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (*storagepb.Object, error)
}

type gRPCOneshotBidiWriteBufferSender struct {
ctx context.Context
firstMessage *storagepb.BidiWriteObjectRequest
raw *gapic.Client
stream storagepb.Storage_BidiWriteObjectClient
Expand All @@ -2691,17 +2690,16 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() (*gRPCOneshotBidiWrit
}

return &gRPCOneshotBidiWriteBufferSender{
ctx: bucketContext(w.ctx, w.bucket),
firstMessage: firstMessage,
raw: w.c.raw,
settings: w.settings,
}, nil
}

func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) {
func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) {
var firstMessage *storagepb.BidiWriteObjectRequest
if s.stream == nil {
s.stream, err = s.raw.BidiWriteObject(s.ctx, s.settings.gax...)
s.stream, err = s.raw.BidiWriteObject(ctx, s.settings.gax...)
if err != nil {
return
}
Expand Down Expand Up @@ -2737,7 +2735,6 @@ func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(buf []byte, offset int64,
}

type gRPCResumableBidiWriteBufferSender struct {
ctx context.Context
queryRetry *retryConfig
upid string
progress func(int64)
Expand All @@ -2748,7 +2745,7 @@ type gRPCResumableBidiWriteBufferSender struct {
settings *settings
}

func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidiWriteBufferSender, error) {
func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender(ctx context.Context) (*gRPCResumableBidiWriteBufferSender, error) {
req := &storagepb.StartResumableWriteRequest{
WriteObjectSpec: w.spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
Expand All @@ -2758,7 +2755,6 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidi
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
}

ctx := bucketContext(w.ctx, w.bucket)
var upid string
err := run(ctx, func(ctx context.Context) error {
upres, err := w.c.raw.StartResumableWrite(ctx, req, w.settings.gax...)
Expand All @@ -2778,7 +2774,6 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidi
}

return &gRPCResumableBidiWriteBufferSender{
ctx: ctx,
queryRetry: w.settings.retry,
upid: upid,
progress: w.progress,
Expand All @@ -2791,9 +2786,9 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() (*gRPCResumableBidi

// queryProgress is a helper that queries the status of the resumable upload
// associated with the given upload ID.
func (s *gRPCResumableBidiWriteBufferSender) queryProgress() (int64, error) {
func (s *gRPCResumableBidiWriteBufferSender) queryProgress(ctx context.Context) (int64, error) {
var persistedSize int64
err := run(s.ctx, func(ctx context.Context) error {
err := run(ctx, func(ctx context.Context) error {
q, err := s.raw.QueryWriteStatus(ctx, &storagepb.QueryWriteStatusRequest{
UploadId: s.upid,
}, s.settings.gax...)
Expand All @@ -2805,15 +2800,15 @@ func (s *gRPCResumableBidiWriteBufferSender) queryProgress() (int64, error) {
return persistedSize, err
}

func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) {
func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) {
reconnected := false
if s.stream == nil {
// Determine offset and reconnect
s.flushOffset, err = s.queryProgress()
s.flushOffset, err = s.queryProgress(ctx)
if err != nil {
return
}
s.stream, err = s.raw.BidiWriteObject(s.ctx, s.settings.gax...)
s.stream, err = s.raw.BidiWriteObject(ctx, s.settings.gax...)
if err != nil {
return
}
Expand Down Expand Up @@ -2885,7 +2880,7 @@ func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(buf []byte, offset int64
// The final Object is returned on success if doneReading is true.
//
// Returns object and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (obj *storagepb.Object, err error) {
func (w *gRPCWriter) uploadBuffer(ctx context.Context, recvd int, start int64, doneReading bool) (obj *storagepb.Object, err error) {
if w.streamSender == nil {
if w.append {
// Appendable object semantics
Expand All @@ -2895,7 +2890,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (obj
w.streamSender, err = w.newGRPCOneshotBidiWriteBufferSender()
} else {
// Resumable write semantics
w.streamSender, err = w.newGRPCResumableBidiWriteBufferSender()
w.streamSender, err = w.newGRPCResumableBidiWriteBufferSender(ctx)
}
if err != nil {
return
Expand All @@ -2915,7 +2910,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (obj
l = len(data)
flush = true
}
obj, err = w.streamSender.sendBuffer(data[:l], offset, flush, flush && doneReading)
obj, err = w.streamSender.sendBuffer(ctx, data[:l], offset, flush, flush && doneReading)
if err != nil {
return nil, err
}
Expand Down
16 changes: 7 additions & 9 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
)

type gRPCAppendBidiWriteBufferSender struct {
ctx context.Context
bucket string
routingToken *string
raw *gapic.Client
Expand All @@ -51,7 +50,6 @@ type gRPCAppendBidiWriteBufferSender struct {

func (w *gRPCWriter) newGRPCAppendBidiWriteBufferSender() (*gRPCAppendBidiWriteBufferSender, error) {
s := &gRPCAppendBidiWriteBufferSender{
ctx: w.ctx,
bucket: w.spec.GetResource().GetBucket(),
raw: w.c.raw,
settings: w.c.settings,
Expand All @@ -68,7 +66,7 @@ func (w *gRPCWriter) newGRPCAppendBidiWriteBufferSender() (*gRPCAppendBidiWriteB
return s, nil
}

func (s *gRPCAppendBidiWriteBufferSender) connect() (err error) {
func (s *gRPCAppendBidiWriteBufferSender) connect(ctx context.Context) (err error) {
err = func() error {
// If this is a forced first message, we've already determined it's safe to
// send.
Expand Down Expand Up @@ -107,19 +105,19 @@ func (s *gRPCAppendBidiWriteBufferSender) connect() (err error) {
return err
}

return s.startReceiver()
return s.startReceiver(ctx)
}

func (s *gRPCAppendBidiWriteBufferSender) withRequestParams(ctx context.Context) context.Context {
param := fmt.Sprintf("appendable=true&bucket=%s", s.bucket)
if s.routingToken != nil {
param = param + fmt.Sprintf("&routing_token=%s", *s.routingToken)
}
return gax.InsertMetadataIntoOutgoingContext(s.ctx, "x-goog-request-params", param)
return gax.InsertMetadataIntoOutgoingContext(ctx, "x-goog-request-params", param)
}

func (s *gRPCAppendBidiWriteBufferSender) startReceiver() (err error) {
s.stream, err = s.raw.BidiWriteObject(s.withRequestParams(s.ctx), s.settings.gax...)
func (s *gRPCAppendBidiWriteBufferSender) startReceiver(ctx context.Context) (err error) {
s.stream, err = s.raw.BidiWriteObject(s.withRequestParams(ctx), s.settings.gax...)
if err != nil {
return
}
Expand Down Expand Up @@ -282,12 +280,12 @@ func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offs
return
}

func (s *gRPCAppendBidiWriteBufferSender) sendBuffer(buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) {
func (s *gRPCAppendBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) {
for {
sendFirstMessage := false
if s.stream == nil {
sendFirstMessage = true
if err = s.connect(); err != nil {
if err = s.connect(ctx); err != nil {
return
}
}
Expand Down

0 comments on commit 9ad9d76

Please sign in to comment.