Skip to content

Commit

Permalink
Add a new CLI option to limit the number of requests in a single RPC …
Browse files Browse the repository at this point in the history
…batch request (#4965)

* Add option to limit requests in a single batch

Signed-off-by: Gabriel Trintinalia <[email protected]>

* Change changelog

Signed-off-by: Gabriel Trintinalia <[email protected]>

* Set default max batch size to one

Signed-off-by: Gabriel Trintinalia <[email protected]>

* Update changelog

Signed-off-by: Gabriel Trintinalia <[email protected]>

* Fix max rpc batch size for unit tests

Signed-off-by: Gabriel Trintinalia <[email protected]>

* Change variable name

Signed-off-by: Gabriel Trintinalia <[email protected]>

Signed-off-by: Gabriel Trintinalia <[email protected]>
  • Loading branch information
Gabriel-Trintinalia authored Jan 22, 2023
1 parent 45dcfd4 commit 0626acd
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 23.1.1

### Breaking Changes
- Add a new CLI option to limit the number of requests in a single RPC batch request. Default=1 [#4965](https://github.com/hyperledger/besu/pull/4965)

- Changed JsonRpc http service to return the error -32602 (Invalid params) with a 200 http status code

Expand Down
8 changes: 8 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,13 @@ static class JsonRPCHttpOptionGroup {
split = ",",
arity = "1..*")
private final List<String> rpcHttpTlsCipherSuites = new ArrayList<>();

@CommandLine.Option(
names = {"--rpc-http-max-batch-size"},
paramLabel = MANDATORY_INTEGER_FORMAT_HELP,
description =
"Specifies the maximum number of requests in a single RPC batch request via RPC. -1 specifies no limit (default: ${DEFAULT-VALUE})")
private final Integer rpcHttpMaxBatchSize = DEFAULT_HTTP_MAX_BATCH_SIZE;
}

// JSON-RPC Websocket Options
Expand Down Expand Up @@ -2378,6 +2385,7 @@ && rpcHttpAuthenticationCredentialsFile() == null
jsonRPCHttpOptionGroup.rpcHttpAuthenticationAlgorithm);
jsonRpcConfiguration.setTlsConfiguration(rpcHttpTlsConfiguration());
jsonRpcConfiguration.setHttpTimeoutSec(unstableRPCOptions.getHttpTimeoutSec());
jsonRpcConfiguration.setMaxBatchSize(jsonRPCHttpOptionGroup.rpcHttpMaxBatchSize);
return jsonRpcConfiguration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public interface DefaultCommandValues {
int DEFAULT_P2P_PEER_LOWER_BOUND = 25;
/** The constant DEFAULT_HTTP_MAX_CONNECTIONS. */
int DEFAULT_HTTP_MAX_CONNECTIONS = 80;
/** The constant DEFAULT_HTTP_MAX_BATCH_SIZE. */
int DEFAULT_HTTP_MAX_BATCH_SIZE = 1;
/** The constant DEFAULT_WS_MAX_CONNECTIONS. */
int DEFAULT_WS_MAX_CONNECTIONS = 80;
/** The constant DEFAULT_WS_MAX_FRAME_SIZE. */
Expand Down
15 changes: 15 additions & 0 deletions besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,21 @@ public void p2pPeerUpperBound_without_p2pPeerLowerBound_shouldSetLowerBoundEqual
verify(mockRunnerBuilder).build();
}

