[improve][pulsar-io-kafka] Add option to copy Kafka headers to Pulsar properties #17829
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #17760
Motivation
Users of Kafka source today can't access their Kafka headers in Pulsar. This patch adds a config to enable this copy to take place in addition to some standard, useful Kafka headers.
Modifications
KafkaSourceConfig
flag is added:copyHeadersEnabled
of typeboolean
and default value offalse
__kafka_topic
: the name of the Kafka topic consumed by the source.__kafka_partition
: the number of paritioed on the consumed kafka topics__kafka_offset
: the current offset of the kafka message currently consumed by the source<String, byte[]>
. Since Pulsar properties are of type<String, String>
, the Kafka byte[] values are based64 encoded.Verifying this change
This change added tests and can be verified as follows:
Extended integration test for Avro Kafka source to generate headers and verify them on Pulsar sideUpdate : The integ test is blocked by
kafka-avro-console-producer
not respecting --property parse.headers=true confluentinc/schema-registry#2390 and will be disabled for now.Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation
[]
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
Will be generated automatically
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)
Matching PR in forked repository
PR in forked repository: aymkhalil#1