Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming requests #1207

Merged
merged 4 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/main/java/com/stripe/net/AbstractStripeResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.stripe.net;

import static java.util.Objects.requireNonNull;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.experimental.NonFinal;

/** Common interface representing an HTTP response from Stripe. */
@Accessors(fluent = true)
abstract class AbstractStripeResponse<T> {
/** The HTTP status code of the response. */
int code;

/** The HTTP headers of the response. */
HttpHeaders headers;

/** The body of the response. */
T body;

public final int code() {
return this.code;
}

public final HttpHeaders headers() {
return this.headers;
}

public final T body() {
return this.body;
}

/** Number of times the request was retried. Used for internal tests only. */
@NonFinal
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
int numRetries;

/**
* Gets the date of the request, as returned by Stripe.
*
* @return the date of the request, as returned by Stripe
*/
public Instant date() {
Optional<String> dateStr = this.headers.firstValue("Date");
if (!dateStr.isPresent()) {
return null;
}
return ZonedDateTime.parse(dateStr.get(), DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
}

/**
* Gets the idempotency key of the request, as returned by Stripe.
*
* @return the idempotency key of the request, as returned by Stripe
*/
public String idempotencyKey() {
return this.headers.firstValue("Idempotency-Key").orElse(null);
}

/**
* Gets the ID of the request, as returned by Stripe.
*
* @return the ID of the request, as returned by Stripe
*/
public String requestId() {
return this.headers.firstValue("Request-Id").orElse(null);
}

protected AbstractStripeResponse(int code, HttpHeaders headers, T body) {
requireNonNull(headers);
requireNonNull(body);

this.code = code;
this.headers = headers;
this.body = body;
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/stripe/net/ApiResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.stripe.model.StripeRawJsonObject;
import com.stripe.model.StripeRawJsonObjectDeserializer;
import com.stripe.util.StringUtils;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -179,6 +180,22 @@ public static <T extends StripeObjectInterface> T request(
return ApiResource.stripeResponseGetter.request(method, url, params, clazz, options);
}

public static InputStream requestStream(
ApiResource.RequestMethod method, String url, ApiRequestParams params, RequestOptions options)
throws StripeException {
checkNullTypedParams(url, params);
return requestStream(method, url, params.toMap(), options);
}

public static InputStream requestStream(
ApiResource.RequestMethod method,
String url,
Map<String, Object> params,
RequestOptions options)
throws StripeException {
return ApiResource.stripeResponseGetter.requestStream(method, url, params, options);
}

public static <T extends StripeCollectionInterface<?>> T requestCollection(
String url, ApiRequestParams params, Class<T> clazz, RequestOptions options)
throws StripeException {
Expand Down
74 changes: 63 additions & 11 deletions src/main/java/com/stripe/net/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class HttpClient {
protected HttpClient() {}

/**
* Sends the given request to Stripe's API.
* Sends the given request to Stripe's API, buffering the response body into memory.
*
* @param request the request
* @return the response
Expand All @@ -38,13 +38,23 @@ protected HttpClient() {}
public abstract StripeResponse request(StripeRequest request) throws StripeException;

/**
* Sends the given request to Stripe's API, handling telemetry if not disabled.
* Sends the given request to Stripe's API, streaming the response body.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeException {
public StripeResponseStream requestStream(StripeRequest request) throws StripeException {
throw new UnsupportedOperationException("requestStream is unimplemented for this HttpClient");
}

@FunctionalInterface
private interface RequestSendFunction<R> {
R apply(StripeRequest request) throws StripeException;
}

private <T extends AbstractStripeResponse<?>> T sendWithTelemetry(
StripeRequest request, RequestSendFunction<T> send) throws StripeException {
Optional<String> telemetryHeaderValue = requestTelemetry.getHeaderValue(request.headers());
if (telemetryHeaderValue.isPresent()) {
request =
Expand All @@ -53,7 +63,7 @@ public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeE

Stopwatch stopwatch = Stopwatch.startNew();

StripeResponse response = this.request(request);
T response = send.apply(request);

stopwatch.stop();

Expand All @@ -63,23 +73,40 @@ public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeE
}

/**
* Sends the given request to Stripe's API, retrying the request in cases of intermittent
* problems.
* Sends the given request to Stripe's API, handling telemetry if not disabled.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponse requestWithRetries(StripeRequest request) throws StripeException {
public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeException {
return sendWithTelemetry(request, this::request);
}

/**
* Sends the given request to Stripe's API, streaming the response, and handling telemetry if not
* disabled.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponseStream requestStreamWithTelemetry(StripeRequest request)
throws StripeException {
return sendWithTelemetry(request, this::requestStream);
}

public <T extends AbstractStripeResponse<?>> T sendWithRetries(
StripeRequest request, RequestSendFunction<T> send) throws StripeException {
ApiConnectionException requestException = null;
StripeResponse response = null;
T response = null;
int retry = 0;

while (true) {
requestException = null;

try {
response = this.requestWithTelemetry(request);
response = send.apply(request);
} catch (ApiConnectionException e) {
requestException = e;
}
Expand All @@ -106,6 +133,31 @@ public StripeResponse requestWithRetries(StripeRequest request) throws StripeExc
return response;
}

/**
* Sends the given request to Stripe's API, retrying the request in cases of intermittent
* problems.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponse requestWithRetries(StripeRequest request) throws StripeException {
return sendWithRetries(request, (r) -> this.requestWithTelemetry(r));
}

/**
* Sends the given request to Stripe's API, streaming the response, retrying the request in cases
* of intermittent problems.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponseStream requestStreamWithRetries(StripeRequest request)
throws StripeException {
return sendWithRetries(request, (r) -> this.requestStreamWithTelemetry(r));
}

/**
* Builds the value of the {@code User-Agent} header.
*
Expand Down Expand Up @@ -165,8 +217,8 @@ private static String formatAppInfo(Map<String, String> info) {
return str;
}

private boolean shouldRetry(
int numRetries, StripeException exception, StripeRequest request, StripeResponse response) {
private <T extends AbstractStripeResponse<?>> boolean shouldRetry(
int numRetries, StripeException exception, StripeRequest request, T response) {
// Do not retry if we are out of retries.
if (numRetries >= request.options().getMaxNetworkRetries()) {
return false;
Expand Down
31 changes: 25 additions & 6 deletions src/main/java/com/stripe/net/HttpURLConnectionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.stripe.Stripe;
import com.stripe.exception.ApiConnectionException;
import com.stripe.util.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -29,7 +28,7 @@ public HttpURLConnectionClient() {
* @throws ApiConnectionException if an error occurs when sending or receiving
*/
@Override
public StripeResponse request(StripeRequest request) throws ApiConnectionException {
public StripeResponseStream requestStream(StripeRequest request) throws ApiConnectionException {
try {
final HttpURLConnection conn = createStripeConnection(request);

Expand All @@ -43,12 +42,32 @@ public StripeResponse request(StripeRequest request) throws ApiConnectionExcepti
? conn.getInputStream()
: conn.getErrorStream();

final String responseBody = StreamUtils.readToEnd(responseStream, ApiResource.CHARSET);

responseStream.close();
return new StripeResponseStream(responseCode, headers, responseStream);

return new StripeResponse(responseCode, headers, responseBody);
} catch (IOException e) {
throw new ApiConnectionException(
String.format(
"IOException during API request to Stripe (%s): %s "
+ "Please check your internet connection and try again. If this problem persists,"
+ "you should check Stripe's service status at https://twitter.com/stripestatus,"
+ " or let us know at [email protected].",
Stripe.getApiBase(), e.getMessage()),
e);
}
}

/**
* Sends the given request to Stripe's API, and returns a buffered response.
*
* @param request the request
* @return the response
* @throws ApiConnectionException if an error occurs when sending or receiving
*/
@Override
public StripeResponse request(StripeRequest request) throws ApiConnectionException {
final StripeResponseStream responseStream = requestStream(request);
try {
return responseStream.unstream();
} catch (IOException e) {
throw new ApiConnectionException(
String.format(
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/com/stripe/net/LiveStripeResponseGetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import com.stripe.Stripe;
import com.stripe.exception.ApiConnectionException;
import com.stripe.exception.ApiException;
import com.stripe.exception.AuthenticationException;
import com.stripe.exception.CardException;
Expand All @@ -20,6 +22,8 @@
import com.stripe.model.StripeObject;
import com.stripe.model.StripeObjectInterface;
import com.stripe.model.oauth.OAuthError;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

public class LiveStripeResponseGetter implements StripeResponseGetter {
Expand Down Expand Up @@ -73,6 +77,38 @@ public <T extends StripeObjectInterface> T request(
return resource;
}

@Override
public InputStream requestStream(
ApiResource.RequestMethod method,
String url,
Map<String, Object> params,
RequestOptions options)
throws StripeException {
StripeRequest request = new StripeRequest(method, url, params, options);
StripeResponseStream responseStream = httpClient.requestStreamWithRetries(request);

int responseCode = responseStream.code();

if (responseCode < 200 || responseCode >= 300) {
StripeResponse response;
try {
response = responseStream.unstream();
} catch (IOException e) {
throw new ApiConnectionException(
String.format(
"IOException during API request to Stripe (%s): %s "
+ "Please check your internet connection and try again. If this problem persists,"
+ "you should check Stripe's service status at https://twitter.com/stripestatus,"
+ " or let us know at [email protected].",
Stripe.getApiBase(), e.getMessage()),
e);
}
handleApiError(response);
}

return responseStream.body();
}

@Override
public <T extends StripeObjectInterface> T oauthRequest(
ApiResource.RequestMethod method,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/stripe/net/RequestTelemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Optional<String> getHeaderValue(HttpHeaders headers) {
* @param response the Stripe response
* @param duration the request duration
*/
public void maybeEnqueueMetrics(StripeResponse response, Duration duration) {
public void maybeEnqueueMetrics(AbstractStripeResponse<?> response, Duration duration) {
if (!Stripe.enableTelemetry) {
return;
}
Expand Down
Loading