@Test
public void rpcHttpMaxBatchSizeOptionMustBeUsed() {
final int rpcHttpMaxBatchSize = 1;
parseCommand("--rpc-http-max-batch-size", Integer.toString(rpcHttpMaxBatchSize));

verify(mockRunnerBuilder).jsonRpcConfiguration(jsonRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();

assertThat(jsonRpcConfigArgumentCaptor.getValue().getMaxBatchSize())
.isEqualTo(rpcHttpMaxBatchSize);

assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}

@Test
public void maxpeersSet_p2pPeerLowerBoundSet() {

Expand Down
1 change: 1 addition & 0 deletions besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ rpc-http-authentication-jwt-algorithm="RS256"
rpc-ws-authentication-jwt-algorithm="RS256"
rpc-http-tls-protocols=["TLSv1.2,TlSv1.1"]
rpc-http-tls-cipher-suites=["TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
rpc-http-max-batch-size=1
rpc-max-logs-range=100

# PRIVACY TLS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.api.handlers;

import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
Expand Down Expand Up @@ -47,7 +48,9 @@ public static Handler<RoutingContext> jsonRpcParser() {
}

public static Handler<RoutingContext> jsonRpcExecutor(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer) {
return JsonRpcExecutorHandler.handler(jsonRpcExecutor, tracer);
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final JsonRpcConfiguration jsonRpcConfiguration) {
return JsonRpcExecutorHandler.handler(jsonRpcExecutor, tracer, jsonRpcConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;

import org.hyperledger.besu.ethereum.api.jsonrpc.JsonResponseStreamer;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
Expand All @@ -26,6 +27,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;

import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -64,71 +66,27 @@ public class JsonRpcExecutorHandler {
private JsonRpcExecutorHandler() {}

public static Handler<RoutingContext> handler(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer) {
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final JsonRpcConfiguration jsonRpcConfiguration) {
return ctx -> {
HttpServerResponse response = ctx.response();
try {
Optional<User> user = ContextKey.AUTHENTICATED_USER.extractFrom(ctx, Optional::empty);
Context spanContext = ctx.get(SPAN_CONTEXT);
response = response.putHeader("Content-Type", APPLICATION_JSON);

if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) {
JsonObject jsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
lazyTraceLogger(jsonRequest::toString);
JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class));
response.setStatusCode(status(jsonRpcResponse).code());
if (jsonRpcResponse.getType() == JsonRpcResponseType.NONE) {
response.end();
} else {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(jsonRpcResponse));
JSON_OBJECT_WRITER.writeValue(streamer, jsonRpcResponse);
}
}
} else if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name())) {
JsonArray batchJsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
lazyTraceLogger(batchJsonRequest::toString);
List<JsonRpcResponse> jsonRpcBatchResponses = new ArrayList<>();
if (isJsonObjectRequest(ctx)) {
final JsonRpcResponse jsonRpcResponse =
executeJsonObjectRequest(jsonRpcExecutor, tracer, ctx);
handleJsonObjectResponse(response, jsonRpcResponse, ctx);
} else if (isJsonArrayRequest(ctx)) {
final List<JsonRpcResponse> jsonRpcBatchResponses;
try {
for (int i = 0; i < batchJsonRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRequest.getJsonObject(i);
} catch (ClassCastException e) {
jsonRpcBatchResponses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
jsonRpcBatchResponses.add(
jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class)));
}
} catch (RuntimeException e) {
jsonRpcBatchResponses =
executeJsonArrayRequest(jsonRpcExecutor, tracer, ctx, jsonRpcConfiguration);
handleJsonArrayResponse(response, jsonRpcBatchResponses, ctx);
} catch (final InvalidParameterException e) {
handleJsonRpcError(ctx, null, JsonRpcError.EXCEEDS_RPC_MAX_BATCH_SIZE);
} catch (final RuntimeException e) {
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
return;
}
final JsonRpcResponse[] completed =
jsonRpcBatchResponses.stream()
.filter(jsonRpcResponse -> jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(completed));
JSON_OBJECT_WRITER.writeValue(streamer, completed);
}
} else {
handleJsonRpcError(ctx, null, JsonRpcError.PARSE_ERROR);
Expand All @@ -142,8 +100,102 @@ public static Handler<RoutingContext> handler(
};
}

private static boolean isJsonObjectRequest(final RoutingContext ctx) {
return ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
}

private static boolean isJsonArrayRequest(final RoutingContext ctx) {
return ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
}

private static JsonRpcResponse executeRequest(
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final JsonObject jsonRequest,
final RoutingContext ctx) {
final Optional<User> user = ContextKey.AUTHENTICATED_USER.extractFrom(ctx, Optional::empty);
final Context spanContext = ctx.get(SPAN_CONTEXT);
return jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class));
}

private static JsonRpcResponse executeJsonObjectRequest(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer, final RoutingContext ctx) {
final JsonObject jsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
lazyTraceLogger(jsonRequest::toString);
return executeRequest(jsonRpcExecutor, tracer, jsonRequest, ctx);
}

private static List<JsonRpcResponse> executeJsonArrayRequest(
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final RoutingContext ctx,
final JsonRpcConfiguration jsonRpcConfiguration)
throws InvalidParameterException {
final JsonArray batchJsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
lazyTraceLogger(batchJsonRequest::toString);
final List<JsonRpcResponse> jsonRpcBatchResponses = new ArrayList<>();

if (jsonRpcConfiguration.getMaxBatchSize() > 0
&& batchJsonRequest.size() > jsonRpcConfiguration.getMaxBatchSize()) {
throw new InvalidParameterException();
}

for (int i = 0; i < batchJsonRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRequest.getJsonObject(i);
} catch (final ClassCastException e) {
jsonRpcBatchResponses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
jsonRpcBatchResponses.add(executeRequest(jsonRpcExecutor, tracer, jsonRequest, ctx));
}
return jsonRpcBatchResponses;
}

