diff --git a/aws-android-sdk-appsync/build.gradle b/aws-android-sdk-appsync/build.gradle
index e71941db..3f2a9e7a 100644
--- a/aws-android-sdk-appsync/build.gradle
+++ b/aws-android-sdk-appsync/build.gradle
@@ -49,6 +49,7 @@ dependencies {
testImplementation 'org.mockito:mockito-core:3.2.4'
testImplementation "com.amazonaws:aws-android-sdk-cognitoidentityprovider:$aws_version"
testImplementation project(':aws-android-sdk-appsync-runtime')
+ testImplementation("com.squareup.okhttp3:mockwebserver:4.3.1")
implementation ("com.amazonaws:aws-android-sdk-mobile-client:$aws_version@aar") { transitive = true }
implementation ("com.amazonaws:aws-android-sdk-auth-userpools:$aws_version@aar") { transitive = true }
diff --git a/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptor.java b/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptor.java
index 4418f370..e66e8884 100644
--- a/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptor.java
+++ b/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptor.java
@@ -58,6 +58,7 @@ public Response intercept(Chain chain) throws IOException {
if (retryAfterHeaderValue != null) {
try {
waitMillis = Integer.parseInt(retryAfterHeaderValue) * 1000;
+ response.close();
continue;
} catch (NumberFormatException e) {
Log.w(TAG, "Could not parse Retry-After header: " + retryAfterHeaderValue);
@@ -69,6 +70,7 @@ public Response intercept(Chain chain) throws IOException {
if ((response.code() >= 500 && response.code() < 600)
|| response.code() == 429 ) {
waitMillis = calculateBackoff(retryCount);
+ response.close();
continue;
}
diff --git a/aws-android-sdk-appsync/src/test/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptorTest.java b/aws-android-sdk-appsync/src/test/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptorTest.java
new file mode 100644
index 00000000..4f003e04
--- /dev/null
+++ b/aws-android-sdk-appsync/src/test/java/com/amazonaws/mobileconnectors/appsync/retry/RetryInterceptorTest.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright 2021 Amazon.com,,
+ * Inc. or its affiliates. All Rights Reserved.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.amazonaws.mobileconnectors.appsync.retry;
+
+import android.support.annotation.NonNull;
+
+import com.amazonaws.mobileconnectors.appsync.util.Await;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.robolectric.RobolectricTestRunner;
+import org.robolectric.annotation.Config;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for retry interceptor.
+ */
+@RunWith(RobolectricTestRunner.class)
+@Config(manifest = "AndroidManifest.xml")
+public class RetryInterceptorTest {
+ public static final int TEST_TIMEOUT = 10;
+ private MockWebServer mockWebServer;
+ private OkHttpClient okHttpClient;
+
+ @Before
+ public void beforeEachTest() throws IOException {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start(8888);
+
+ okHttpClient = new OkHttpClient.Builder()
+ .addInterceptor(new RetryInterceptor())
+ .build();
+ }
+
+ @After
+ public void afterEachTest() throws IOException {
+ mockWebServer.shutdown();
+ }
+
+ /**
+ * Verify that everything works when the first attempt succeeds.
+ * @throws IOException Not expected
+ * @throws InterruptedException Not expected
+ */
+ @Test
+ public void successfulRequestWithoutFailuresTest() throws Throwable {
+ mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));
+
+ final Request request = new Request.Builder()
+ .url("http://localhost:8888")
+ .method("POST", RequestBody.create("{}", MediaType.get("application/json")))
+ .build();
+
+ Response response = Await.result(new Await.ResultErrorEmitter() {
+ @Override
+ public void emitTo(@NonNull Consumer onResult, @NonNull Consumer onError) {
+ okHttpClient.newCall(request).enqueue(new OkHttpCallback(onResult, onError));
+ }
+ });
+ assertTrue(response.body().string().contains("all good"));
+ }
+
+ /**
+ * Verify that retries happen successfully without leaving the previous response open.
+ * This test was created as a result of a Github issue
+ * https://github.com/awslabs/aws-mobile-appsync-sdk-android/issues/305.
+ * @throws IOException Not expected
+ * @throws InterruptedException Not expected
+ */
+ @Test
+ public void successfulRequestWithFailuresTest() throws Throwable {
+ mockWebServer.enqueue(new MockResponse().setResponseCode(500).setBody("{\"error\":\"some exception\""));
+ mockWebServer.enqueue(new MockResponse().setResponseCode(501).setBody("{\"error\":\"another exception\"").setHeader("Retry-After", "1"));
+ mockWebServer.enqueue(new MockResponse().setResponseCode(500).setBody("{\"error\":\"some exception\""));
+ mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));
+
+ final Request request = new Request.Builder()
+ .url("http://localhost:8888")
+ .method("POST", RequestBody.create("{}", MediaType.get("application/json")))
+ .build();
+
+ mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));
+
+ Response response = Await.result(new Await.ResultErrorEmitter() {
+ @Override
+ public void emitTo(@NonNull Consumer onResult, @NonNull Consumer onError) {
+ okHttpClient.newCall(request).enqueue(new OkHttpCallback(onResult, onError));
+ }
+ });
+ assertTrue(response.body().string().contains("all good"));
+ }
+
+ /**
+ * Wrapper class that takes the two Consumer callbacks from the Await.result function
+ * and uses them to emit result or error.
+ */
+ static final class OkHttpCallback implements Callback {
+ private final AtomicBoolean success = new AtomicBoolean(false);
+ private final Consumer onResponse;
+ private final Consumer onError;
+
+ public OkHttpCallback(Consumer onResponse, Consumer onError) {
+ this.onResponse = onResponse;
+ this.onError = onError;
+ }
+
+ @Override
+ public void onFailure(@NotNull Call call, @NotNull IOException e) {
+ onError.accept(e);
+ }
+
+ @Override
+ public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
+ onResponse.accept(response);
+ }
+ }
+}
diff --git a/aws-android-sdk-appsync/src/test/java/com/amazonaws/mobileconnectors/appsync/util/Await.java b/aws-android-sdk-appsync/src/test/java/com/amazonaws/mobileconnectors/appsync/util/Await.java
new file mode 100644
index 00000000..e0a8dc12
--- /dev/null
+++ b/aws-android-sdk-appsync/src/test/java/com/amazonaws/mobileconnectors/appsync/util/Await.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2021 Amazon.com,
+ * Inc. or its affiliates. All Rights Reserved.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.amazonaws.mobileconnectors.appsync.util;
+
+import android.support.annotation.NonNull;
+
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * A utility to await a value from an async function, in a synchronous way.
+ */
+@SuppressWarnings({"WeakerAccess", "SameParameterValue", "unused", "UnusedReturnValue"})
+public final class Await {
+ private static final long DEFAULT_WAIT_TIME_MS = TimeUnit.SECONDS.toMillis(10);
+
+ private Await() {}
+
+ /**
+ * Await a latch to count down.
+ * @param latch Latch for which count down is awaited
+ * @param waitTimeMs Time in milliseconds to wait for count down before timing out with exception
+ * @throws RuntimeException If the latch doesn't count down in the specified amount of time
+ */
+ public static void latch(CountDownLatch latch, long waitTimeMs) {
+ try {
+ latch.await(waitTimeMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException interruptedException) {
+ throw new RuntimeException(interruptedException);
+ }
+ if (latch.getCount() != 0) {
+ throw new RuntimeException("Latch did not count down.");
+ }
+ }
+
+ /**
+ * Await a latch to count down, for a "reasonable" amount of time.
+ * Note: we choose what "reasonable" means. If you want to choose, use
+ * {@link #latch(CountDownLatch, long)}, instead.
+ * @param latch A count down latch for which countdown is awaited
+ * @throws RuntimeException If the latch doesn't count down in a reasonable amount of time
+ */
+ public static void latch(@NonNull CountDownLatch latch) {
+ latch(latch, DEFAULT_WAIT_TIME_MS);
+ }
+
+ /**
+ * Awaits emission of either a result of an error.
+ * Blocks the thread of execution until either the value is
+ * available, of a default timeout has elapsed.
+ * @param resultErrorEmitter A function which emits result or error
+ * @param Type of result
+ * @param type of error
+ * @return The result
+ * @throws E if error is emitted
+ * @throws RuntimeException In all other situations where there is not a non-null result
+ */
+ @NonNull
+ public static R result(
+ @NonNull ResultErrorEmitter resultErrorEmitter) throws E {
+ return result(DEFAULT_WAIT_TIME_MS, resultErrorEmitter);
+ }
+
+ /**
+ * Await emission of either a result or an error.
+ * Blocks the thread of execution until either the value is available,
+ * or the timeout is reached.
+ * @param timeMs Amount of time to wait
+ * @param resultErrorEmitter A function which emits result or error
+ * @param Type of result
+ * @param Type of error
+ * @return The result
+ * @throws E if error is emitted
+ * @throws RuntimeException In all other situations where there is not a non-null result
+ */
+ @NonNull
+ public static R result(
+ long timeMs, @NonNull ResultErrorEmitter resultErrorEmitter) throws E {
+
+ Objects.requireNonNull(resultErrorEmitter);
+
+ AtomicReference resultContainer = new AtomicReference<>();
+ AtomicReference errorContainer = new AtomicReference<>();
+
+ await(timeMs, resultErrorEmitter, resultContainer, errorContainer);
+
+ R result = resultContainer.get();
+ E error = errorContainer.get();
+ if (error != null) {
+ throw error;
+ } else if (result != null) {
+ return result;
+ }
+
+ throw new IllegalStateException("Latch counted down, but where's the value?");
+ }
+
+ /**
+ * Awaits receipt of an error or a callback.
+ * Blocks the thread of execution until it arrives, or until the wait times out.
+ * @param resultErrorEmitter An emitter of result of error
+ * @param Type of result
+ * @param Type of error
+ * @return The error that was emitted by the emitter
+ * @throws RuntimeException If no error was emitted by emitter
+ */
+ @NonNull
+ public static E error(@NonNull ResultErrorEmitter resultErrorEmitter) {
+ return error(DEFAULT_WAIT_TIME_MS, resultErrorEmitter);
+ }
+
+ /**
+ * Awaits receipt of an error on an error callback.
+ * Blocks the calling thread until it shows up, or until timeout elapses.
+ * @param timeMs Amount of time to wait
+ * @param resultErrorEmitter A function which emits result or error
+ * @param Type of result
+ * @param Type of error
+ * @return Error, if attained
+ * @throws RuntimeException If no error is emitted by the emitter
+ */
+ @NonNull
+ public static E error(
+ long timeMs, @NonNull ResultErrorEmitter resultErrorEmitter) {
+
+ Objects.requireNonNull(resultErrorEmitter);
+
+ AtomicReference resultContainer = new AtomicReference<>();
+ AtomicReference errorContainer = new AtomicReference<>();
+
+ await(timeMs, resultErrorEmitter, resultContainer, errorContainer);
+
+ R result = resultContainer.get();
+ E error = errorContainer.get();
+ if (result != null) {
+ throw new RuntimeException("Expected error, but had result = " + result);
+ } else if (error != null) {
+ return error;
+ }
+
+ throw new RuntimeException("Neither error nor result consumers accepted a value.");
+ }
+
+ private static void await(
+ long timeMs,
+ @NonNull final ResultErrorEmitter resultErrorEmitter,
+ @NonNull final AtomicReference resultContainer,
+ @NonNull final AtomicReference errorContainer) {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ resultErrorEmitter.emitTo(
+ new Consumer() {
+ @Override
+ public void accept(R result) {
+ resultContainer.set(result);
+ latch.countDown();
+ }
+ }, new Consumer() {
+ @Override
+ public void accept(E error) {
+ errorContainer.set(error);
+ latch.countDown();
+ }
+ }
+ );
+
+ try {
+ latch.await(timeMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException interruptedException) {
+ // Will check the latch count regardless, and branch appropriately.
+ }
+ if (latch.getCount() != 0) {
+ throw new RuntimeException(
+ "Neither result nor error consumers accepted a value within " + timeMs + "ms."
+ );
+ }
+ }
+
+ /**
+ * A function which, upon completion, either emits a single result,
+ * or emits an error.
+ * @param Type of result
+ * @param Type of error
+ */
+ public interface ResultErrorEmitter {
+ /**
+ * A function that emits a value upon completion, either as a
+ * result or as an error.
+ * @param onResult Callback invoked upon emission of result
+ * @param onError Callback invoked upon emission of error
+ */
+ void emitTo(@NonNull Consumer onResult, @NonNull Consumer onError);
+ }
+}