[Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds #16098

Merged
merged 1 commit into from
Jun 17, 2022

@dlg99 dlg99 commented Jun 16, 2022

Motivation

Record's getRecordSequence() returns a non-unique sequenceId for messages from the same batch.
The root cause is that FunctionCommon.getSequenceId() does not account for the index of the message in the batch and only uses ledgerId and entryId.

For the KCA Sink, this means that messages from the same batch arrive with the same offset, and some Kafka Sinks will ignore such messages as duplicates.
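
To make the collision concrete, here is a minimal sketch. The class name SequenceIdCollisionSketch and the 28-bit width reserved for entryId are assumptions made for illustration; this is not the actual FunctionCommon code.

```java
// Hypothetical sketch: packs ledgerId and entryId into one long and
// ignores the batch index, which is the behavior described above.
final class SequenceIdCollisionSketch {
    // Assumption for illustration: entryId occupies the low 28 bits.
    static final int ENTRY_BITS = 28;

    // batchIdx is accepted only to show that it has no effect on the result.
    static long sequenceId(long ledgerId, long entryId, int batchIdx) {
        return (ledgerId << ENTRY_BITS) | entryId;
    }
}
```

Two messages from the same batch share ledgerId and entryId, so sequenceId(5, 7, 0) and sequenceId(5, 7, 1) return the same value, and a Kafka sink that deduplicates by offset would treat the second message as a repeat.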

Changing this behavior in FunctionCommon.getSequenceId() is potentially breaking (it requires a separate discussion), and I assume it does not affect Pulsar right now.
Another problem is that we already pack two longs (ledgerId and entryId) into one long (sequenceId), and the batch index adds an int to this. With KCA, one can make assumptions about the batch size and the number of entries in a ledger before rotation, and configure this to avoid or minimize the lossiness of this packing; in Pulsar in general, such assumptions aren't reliable.
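
The lossiness can be sketched as follows. BatchOffsetSketch, its method names, the 28-bit entry field, and the choice to throw on an oversized batch index are all assumptions for illustration, not the code merged in this PR.

```java
// Hypothetical sketch: folds the batch index into the low bits of entryId
// before packing, and shows where the packing becomes lossy.
final class BatchOffsetSketch {
    // Assumption for illustration: entryId (plus batch bits) occupies the low 28 bits.
    static final int ENTRY_BITS = 28;

    static long offset(long ledgerId, long entryId, int batchIdx, int batchBits) {
        if (batchBits < 0 || batchBits > 20) {
            throw new IllegalArgumentException("batchBits must be in [0, 20]");
        }
        if (batchBits == 0) {
            // Disabled: same result for every message in a batch.
            return (ledgerId << ENTRY_BITS) | entryId;
        }
        // Assumption: a negative index marks a non-batched message; treat it as 0.
        int idx = Math.max(batchIdx, 0);
        if (idx >= (1 << batchBits)) {
            throw new IllegalArgumentException("batch index does not fit in " + batchBits + " bits");
        }
        long entryAndBatch = (entryId << batchBits) | idx;
        return (ledgerId << ENTRY_BITS) | entryAndBatch;
    }

    // True when entryId still fits after batchBits are carved out of the entry field.
    static boolean fits(long entryId, int batchBits) {
        return entryId >= 0 && entryId < (1L << (ENTRY_BITS - batchBits));
    }
}
```

Under these assumptions, 12 batch bits leave 16 bits for entryId, so an entryId of 65536 spills into the ledger bits and can collide with entry 0 of the next ledger. This is why the packing is only safe when batch size and ledger rollover are configured to match.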

Modifications

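The Kafka offset produced by KCA can be built from either of the following:

  • the message index, used directly as the offset, if this is enabled and the index was added by the interceptor
  • the index of the message in its batch, combined with the sequenceId, to build the offset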
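
The choice between the two sources might look like the sketch below. OffsetSelectionSketch and chooseOffset are hypothetical names; the broker-supplied index is modelled as an Optional<Long>, and the packed offset as a supplier, so the sketch makes no claim about the real KCA method signatures.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

// Hypothetical sketch of the selection logic described in the list above.
final class OffsetSelectionSketch {
    static long chooseOffset(boolean useIndexAsOffset,
                             Optional<Long> brokerIndex,
                             LongSupplier packedFallback) {
        // Prefer the broker-assigned index when the feature is on and the index exists.
        if (useIndexAsOffset && brokerIndex.isPresent()) {
            return brokerIndex.get();
        }
        // Otherwise fall back to the offset packed from ledgerId, entryId, and batch index.
        return packedFallback.getAsLong();
    }
}
```

Passing the fallback as a supplier keeps the packing from being computed when the index is available.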

PulsarKafkaConnectSinkConfig gains a couple of parameters (documented in FieldDoc):

  • maxBatchBitsForOffset
  • useIndexAsOffset
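
As a rough illustration of how these two settings could be read and validated, here is a sketch. KcaOffsetConfigSketch and fromMap are hypothetical and are not the real PulsarKafkaConnectSinkConfig loader; the defaults (12 and true) and the 0 to 20 range are taken from the FieldDoc snippets quoted in the review of this PR.

```java
import java.util.Map;

// Hypothetical holder for the two new settings, with range validation.
final class KcaOffsetConfigSketch {
    final int maxBatchBitsForOffset;
    final boolean useIndexAsOffset;

    private KcaOffsetConfigSketch(int maxBatchBitsForOffset, boolean useIndexAsOffset) {
        this.maxBatchBitsForOffset = maxBatchBitsForOffset;
        this.useIndexAsOffset = useIndexAsOffset;
    }

    static KcaOffsetConfigSketch fromMap(Map<String, Object> raw) {
        // Defaults mirror the FieldDoc values shown in the review: 12 and true.
        int bits = ((Number) raw.getOrDefault("maxBatchBitsForOffset", 12)).intValue();
        boolean useIndex = (Boolean) raw.getOrDefault("useIndexAsOffset", Boolean.TRUE);
        if (bits < 0 || bits > 20) {
            throw new IllegalArgumentException("maxBatchBitsForOffset must be between 0 and 20");
        }
        return new KcaOffsetConfigSketch(bits, useIndex);
    }
}
```

Setting maxBatchBitsForOffset to 0 keeps the previous behavior, in which all messages of a batch share one offset.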

Verifying this change

Added unit tests

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

No

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

There are no Sinks in Apache Pulsar that use KCA.

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@github-actions github-actions bot added the doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. label Jun 16, 2022
help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+ "0 to disable this behavior (Messages from the same batch will have the same "
+ "offset which can affect some connectors.)")
private int maxBatchBitsForOffset = 12;
Contributor

If this is zero, then the behaviour is the same as before.
So I would keep 0 as the default, in order not to break compatibility with data (offsets) already stored in existing environments that are upgrading.

Contributor

Maybe if we use Integer, we can detect that the connector was created with an old version when the value is null, and so enable the legacy behaviour.

Contributor Author

Apache Pulsar does not have Sink implementations that use Kafka connectors (Debezium etc. are sources).
I don't think we need to worry about legacy behavior here.

@eolivelli eolivelli left a comment

lgtm

@eolivelli eolivelli merged commit a18c01d into apache:master Jun 17, 2022
defaultValue = "true",
help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+ "Requires AppendIndexMetadataInterceptor and "
+ "enableExposingBrokerEntryMetadataToClient=true on brokers.")
Contributor

the config is exposingBrokerEntryMetadataToClientEnabled

Contributor Author

will update in #16100 since this PR is merged already

nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jun 17, 2022
…ched messages non-unique sequenceIds (apache#16098)

(cherry picked from commit a18c01d)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 21, 2022
…ched messages non-unique sequenceIds (apache#16098)

(cherry picked from commit a18c01d)
@nicoloboschi nicoloboschi added this to the 2.11.0 milestone Sep 26, 2022
Labels
area/connector doc Your PR contains doc changes, no matter whether the changes are in markdown or code files.