Skip to content

Commit

Permalink
fix: retry interceptor now closes previous response (#332)
Browse files Browse the repository at this point in the history
* fix: retry interceptor now closes previous response

* Add Await class and test cleanup
rjuliano authored Mar 15, 2021
1 parent bd84ec1 commit 1aeaf29
Showing 4 changed files with 347 additions and 0 deletions.
1 change: 1 addition & 0 deletions aws-android-sdk-appsync/build.gradle
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ dependencies {
testImplementation 'org.mockito:mockito-core:3.2.4'
testImplementation "com.amazonaws:aws-android-sdk-cognitoidentityprovider:$aws_version"
testImplementation project(':aws-android-sdk-appsync-runtime')
testImplementation("com.squareup.okhttp3:mockwebserver:4.3.1")
implementation ("com.amazonaws:aws-android-sdk-mobile-client:$aws_version@aar") { transitive = true }
implementation ("com.amazonaws:aws-android-sdk-auth-userpools:$aws_version@aar") { transitive = true }

Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ public Response intercept(Chain chain) throws IOException {
if (retryAfterHeaderValue != null) {
try {
waitMillis = Integer.parseInt(retryAfterHeaderValue) * 1000;
response.close();
continue;
} catch (NumberFormatException e) {
Log.w(TAG, "Could not parse Retry-After header: " + retryAfterHeaderValue);
@@ -69,6 +70,7 @@ public Response intercept(Chain chain) throws IOException {
if ((response.code() >= 500 && response.code() < 600)
|| response.code() == 429 ) {
waitMillis = calculateBackoff(retryCount);
response.close();
continue;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright 2021 Amazon.com,,
* Inc. or its affiliates. All Rights Reserved.
* <p>
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazonaws.mobileconnectors.appsync.retry;

import android.support.annotation.NonNull;

import com.amazonaws.mobileconnectors.appsync.util.Await;

import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;

import static org.junit.Assert.assertTrue;

/**
* Tests for retry interceptor.
*/
@RunWith(RobolectricTestRunner.class)
@Config(manifest = "AndroidManifest.xml")
public class RetryInterceptorTest {
public static final int TEST_TIMEOUT = 10;
private MockWebServer mockWebServer;
private OkHttpClient okHttpClient;

@Before
public void beforeEachTest() throws IOException {
mockWebServer = new MockWebServer();
mockWebServer.start(8888);

okHttpClient = new OkHttpClient.Builder()
.addInterceptor(new RetryInterceptor())
.build();
}

@After
public void afterEachTest() throws IOException {
mockWebServer.shutdown();
}

/**
* Verify that everything works when the first attempt succeeds.
* @throws IOException Not expected
* @throws InterruptedException Not expected
*/
@Test
public void successfulRequestWithoutFailuresTest() throws Throwable {
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));

final Request request = new Request.Builder()
.url("http://localhost:8888")
.method("POST", RequestBody.create("{}", MediaType.get("application/json")))
.build();

Response response = Await.result(new Await.ResultErrorEmitter<Response, Throwable>() {
@Override
public void emitTo(@NonNull Consumer<Response> onResult, @NonNull Consumer<Throwable> onError) {
okHttpClient.newCall(request).enqueue(new OkHttpCallback(onResult, onError));
}
});
assertTrue(response.body().string().contains("all good"));
}

/**
* Verify that retries happen successfully without leaving the previous response open.
* This test was created as a result of a Github issue
* https://github.com/awslabs/aws-mobile-appsync-sdk-android/issues/305.
* @throws IOException Not expected
* @throws InterruptedException Not expected
*/
@Test
public void successfulRequestWithFailuresTest() throws Throwable {
mockWebServer.enqueue(new MockResponse().setResponseCode(500).setBody("{\"error\":\"some exception\""));
mockWebServer.enqueue(new MockResponse().setResponseCode(501).setBody("{\"error\":\"another exception\"").setHeader("Retry-After", "1"));
mockWebServer.enqueue(new MockResponse().setResponseCode(500).setBody("{\"error\":\"some exception\""));
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));

final Request request = new Request.Builder()
.url("http://localhost:8888")
.method("POST", RequestBody.create("{}", MediaType.get("application/json")))
.build();

mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));

Response response = Await.result(new Await.ResultErrorEmitter<Response, Throwable>() {
@Override
public void emitTo(@NonNull Consumer<Response> onResult, @NonNull Consumer<Throwable> onError) {
okHttpClient.newCall(request).enqueue(new OkHttpCallback(onResult, onError));
}
});
assertTrue(response.body().string().contains("all good"));
}

/**
* Wrapper class that takes the two Consumer callbacks from the Await.result function
* and uses them to emit result or error.
*/
static final class OkHttpCallback implements Callback {
private final AtomicBoolean success = new AtomicBoolean(false);
private final Consumer<Response> onResponse;
private final Consumer<Throwable> onError;

public OkHttpCallback(Consumer<Response> onResponse, Consumer<Throwable> onError) {
this.onResponse = onResponse;
this.onError = onError;
}

@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
onError.accept(e);
}

@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
onResponse.accept(response);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright 2021 Amazon.com,
* Inc. or its affiliates. All Rights Reserved.
* <p>
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazonaws.mobileconnectors.appsync.util;

import android.support.annotation.NonNull;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
* A utility to await a value from an async function, in a synchronous way.
*/
@SuppressWarnings({"WeakerAccess", "SameParameterValue", "unused", "UnusedReturnValue"})
public final class Await {
private static final long DEFAULT_WAIT_TIME_MS = TimeUnit.SECONDS.toMillis(10);

private Await() {}

/**
* Await a latch to count down.
* @param latch Latch for which count down is awaited
* @param waitTimeMs Time in milliseconds to wait for count down before timing out with exception
* @throws RuntimeException If the latch doesn't count down in the specified amount of time
*/
public static void latch(CountDownLatch latch, long waitTimeMs) {
try {
latch.await(waitTimeMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
if (latch.getCount() != 0) {
throw new RuntimeException("Latch did not count down.");
}
}

/**
* Await a latch to count down, for a "reasonable" amount of time.
* Note: we choose what "reasonable" means. If you want to choose, use
* {@link #latch(CountDownLatch, long)}, instead.
* @param latch A count down latch for which countdown is awaited
* @throws RuntimeException If the latch doesn't count down in a reasonable amount of time
*/
public static void latch(@NonNull CountDownLatch latch) {
latch(latch, DEFAULT_WAIT_TIME_MS);
}

/**
* Awaits emission of either a result of an error.
* Blocks the thread of execution until either the value is
* available, of a default timeout has elapsed.
* @param resultErrorEmitter A function which emits result or error
* @param <R> Type of result
* @param <E> type of error
* @return The result
* @throws E if error is emitted
* @throws RuntimeException In all other situations where there is not a non-null result
*/
@NonNull
public static <R, E extends Throwable> R result(
@NonNull ResultErrorEmitter<R, E> resultErrorEmitter) throws E {
return result(DEFAULT_WAIT_TIME_MS, resultErrorEmitter);
}

/**
* Await emission of either a result or an error.
* Blocks the thread of execution until either the value is available,
* or the timeout is reached.
* @param timeMs Amount of time to wait
* @param resultErrorEmitter A function which emits result or error
* @param <R> Type of result
* @param <E> Type of error
* @return The result
* @throws E if error is emitted
* @throws RuntimeException In all other situations where there is not a non-null result
*/
@NonNull
public static <R, E extends Throwable> R result(
long timeMs, @NonNull ResultErrorEmitter<R, E> resultErrorEmitter) throws E {

Objects.requireNonNull(resultErrorEmitter);

AtomicReference<R> resultContainer = new AtomicReference<>();
AtomicReference<E> errorContainer = new AtomicReference<>();

await(timeMs, resultErrorEmitter, resultContainer, errorContainer);

R result = resultContainer.get();
E error = errorContainer.get();
if (error != null) {
throw error;
} else if (result != null) {
return result;
}

throw new IllegalStateException("Latch counted down, but where's the value?");
}

/**
* Awaits receipt of an error or a callback.
* Blocks the thread of execution until it arrives, or until the wait times out.
* @param resultErrorEmitter An emitter of result of error
* @param <R> Type of result
* @param <E> Type of error
* @return The error that was emitted by the emitter
* @throws RuntimeException If no error was emitted by emitter
*/
@NonNull
public static <R, E extends Throwable> E error(@NonNull ResultErrorEmitter<R, E> resultErrorEmitter) {
return error(DEFAULT_WAIT_TIME_MS, resultErrorEmitter);
}

/**
* Awaits receipt of an error on an error callback.
* Blocks the calling thread until it shows up, or until timeout elapses.
* @param timeMs Amount of time to wait
* @param resultErrorEmitter A function which emits result or error
* @param <R> Type of result
* @param <E> Type of error
* @return Error, if attained
* @throws RuntimeException If no error is emitted by the emitter
*/
@NonNull
public static <R, E extends Throwable> E error(
long timeMs, @NonNull ResultErrorEmitter<R, E> resultErrorEmitter) {

Objects.requireNonNull(resultErrorEmitter);

AtomicReference<R> resultContainer = new AtomicReference<>();
AtomicReference<E> errorContainer = new AtomicReference<>();

await(timeMs, resultErrorEmitter, resultContainer, errorContainer);

R result = resultContainer.get();
E error = errorContainer.get();
if (result != null) {
throw new RuntimeException("Expected error, but had result = " + result);
} else if (error != null) {
return error;
}

throw new RuntimeException("Neither error nor result consumers accepted a value.");
}

private static <R, E extends Throwable> void await(
long timeMs,
@NonNull final ResultErrorEmitter<R, E> resultErrorEmitter,
@NonNull final AtomicReference<R> resultContainer,
@NonNull final AtomicReference<E> errorContainer) {

final CountDownLatch latch = new CountDownLatch(1);
resultErrorEmitter.emitTo(
new Consumer<R>() {
@Override
public void accept(R result) {
resultContainer.set(result);
latch.countDown();
}
}, new Consumer<E>() {
@Override
public void accept(E error) {
errorContainer.set(error);
latch.countDown();
}
}
);

try {
latch.await(timeMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException interruptedException) {
// Will check the latch count regardless, and branch appropriately.
}
if (latch.getCount() != 0) {
throw new RuntimeException(
"Neither result nor error consumers accepted a value within " + timeMs + "ms."
);
}
}

/**
* A function which, upon completion, either emits a single result,
* or emits an error.
* @param <R> Type of result
* @param <E> Type of error
*/
public interface ResultErrorEmitter<R, E extends Throwable> {
/**
* A function that emits a value upon completion, either as a
* result or as an error.
* @param onResult Callback invoked upon emission of result
* @param onError Callback invoked upon emission of error
*/
void emitTo(@NonNull Consumer<R> onResult, @NonNull Consumer<E> onError);
}
}

0 comments on commit 1aeaf29

Please sign in to comment.