Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored to use new Java version constructs #961

Merged
merged 1 commit into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ public enum EmbeddedFormat {
* @return corresponding enum
*/
public static EmbeddedFormat from(String value) {
switch (value) {
case "json":
return JSON;
case "binary":
return BINARY;
case "text":
return TEXT;
}
throw new IllegalEmbeddedFormatException("Invalid format type.");
return switch (value) {
case "json" -> JSON;
case "binary" -> BINARY;
case "text" -> TEXT;
default -> throw new IllegalEmbeddedFormatException("Invalid format type.");
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint {
private static final Logger LOGGER = LogManager.getLogger(HttpAdminBridgeEndpoint.class);

private final HttpBridgeContext httpBridgeContext;
private final HttpBridgeContext<?, ?> httpBridgeContext;
private final KafkaBridgeAdmin kafkaBridgeAdmin;

/**
Expand All @@ -53,7 +53,7 @@ public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint {
* @param bridgeConfig the bridge configuration
* @param context the HTTP bridge context
*/
public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context) {
public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext<?, ?> context) {
super(bridgeConfig);
this.name = "kafka-bridge-admin";
this.httpBridgeContext = context;
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private void createConsumer(RoutingContext routingContext) {
responseStatus.code(),
ex.getMessage()
);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
}
Expand Down Expand Up @@ -586,20 +586,17 @@ private void errorHandler(RoutingContext routingContext) {
// in case of validation exception, building a meaningful error message
if (routingContext.failure() != null) {
StringBuilder sb = new StringBuilder();
if (routingContext.failure().getCause() instanceof ValidationException) {
ValidationException validationException = (ValidationException) routingContext.failure().getCause();
if (routingContext.failure().getCause() instanceof ValidationException validationException) {
if (validationException.inputScope() != null) {
sb.append("Validation error on: ").append(validationException.inputScope()).append(" - ");
}
sb.append(validationException.getMessage());
} else if (routingContext.failure() instanceof ParameterProcessorException) {
ParameterProcessorException parameterException = (ParameterProcessorException) routingContext.failure();
} else if (routingContext.failure() instanceof ParameterProcessorException parameterException) {
if (parameterException.getParameterName() != null) {
sb.append("Parameter error on: ").append(parameterException.getParameterName()).append(" - ");
}
sb.append(parameterException.getMessage());
} else if (routingContext.failure() instanceof BodyProcessorException) {
BodyProcessorException bodyProcessorException = (BodyProcessorException) routingContext.failure();
} else if (routingContext.failure() instanceof BodyProcessorException bodyProcessorException) {
sb.append(bodyProcessorException.getMessage());
}
message = sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private void doSeek(RoutingContext routingContext, JsonNode bodyAsJson) {
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
} else {
HttpBridgeError error = handleError(ex);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
});
Expand All @@ -223,7 +223,7 @@ private void doSeekTo(RoutingContext routingContext, JsonNode bodyAsJson, HttpOp
HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
} else {
HttpBridgeError error = handleError(ex);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
}
});
Expand Down Expand Up @@ -388,7 +388,7 @@ private void doAssign(RoutingContext routingContext, JsonNode bodyAsJson) {
StreamSupport.stream(partitionsList.spliterator(), false)
.map(JsonNode.class::cast)
.map(json -> new SinkTopicSubscription(JsonUtils.getString(json, "topic"), JsonUtils.getInt(json, "partition")))
.collect(Collectors.toList())
.toList()
);

// fulfilling the request in a separate thread to free the Vert.x event loop still in place
Expand Down Expand Up @@ -441,7 +441,7 @@ private void doSubscribe(RoutingContext routingContext, JsonNode bodyAsJson) {
StreamSupport.stream(topicsList.spliterator(), false)
.map(TextNode.class::cast)
.map(topic -> new SinkTopicSubscription(topic.asText()))
.collect(Collectors.toList())
.toList()
);
this.kafkaBridgeConsumer.subscribe(topicSubscriptions);
} else if (bodyAsJson.has("topic_pattern")) {
Expand Down Expand Up @@ -554,7 +554,7 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
LOGGER.debug("[{}] Request: body = {}", routingContext.get("request-id"), bodyAsJson);
} catch (JsonDecodeException ex) {
HttpBridgeError error = handleError(ex);
HttpUtils.sendResponse(routingContext, error.getCode(),
HttpUtils.sendResponse(routingContext, error.code(),
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
return;
}
Expand Down Expand Up @@ -609,39 +609,30 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha

@SuppressWarnings("unchecked")
private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
switch (this.format) {
case JSON:
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
return switch (this.format) {
case JSON -> (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY -> (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT -> (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
default -> null;
};
}

private String getContentType() {
switch (this.format) {
case JSON:
return BridgeContentType.KAFKA_JSON_JSON;
case BINARY:
return BridgeContentType.KAFKA_JSON_BINARY;
case TEXT:
return BridgeContentType.KAFKA_JSON_TEXT;
}
throw new IllegalArgumentException();
return switch (this.format) {
case JSON -> BridgeContentType.KAFKA_JSON_JSON;
case BINARY -> BridgeContentType.KAFKA_JSON_BINARY;
case TEXT -> BridgeContentType.KAFKA_JSON_TEXT;
default -> throw new IllegalArgumentException();
};
}

private boolean checkAcceptedBody(String accept) {
switch (accept) {
case BridgeContentType.KAFKA_JSON_JSON:
return format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY:
return format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT:
return format == EmbeddedFormat.TEXT;
}
return false;
return switch (accept) {
case BridgeContentType.KAFKA_JSON_JSON -> format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY -> format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT -> format == EmbeddedFormat.TEXT;
default -> false;
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,11 @@ private ObjectNode buildOffsets(List<HttpBridgeResult<?>> results) {

for (HttpBridgeResult<?> result : results) {
ObjectNode offset = null;
if (result.getResult() instanceof RecordMetadata) {
RecordMetadata metadata = (RecordMetadata) result.getResult();
if (result.result() instanceof RecordMetadata metadata) {
offset = JsonUtils.createObjectNode()
.put("partition", metadata.partition())
.put("offset", metadata.offset());
} else if (result.getResult() instanceof HttpBridgeError) {
HttpBridgeError error = (HttpBridgeError) result.getResult();
} else if (result.result() instanceof HttpBridgeError error) {
offset = error.toJson();
}
offsets.add(offset);
Expand All @@ -218,14 +216,14 @@ private int handleError(Throwable ex) {

@SuppressWarnings("unchecked")
private MessageConverter<K, V, byte[], byte[]> buildMessageConverter(String contentType) {
switch (contentType) {
case BridgeContentType.KAFKA_JSON_JSON:
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BridgeContentType.KAFKA_JSON_BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case BridgeContentType.KAFKA_JSON_TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
return switch (contentType) {
case BridgeContentType.KAFKA_JSON_JSON ->
(MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BridgeContentType.KAFKA_JSON_BINARY ->
(MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case BridgeContentType.KAFKA_JSON_TEXT ->
(MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
default -> null;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,11 @@

/**
* Represents an error related to HTTP bridging
*
* @param code code classifying the error itself
* @param message message providing more information about the error
*/
public class HttpBridgeError {

private final int code;
private final String message;

/**
* Constructor
*
* @param code code classifying the error itself
* @param message message providing more information about the error
*/
public HttpBridgeError(int code, String message) {
this.code = code;
this.message = message;
}

/**
* @return code classifying the error itself
*/
public int getCode() {
return code;
}

/**
* @return message providing more information about the error
*/
public String getMessage() {
return message;
}
public record HttpBridgeError(int code, String message) {

/**
* @return a JSON representation of the error with code and message
Expand All @@ -54,6 +29,7 @@ public ObjectNode toJson() {

/**
* Create an error instance from a JSON representation
*
* @param json JSON representation of the error
* @return error instance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,6 @@
* This class represents a result of an HTTP bridging operation
*
* @param <T> the class bringing the actual result as {@link HttpBridgeError} or {@link org.apache.kafka.clients.producer.RecordMetadata}
* @param result actual result
*/
public class HttpBridgeResult<T> {

final T result;

/**
* Constructor
*
* @param result actual result
*/
public HttpBridgeResult(T result) {
this.result = result;
}

/**
* @return the actual result
*/
public T getResult() {
return result;
}
}
public record HttpBridgeResult<T>(T result) { }
Loading
Loading