private static void handleJsonObjectResponse(
final HttpServerResponse response,
final JsonRpcResponse jsonRpcResponse,
final RoutingContext ctx)
throws IOException {
response.setStatusCode(status(jsonRpcResponse).code());
if (jsonRpcResponse.getType() == JsonRpcResponseType.NONE) {
response.end();
} else {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(jsonRpcResponse));
JSON_OBJECT_WRITER.writeValue(streamer, jsonRpcResponse);
}
}
}

private static void handleJsonArrayResponse(
final HttpServerResponse response,
final List<JsonRpcResponse> jsonRpcBatchResponses,
final RoutingContext ctx)
throws IOException {
final JsonRpcResponse[] completed =
jsonRpcBatchResponses.stream()
.filter(jsonRpcResponse -> jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(completed));
JSON_OBJECT_WRITER.writeValue(streamer, completed);
}
}

private static String getRpcMethodName(final RoutingContext ctx) {
if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) {
if (isJsonObjectRequest(ctx)) {
final JsonObject jsonObject = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
return jsonObject.getString("method");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JsonRpcConfiguration {
public static final int DEFAULT_JSON_RPC_PORT = 8545;
public static final int DEFAULT_ENGINE_JSON_RPC_PORT = 8551;
public static final int DEFAULT_MAX_ACTIVE_CONNECTIONS = 80;
public static final int DEFAULT_MAX_BATCH_SIZE = 1;

private boolean enabled;
private int port;
Expand All @@ -51,6 +52,7 @@ public class JsonRpcConfiguration {
private Optional<TlsConfiguration> tlsConfiguration = Optional.empty();
private long httpTimeoutSec = TimeoutOptions.defaultOptions().getTimeoutSeconds();
private int maxActiveConnections;
private int maxBatchSize;

public static JsonRpcConfiguration createDefault() {
final JsonRpcConfiguration config = new JsonRpcConfiguration();
Expand All @@ -60,6 +62,7 @@ public static JsonRpcConfiguration createDefault() {
config.setRpcApis(DEFAULT_RPC_APIS);
config.httpTimeoutSec = TimeoutOptions.defaultOptions().getTimeoutSeconds();
config.setMaxActiveConnections(DEFAULT_MAX_ACTIVE_CONNECTIONS);
config.setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE);
return config;
}

Expand Down Expand Up @@ -249,4 +252,12 @@ public int getMaxActiveConnections() {
public void setMaxActiveConnections(final int maxActiveConnections) {
this.maxActiveConnections = maxActiveConnections;
}

public int getMaxBatchSize() {
return maxBatchSize;
}

public void setMaxBatchSize(final int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ private Router buildRouter() {
authenticationService.get(),
config.getNoAuthRpcApis()),
rpcMethods),
tracer),
tracer,
config),
false);
} else {
mainRoute.blockingHandler(
Expand All @@ -359,7 +360,8 @@ private Router buildRouter() {
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
rpcMethods),
tracer),
tracer,
config),
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ private Router buildRouter() {
authenticationService.get(),
config.getNoAuthRpcApis()),
rpcMethods),
tracer),
tracer,
config),
false);
} else {
mainRoute.blockingHandler(
Expand All @@ -462,7 +463,8 @@ private Router buildRouter() {
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
rpcMethods),
tracer),
tracer,
config),
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum JsonRpcError {
TRANSACTION_UPFRONT_COST_EXCEEDS_BALANCE(-32004, "Upfront cost exceeds account balance"),
EXCEEDS_BLOCK_GAS_LIMIT(-32005, "Transaction gas limit exceeds block gas limit"),
EXCEEDS_RPC_MAX_BLOCK_RANGE(-32005, "Requested range exceeds maximum RPC range limit"),
EXCEEDS_RPC_MAX_BATCH_SIZE(-32005, "Number of requests exceeds max batch size"),
NONCE_TOO_HIGH(-32006, "Nonce too high"),
TX_SENDER_NOT_AUTHORIZED(-32007, "Sender account not authorized to send transactions"),
CHAIN_HEAD_WORLD_STATE_NOT_AVAILABLE(-32008, "Initial sync is still in progress"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ private void startService(final BlockchainSetupUtil blockchainSetupUtil) throws
final NatService natService = new NatService(Optional.empty());

config.setPort(0);
config.setMaxBatchSize(10);
service =
new JsonRpcHttpService(
vertx,
Expand Down
Loading

0 comments on commit 0626acd

Please sign in to comment.