Skip to content

Commit

Permalink
Update proxy lookup throw exception type (apache#17600)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuteng authored and tisonkun committed Sep 14, 2022
1 parent 891944f commit b15ec4d
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
if (t != null) {
log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, t.getMessage());
proxyConnection.ctx().writeAndFlush(
Commands.newLookupErrorResponse(ServerError.ServiceNotReady, t.getMessage(), clientRequestId));
Commands.newLookupErrorResponse(getServerError(t), t.getMessage(), clientRequestId));
} else {
String brokerUrl = connectWithTLS ? r.brokerUrlTls : r.brokerUrl;
if (r.redirect) {
Expand Down Expand Up @@ -179,7 +179,7 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
Commands.newLookupErrorResponse(getServerError(ex), ex.getMessage(), clientRequestId));
return null;
});
}
Expand Down Expand Up @@ -249,7 +249,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(ex),
ex.getMessage(), clientRequestId));
return null;
});
Expand Down Expand Up @@ -330,7 +330,7 @@ private void performGetTopicsOfNamespace(long clientRequestId,
log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
clientAddress, namespaceName, t.getMessage());
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
Commands.newError(clientRequestId, getServerError(t), t.getMessage()));
} else {
proxyConnection.ctx().writeAndFlush(
Commands.newGetTopicsOfNamespaceResponse(r.getTopics(), r.getTopicsHash(), r.isFiltered(),
Expand All @@ -342,7 +342,7 @@ private void performGetTopicsOfNamespace(long clientRequestId,
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
Commands.newError(clientRequestId, getServerError(ex), ex.getMessage()));
return null;
});
}
Expand Down Expand Up @@ -385,7 +385,7 @@ public void handleGetSchema(CommandGetSchema commandGetSchema) {
if (t != null) {
log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t);
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
Commands.newError(clientRequestId, getServerError(t), t.getMessage()));
} else {
proxyConnection.ctx().writeAndFlush(
Commands.newGetSchemaResponse(clientRequestId, r));
Expand All @@ -396,7 +396,7 @@ public void handleGetSchema(CommandGetSchema commandGetSchema) {
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
Commands.newError(clientRequestId, getServerError(ex), ex.getMessage()));
return null;
});

Expand Down

0 comments on commit b15ec4d

Please sign in to comment.