Skip to content

Commit

Permalink
[improve][broker]prevent partitioned metadata lookup request when bro…
Browse files Browse the repository at this point in the history
…ker is closing (apache#17315)

(cherry picked from commit 256bd68)
  • Loading branch information
HQebupt authored and nicoloboschi committed Oct 25, 2022
1 parent c2f7230 commit b1a6998
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,18 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
return;
}

if (!this.service.getPulsar().isRunning()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed PartitionMetadataLookup from {} for {} "
+ "due to pulsar service is not ready: {} state",
partitionMetadata.getTopic(), remoteAddress, requestId,
this.service.getPulsar().getState().toString());
}
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
"Failed due to pulsar service is not ready", requestId));
return;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
Expand Down Expand Up @@ -2173,4 +2174,19 @@ public void handleConnectWithServiceNotReady() throws Exception {
assertEquals(error.getError(), ServerError.ServiceNotReady);
channel.finish();
}

@Test(timeOut = 30000)
public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception {
resetChannel();
setChannelConnected();
doReturn(false).when(pulsar).isRunning();
assertTrue(channel.isActive());

ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1);
channel.writeInbound(clientCommand);
Object response = getResponse();
assertTrue(response instanceof CommandPartitionedTopicMetadataResponse);
assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
channel.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

import java.util.Queue;

<<<<<<< HEAD
=======
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
>>>>>>> 256bd68770d ([improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315))
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
Expand Down Expand Up @@ -150,6 +156,11 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
protected void handleLookupResponse(CommandLookupTopicResponse connection) {
queue.offer(new CommandLookupTopicResponse().copyFrom(connection));
}

@Override
protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
queue.offer(new CommandPartitionedTopicMetadataResponse().copyFrom(response));
}
};

}

0 comments on commit b1a6998

Please sign in to comment.