Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][broker] PIP-204: Extensions for broker interceptor #17269

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,36 @@ public interface BrokerInterceptor extends AutoCloseable {

/**
* Intercept messages before sending them to the consumers.
* Deprecated, use {@link #beforeSendMessage(Subscription, Entry, long[], MessageMetadata, Consumer)} instead.
*
* @param subscription pulsar subscription
* @param entry entry
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
*/
@Deprecated
default void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
}

/**
* Intercept messages before sending them to the consumers.
*
* @param subscription pulsar subscription
* @param entry entry
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
* @param consumer consumer. Consumer which entry are sent to.
*/
default void beforeSendMessage(Subscription subscription,
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
}

/**
* Called by the broker when a new connection is created.
*/
Expand All @@ -77,6 +95,18 @@ default void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
}

/**
* Called by the broker when a producer is closed.
*
* @param cnx client Connection
* @param producer Producer object
* @param metadata A map of metadata
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
*/
default void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
}

/**
* Intercept after a consumer is created.
*
Expand All @@ -89,6 +119,30 @@ default void consumerCreated(ServerCnx cnx,
Map<String, String> metadata) {
}

/**
* Called by the broker when a consumer is closed.
*
* @param cnx client Connection
* @param consumer Consumer object
* @param metadata A map of metadata
*/
default void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
}

/**
* Intercept message when broker receive a send request.
*
* @param headersAndPayload entry's header and payload
* @param publishContext Publish Context
*/
default void onMessagePublish(Producer producer,
ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {

}

/**
* Intercept after a message is produced.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public void beforeSendMessage(Subscription subscription,
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata, consumer);
}
}

@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
}
}

@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
Expand All @@ -71,6 +91,15 @@ public void producerCreated(ServerCnx cnx, Producer producer,
}
}

@Override
public void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.producerClosed(cnx, producer, metadata);
}
}

@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Expand All @@ -81,6 +110,16 @@ public void consumerCreated(ServerCnx cnx,
}
}

@Override
public void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.consumerClosed(cnx, consumer, metadata);
}
}


@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,107 +93,143 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
}
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onMessagePublish(Producer producer,
ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onMessagePublish(producer, headersAndPayload, publishContext);
}
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(
subscription,
entry,
ackSet,
msgMetadata);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
}
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}
}

@Override
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
if (interceptors == null || interceptors.isEmpty()) {
return;
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.consumerCreated(
cnx,
consumer,
metadata);
}
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.consumerCreated(
cnx,
consumer,
metadata);
}

@Override
public void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.consumerClosed(cnx, consumer, metadata);
}
}
}

@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
if (interceptors == null || interceptors.isEmpty()) {
return;
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.producerCreated(cnx, producer, metadata);
}
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.producerCreated(cnx, producer, metadata);
}

@Override
public void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.producerClosed(cnx, producer, metadata);
}
}
}

@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
}
}
}

@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
long entryId, ByteBuf headersAndPayload) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
}
}
}

@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageAcked(cnx, consumer, ackCmd);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.messageAcked(cnx, consumer, ackCmd);
}
}
}

@Override
public void txnOpened(long tcId, String txnID) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnOpened(tcId, txnID);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnOpened(tcId, txnID);
}
}
}

@Override
public void txnEnded(String txnID, long txnAction) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnEnded(txnID, txnAction);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.txnEnded(txnID, txnAction);
}
}
}


@Override
public void onConnectionCreated(ServerCnx cnx) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onConnectionCreated(cnx);
if (interceptorsEnabled()) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onConnectionCreated(cnx);
}
}
}

Expand Down Expand Up @@ -237,4 +273,8 @@ public void initialize(PulsarService pulsarService) throws Exception {
public void close() {
interceptors.values().forEach(BrokerInterceptorWithClassLoader::close);
}

private boolean interceptorsEnabled() {
return interceptors != null && !interceptors.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray

BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
// keep for compatibility if users has implemented the old interface
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
Expand Down
Loading