Skip to content

Commit

Permalink
fix: update batch handling to ensure each operation has its own uniqu…
Browse files Browse the repository at this point in the history
…e idempotency-token (#2905)
  • Loading branch information
BenWhitehead authored Jan 30, 2025
1 parent 2a5242e commit 8d79b8d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,27 @@ final class JsonResumableSession {
new JsonResumableSessionPutTask(
context, resumableWrite.getUploadId(), content, contentRange);
HttpRpcContext httpRpcContext = HttpRpcContext.getInstance();
httpRpcContext.newInvocationId();
AtomicBoolean dirty = new AtomicBoolean(false);
return Retrying.run(
deps,
alg,
() -> {
if (dirty.getAndSet(true)) {
ResumableOperationResult<@Nullable StorageObject> query = query();
long persistedSize = query.getPersistedSize();
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
return query;
} else {
task.rewindTo(persistedSize);
try {
httpRpcContext.newInvocationId();
AtomicBoolean dirty = new AtomicBoolean(false);
return Retrying.run(
deps,
alg,
() -> {
if (dirty.getAndSet(true)) {
ResumableOperationResult<@Nullable StorageObject> query = query();
long persistedSize = query.getPersistedSize();
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
return query;
} else {
task.rewindTo(persistedSize);
}
}
}
return task.call();
},
Decoder.identity());
return task.call();
},
Decoder.identity());
} finally {
httpRpcContext.clearInvocationId();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
import com.google.api.client.util.Data;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Compose;
import com.google.api.services.storage.Storage.Objects.Delete;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.Storage.Objects.Insert;
import com.google.api.services.storage.Storage.Objects.Move;
import com.google.api.services.storage.Storage.Objects.Patch;
import com.google.api.services.storage.StorageRequest;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Bucket.RetentionPolicy;
import com.google.api.services.storage.model.BucketAccessControl;
Expand Down Expand Up @@ -109,6 +112,7 @@ public class HttpStorageRpc implements StorageRpc {
// declare this HttpStatus code here as it's not included in java.net.HttpURLConnection
private static final int SC_REQUESTED_RANGE_NOT_SATISFIABLE = 416;
private static final boolean IS_RECORD_EVENTS = true;
private static final String X_GOOG_GCS_IDEMPOTENCY_TOKEN = "x-goog-gcs-idempotency-token";

private final StorageOptions options;
private final Storage storage;
Expand Down Expand Up @@ -208,7 +212,7 @@ public void intercept(HttpRequest request) throws IOException {
.filter(java.util.Objects::nonNull)
.collect(JOINER);
headers.set("x-goog-api-client", newValue);
headers.set("x-goog-gcs-idempotency-token", invocationId);
headers.set(X_GOOG_GCS_IDEMPOTENCY_TOKEN, invocationId);

String userAgent = headers.getUserAgent();
if ((userAgent == null
Expand Down Expand Up @@ -247,7 +251,9 @@ public void addDelete(
batches.add(storage.batch());
currentBatchSize = 0;
}
deleteCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
Delete call = deleteCall(storageObject, options);
addIdempotencyTokenToCall(call);
call.queue(batches.getLast(), toJsonCallback(callback));
currentBatchSize++;
} catch (IOException ex) {
throw translate(ex);
Expand All @@ -264,7 +270,9 @@ public void addPatch(
batches.add(storage.batch());
currentBatchSize = 0;
}
patchCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
Patch call = patchCall(storageObject, options);
addIdempotencyTokenToCall(call);
call.queue(batches.getLast(), toJsonCallback(callback));
currentBatchSize++;
} catch (IOException ex) {
throw translate(ex);
Expand All @@ -281,7 +289,9 @@ public void addGet(
batches.add(storage.batch());
currentBatchSize = 0;
}
getCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
Get call = getCall(storageObject, options);
addIdempotencyTokenToCall(call);
call.queue(batches.getLast(), toJsonCallback(callback));
currentBatchSize++;
} catch (IOException ex) {
throw translate(ex);
Expand Down Expand Up @@ -310,6 +320,12 @@ public void submit() {
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

private void addIdempotencyTokenToCall(StorageRequest<?> call) {
HttpRpcContext instance = HttpRpcContext.getInstance();
call.getRequestHeaders().set(X_GOOG_GCS_IDEMPOTENCY_TOKEN, instance.newInvocationId());
instance.clearInvocationId();
}
}

private static <T> JsonBatchCallback<T> toJsonCallback(final RpcBatch.Callback<T> callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
import com.google.cloud.storage.StorageException;
Expand All @@ -40,6 +43,14 @@
import com.google.cloud.storage.it.runner.annotations.StorageFixture;
import com.google.cloud.storage.it.runner.registry.Generator;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -208,4 +219,46 @@ public void testBatchRequestFail() {
assertThat(e.getMessage()).contains("Invalid argument");
}
}

@Test
public void batchSuccessiveUpdatesWork() {
byte[] bytes = DataGenerator.base64Characters().genBytes(137);

List<BlobId> blobs =
IntStream.range(0, 2)
.mapToObj(
i -> {
BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
try (WriteChannel writer = storage.writer(info, BlobWriteOption.doesNotExist())) {
writer.write(ByteBuffer.wrap(bytes));
} catch (IOException e) {
throw new RuntimeException(e);
}
return info.getBlobId();
})
.collect(Collectors.toList());

OffsetDateTime now1 = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC);

List<Blob> update1 =
storage.update(
blobs.stream()
.map(id -> BlobInfo.newBuilder(id).setCustomTimeOffsetDateTime(now1).build())
.collect(Collectors.toList()));

OffsetDateTime now2 = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC);
List<Blob> update2 =
storage.update(
blobs.stream()
.map(id -> BlobInfo.newBuilder(id).setCustomTimeOffsetDateTime(now2).build())
.collect(Collectors.toList()));

assertThat(
update2.stream()
.filter(b -> !now2.equals(b.getCustomTimeOffsetDateTime()))
.map(BlobInfo::getBlobId)
.map(BlobId::toGsUtilUriWithGeneration)
.collect(Collectors.toList()))
.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.BucketListOption;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.it.runner.StorageITRunner;
import com.google.cloud.storage.it.runner.annotations.Backend;
Expand All @@ -42,6 +44,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.truth.IterableSubject;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -179,4 +184,34 @@ public void resumableUpload() throws Exception {
// 4. Finalize session and put final 45B
assertAll(() -> subject.hasSize(4), () -> assertThat(actualXxd).isEqualTo(expectedXxd));
}

@Test
public void batch() throws Exception {
BlobInfo info1 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
BlobInfo info2 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
BlobInfo info3 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
storage.create(info1, BlobTargetOption.doesNotExist());
storage.create(info2, BlobTargetOption.doesNotExist());
storage.create(info3, BlobTargetOption.doesNotExist());

requestAuditing.clear();
OffsetDateTime now = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC);

StorageBatch batch = storage.batch();
StorageBatchResult<Blob> r1 = batch.get(info1.getBlobId());
StorageBatchResult<Blob> r2 =
batch.update(info2.toBuilder().setCustomTimeOffsetDateTime(now).build());
StorageBatchResult<Boolean> r3 = batch.delete(info3.getBlobId());

batch.submit();
assertAll(
() -> assertThat(r1).isNotNull(),
() -> assertThat(r2.get().getCustomTimeOffsetDateTime()).isEqualTo(now),
() -> assertThat(r3.get()).isTrue(),
() -> {
IterableSubject subject =
requestAuditing.assertRequestHeader(X_GOOG_GCS_IDEMPOTENCY_TOKEN);
subject.hasSize(3);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ public void testNewInvocationId() {
UUID uuid = UUID.fromString("28220dff-1e8b-4770-9e10-022c2a99d8f3");
HttpRpcContext testContext = new HttpRpcContext(() -> uuid);

assertThat(testContext.newInvocationId()).isEqualTo(uuid);
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
// call again to ensure the id is consistent with our supplier
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
try {
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
// call again to ensure the id is consistent with our supplier
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
} finally {
testContext.clearInvocationId();
}
}

@Test
Expand Down

0 comments on commit 8d79b8d

Please sign in to comment.