[improve][pulsar-io-kafka] Add option to copy Kafka headers to Pulsar properties #1
Conversation
Force-pushed from c0f0eaa to cef00bc
Lgtm
@@ -189,19 +195,35 @@ public void start() {

    public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord);

    protected Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, Object> consumerRecord) {
        Map<String, String> properties = new HashMap<>();
We can save an allocation by returning Collections.emptyMap here if the feature is not enabled.
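A minimal sketch of that suggested early return, assuming the flag is reachable through a hypothetical kafkaSourceConfig.isCopyHeadersEnabled() accessor (the field and accessor names are illustrative, not necessarily the PR's):

    if (!kafkaSourceConfig.isCopyHeadersEnabled()) {
        // Feature disabled: return the shared immutable empty map instead of allocating a HashMap.
        return java.util.Collections.emptyMap();
    }
    Map<String, String> properties = new HashMap<>();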
                ImmutableMap.of("schema.registry.url", getRegistryAddressInDockerNetwork())
        );
                ImmutableMap.of("schema.registry.url", getRegistryAddressInDockerNetwork()));
        sourceConfig.put("copyHeadersEnabled", true);
The integ test is failing because the kafka-avro-console-producer doesn't respect the parse.headers option. Will disable the test for now and add it back as needed.
Force-pushed from cef00bc to c1dada7
Force-pushed from c1dada7 to df108f2
Fixes apache#17760
Master Issue: #
Motivation
Users of the Kafka source today can't access their Kafka headers in Pulsar. This patch adds a config option to enable copying Kafka headers into Pulsar message properties, along with some standard, useful Kafka metadata headers.
Modifications
- A KafkaSourceConfig flag is added: copyHeadersEnabled, of type boolean, with a default value of false.
- When enabled, the following Kafka metadata is copied into Pulsar message properties:
  - __kafka_topic: the name of the Kafka topic consumed by the source.
  - __kafka_partition: the partition number of the consumed Kafka topic.
  - __kafka_offset: the offset of the Kafka message currently consumed by the source.
- Kafka headers are of type <String, byte[]>. Since Pulsar properties are of type <String, String>, the Kafka byte[] values are base64 encoded (see the sketch below).
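For illustration, a minimal, self-contained sketch of the copy logic described above. The class name and the copyHeadersEnabled field are assumptions for this example; the PR's actual implementation (in the connector source class shown in the diff) may differ:

import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Illustrative sketch only; names are assumptions, not the PR's exact code.
public class KafkaHeaderCopySketch {

    private final boolean copyHeadersEnabled;

    public KafkaHeaderCopySketch(boolean copyHeadersEnabled) {
        this.copyHeadersEnabled = copyHeadersEnabled;
    }

    public Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, Object> consumerRecord) {
        if (!copyHeadersEnabled) {
            // No allocation when the feature is disabled (see the review note above).
            return Collections.emptyMap();
        }
        Map<String, String> properties = new HashMap<>();
        // Standard Kafka metadata exposed as Pulsar message properties.
        properties.put("__kafka_topic", consumerRecord.topic());
        properties.put("__kafka_partition", String.valueOf(consumerRecord.partition()));
        properties.put("__kafka_offset", String.valueOf(consumerRecord.offset()));
        // Kafka header values are byte[]; Pulsar properties are Strings, so base64 encode them.
        for (Header header : consumerRecord.headers()) {
            byte[] value = header.value();
            if (value != null) { // Kafka allows null header values; skip them in this sketch.
                properties.put(header.key(), Base64.getEncoder().encodeToString(value));
            }
        }
        return properties;
    }
}

A Pulsar consumer reading these properties would need to base64-decode the header values to recover the original bytes.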
Verifying this change
This change added tests and can be verified as follows:
- Extended the integration test for the Avro Kafka source to generate headers and verify them on the Pulsar side (a sketch of such a check follows below).
  Update: the integration test is blocked by kafka-avro-console-producer not respecting --property parse.headers=true (confluentinc/schema-registry#2390) and will be disabled for now.
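Since the Avro integration test is disabled for now, here is a hedged sketch of what the Pulsar-side verification could look like once it runs. The consumer wiring, topic name, and header key/value are assumptions, not the PR's actual test code:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;

import static org.testng.Assert.assertEquals;

// Illustrative verification only; all names are assumed.
public class KafkaHeaderVerificationSketch {

    static void verifyKafkaHeaderProperties(Consumer<byte[]> pulsarConsumer) throws Exception {
        Message<byte[]> msg = pulsarConsumer.receive(30, TimeUnit.SECONDS);
        // Standard metadata copied by the source when copyHeadersEnabled=true.
        assertEquals(msg.getProperty("__kafka_topic"), "test-kafka-topic");
        assertEquals(msg.getProperty("__kafka_partition"), "0");
        // Kafka header values are byte[], so they arrive base64-encoded in the property map.
        String decoded = new String(
                Base64.getDecoder().decode(msg.getProperty("custom-header")),
                StandardCharsets.UTF_8);
        assertEquals(decoded, "custom-value");
    }
}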
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation
- doc-required (Your PR needs to update docs and you will update later)
- doc-not-needed (Please explain why)
- doc (Your PR contains doc changes)
- doc-complete (Docs have been already added)
Matching PR in forked repository
PR in forked repository: #1