Motivation
The central messaging platform at Intuit uses Apache Pulsar. The platform team operates multiple clusters that are used by hundreds of teams across Intuit. For complete visibility and to better serve customers, the platform needs the ability to intercept all key broker and ledger events. It also needs the ability to transparently control the format of messages that are persisted to the disk store.
Goal
The scope of this proposal is limited to broker events and operations. This PIP addresses traceability and interceptability at the broker level only. To achieve end-to-end tracing, it is also desirable to intercept events at the Pulsar proxy level (for topic lookup, the ownership assignment flow, etc.), but that can be handled in a separate PIP.
API Changes
We propose the following changes:
Extend the existing interface org.apache.pulsar.broker.intercept.BrokerInterceptor to support the following granular events with all relevant context information:
void onConnectionCreated(ServerCnx cnx);
void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata);
void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata);
void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, Rate rateIn, Topic.PublishContext publishContext);
void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId, long entryId, ByteBuf headersAndPayload);
void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd);
Support a new interface org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor to allow interception of the write and read operations of a managed ledger and modification of the payload. The interface details are given in the next section.
Support dynamic loading of managed ledger interceptor implementations through a broker configuration parameter 'brokerEntryPayloadProcessors' in class org.apache.pulsar.broker.ServiceConfiguration:
Set<String> brokerEntryPayloadProcessors;
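To make the intent of the granular callbacks more concrete, here is a minimal sketch of how an operator might count events per callback. It is not Pulsar code: ServerCnx and Producer are empty stand-in classes, GranularEvents is a hypothetical, trimmed-down mirror of three of the proposed methods (messageProduced omits the Rate and Topic.PublishContext arguments), and CountingInterceptor is an illustrative name. The stand-ins exist only so that the example compiles without pulsar-broker on the classpath.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Stand-ins for the broker classes named in the proposal (assumption: the real
// classes live in pulsar-broker and carry far more state than shown here).
final class ServerCnx {
    final String remoteAddress;
    ServerCnx(String remoteAddress) { this.remoteAddress = remoteAddress; }
}

final class Producer {
    final String topic;
    Producer(String topic) { this.topic = topic; }
}

// Hypothetical subset of the callbacks proposed for BrokerInterceptor.
interface GranularEvents {
    void onConnectionCreated(ServerCnx cnx);
    void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata);
    void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId);
}

// Counts how often each callback fires; publish events are keyed by topic.
final class CountingInterceptor implements GranularEvents {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    private void record(String event) {
        counts.computeIfAbsent(event, k -> new LongAdder()).increment();
    }

    @Override
    public void onConnectionCreated(ServerCnx cnx) {
        record("connectionCreated");
    }

    @Override
    public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata) {
        record("producerCreated");
    }

    @Override
    public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId) {
        record("messageProduced:" + producer.topic);
    }

    // Returns 0 for events that were never recorded.
    long count(String event) {
        LongAdder adder = counts.get(event);
        return adder == null ? 0L : adder.sum();
    }
}

final class CountingInterceptorDemo {
    public static void main(String[] args) {
        CountingInterceptor interceptor = new CountingInterceptor();
        ServerCnx cnx = new ServerCnx("10.0.0.1:51234");
        Producer producer = new Producer("persistent://tenant/ns/orders");

        // Simulate the broker invoking the callbacks for one connection.
        interceptor.onConnectionCreated(cnx);
        interceptor.producerCreated(cnx, producer, Collections.singletonMap("team", "payments"));
        interceptor.messageProduced(cnx, producer, System.nanoTime(), 7L, 0L);
        interceptor.messageProduced(cnx, producer, System.nanoTime(), 7L, 1L);

        System.out.println("published: " + interceptor.count("messageProduced:" + producer.topic));
    }
}
```

In a real deployment the counters would more likely feed a metrics or tracing system. The point of the sketch is only that each callback receives enough context (connection, producer, ledger position) to attribute an event to a client and a topic.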
Implementation
The BrokerInterceptor interface should be extended to include the additional callback methods specified in the section above.
The new callback methods need to be invoked at the appropriate places in the pulsar-broker module (in the classes ServerCnx, Producer, and Consumer).
A new interface, ManagedLedgerPayloadProcessor, should be added with the following content
A new configuration parameter 'brokerEntryPayloadProcessors' should be supported in broker.conf. Its value can be a list of processors.
The existing class org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl should be extended to support two additional operations:
processPayloadBeforeLedgerWrite(OpAddEntry op, ByteBuf ledgerData)
processPayloadBeforeEntryCache(ByteBuf ledgerData)
ManagedLedgerInterceptorImpl should internally use ManagedLedgerPayloadProcessor instance(s) to handle the above payload processing operations.
OpAddEntry should use the method ManagedLedgerInterceptor::processPayloadBeforeLedgerWrite to process the payload before it is written to the ledger.
EntryCacheManager (and EntryCacheImpl) should use the method ManagedLedgerInterceptor::processPayloadBeforeEntryCache to process the payload immediately after it is read from the ledger.
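The write and read hooks described above could fit together roughly as in the following sketch. Everything in it is an assumption made for illustration: PayloadProcessor is a hypothetical stand-in for ManagedLedgerPayloadProcessor (whose actual content is not shown in this text), byte[] replaces Netty's ByteBuf, the OpAddEntry argument is omitted, and the two example processors are invented. The sketch also assumes that 'brokerEntryPayloadProcessors' holds class names that are instantiated reflectively, and that processors run in configuration order on write and in reverse order on read.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for ManagedLedgerPayloadProcessor.
interface PayloadProcessor {
    byte[] beforeLedgerWrite(byte[] payload);
    byte[] afterLedgerRead(byte[] payload);
}

// Example processor: XORs every byte with a fixed mask (the operation is its own inverse).
final class XorMaskProcessor implements PayloadProcessor {
    private static final byte MASK = 0x5A;

    private static byte[] xor(byte[] in) {
        byte[] out = new byte[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = (byte) (in[i] ^ MASK);
        }
        return out;
    }

    public byte[] beforeLedgerWrite(byte[] payload) { return xor(payload); }
    public byte[] afterLedgerRead(byte[] payload) { return xor(payload); }
}

// Example processor: prepends a one-byte format marker on write and strips it on read.
final class FormatMarkerProcessor implements PayloadProcessor {
    static final byte MARKER = 0x01;

    public byte[] beforeLedgerWrite(byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = MARKER;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    public byte[] afterLedgerRead(byte[] payload) {
        if (payload.length == 0 || payload[0] != MARKER) {
            throw new IllegalArgumentException("missing format marker");
        }
        return Arrays.copyOfRange(payload, 1, payload.length);
    }
}

// Plays the role sketched for ManagedLedgerInterceptorImpl: it owns the processor instances.
final class PayloadProcessorChain {
    private final List<PayloadProcessor> processors = new ArrayList<>();

    // Instantiates each configured class through its no-argument constructor.
    static PayloadProcessorChain load(Set<String> classNames) {
        PayloadProcessorChain chain = new PayloadProcessorChain();
        for (String name : classNames) {
            try {
                Object instance = Class.forName(name).getDeclaredConstructor().newInstance();
                chain.processors.add((PayloadProcessor) instance);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot load processor " + name, e);
            }
        }
        return chain;
    }

    // Write path: apply the processors in configuration order.
    byte[] processPayloadBeforeLedgerWrite(byte[] payload) {
        for (PayloadProcessor p : processors) {
            payload = p.beforeLedgerWrite(payload);
        }
        return payload;
    }

    // Read path: undo the transformations in reverse order.
    byte[] processPayloadBeforeEntryCache(byte[] payload) {
        for (int i = processors.size() - 1; i >= 0; i--) {
            payload = processors.get(i).afterLedgerRead(payload);
        }
        return payload;
    }
}

final class PayloadProcessorDemo {
    public static void main(String[] args) {
        // LinkedHashSet keeps the configured order deterministic.
        Set<String> configured = new LinkedHashSet<>(Arrays.asList(
                XorMaskProcessor.class.getName(), FormatMarkerProcessor.class.getName()));
        PayloadProcessorChain chain = PayloadProcessorChain.load(configured);

        byte[] original = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] stored = chain.processPayloadBeforeLedgerWrite(original);
        byte[] restored = chain.processPayloadBeforeEntryCache(stored);
        System.out.println("round trip ok: " + Arrays.equals(original, restored));
    }
}
```

Reversing the order on the read path matters whenever processors are not commutative, as with the marker and mask above. A real implementation based on ByteBuf would also have to deal with reference counting and buffer release, which this sketch ignores.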
Rejected Alternatives
madhavan-narayanan changed the title from "Broker extensions to provide operators of enterprise-wide clusters better control and flexibility" to "[PIP] Broker extensions to provide operators of enterprise-wide clusters better control and flexibility" on Nov 17, 2021
sijie changed the title from "[PIP] Broker extensions to provide operators of enterprise-wide clusters better control and flexibility" to "[PIP 106] Broker extensions to provide operators of enterprise-wide clusters better control and flexibility" on Dec 9, 2021