Skip to content

Commit

Permalink
extends broker interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang committed Sep 9, 2022
1 parent 200f433 commit b264a92
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,36 @@ public interface BrokerInterceptor extends AutoCloseable {

/**
* Intercept messages before sending them to the consumers.
* Deprecated, use {@link #beforeSendMessage(Subscription, Entry, long[], MessageMetadata, Consumer)} instead.
*
* @param subscription pulsar subscription
* @param entry entry
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
*/
@Deprecated
default void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
}

/**
* Intercept messages before sending them to the consumers.
*
* @param subscription pulsar subscription
* @param entry entry
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
* @param consumer consumer. Consumer which entry are sent to.
*/
default void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
}

/**
* Called by the broker when a new connection is created.
*/
Expand All @@ -77,6 +95,18 @@ default void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
}

/**
* Called by the broker when a producer is closed.
*
* @param cnx client Connection
* @param producer Producer object
* @param metadata A map of metadata
*/
default void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
}

/**
* Intercept after a consumer is created.
*
Expand All @@ -89,6 +119,30 @@ default void consumerCreated(ServerCnx cnx,
Map<String, String> metadata) {
}

/**
* Called by the broker when a consumer is closed.
*
* @param cnx client Connection
* @param consumer Consumer object
* @param metadata A map of metadata
*/
default void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
}

/**
* Intercept message when broker receive a send request.
*
* @param headersAndPayload entry's header and payload
* @param publishContext Publish Context
*/
default void onMessagePublish(Producer producer,
ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {

}

/**
* Intercept after a message is produced.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public void beforeSendMessage(Subscription subscription,
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata, consumer);
}
}

@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
}
}

@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
Expand All @@ -71,6 +91,15 @@ public void producerCreated(ServerCnx cnx, Producer producer,
}
}

@Override
public void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.producerClosed(cnx, producer, metadata);
}
}

@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Expand All @@ -81,6 +110,16 @@ public void consumerCreated(ServerCnx cnx,
}
}

@Override
public void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.consumerClosed(cnx, consumer, metadata);
}
}


@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,107 +93,143 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
}
}

@Override
public void onMessagePublish(Producer producer,
ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onMessagePublish(producer, headersAndPayload, publishContext);
}
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(
subscription,
entry,
ackSet,
msgMetadata);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
}
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}
}

@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
if (interceptors == null || interceptors.isEmpty()) {
return;
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.consumerCreated(
cnx,
consumer,
metadata);
}
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.consumerCreated(
cnx,
consumer,
metadata);
}

@Override
public void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.consumerClosed(cnx, consumer, metadata);
}
}
}

@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
if (interceptors == null || interceptors.isEmpty()) {
return;
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.producerCreated(cnx, producer, metadata);
}
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.producerCreated(cnx, producer, metadata);
}

@Override
public void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.producerClosed(cnx, producer, metadata);
}
}
}

@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
}
}
}

@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
}
}
}

@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageAcked(cnx, consumer, ackCmd);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageAcked(cnx, consumer, ackCmd);
}
}
}

@Override
public void txnOpened(long tcId, String txnID) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnOpened(tcId, txnID);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnOpened(tcId, txnID);
}
}
}

@Override
public void txnEnded(String txnID, long txnAction) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnEnded(txnID, txnAction);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnEnded(txnID, txnAction);
}
}
}


@Override
public void onConnectionCreated(ServerCnx cnx) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onConnectionCreated(cnx);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onConnectionCreated(cnx);
}
}
}

Expand Down Expand Up @@ -237,4 +273,8 @@ public void initialize(PulsarService pulsarService) throws Exception {
public void close() {
interceptors.values().forEach(BrokerInterceptorWithClassLoader::close);
}

private boolean interceptorsEnabled() {
return interceptors != null && !interceptors.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray

BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
// keep for compatibility if users has implemented the old interface
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,10 +811,8 @@ public CompletableFuture<Void> closeAsync() {
}
});

if (interceptor != null) {
interceptor.close();
interceptor = null;
}
interceptor.close();
interceptor = null;

try {
authenticationService.close();
Expand Down
Loading

0 comments on commit b264a92

Please sign in to comment.