
New KeyShared consumers will not get any messages until a consumer that did get messages disconnects or acks/nacks some messages #15705

Closed
zbentley opened this issue May 21, 2022 · 10 comments · Fixed by #15709 or #23352
Labels
deprecated/question Questions should happen in GitHub Discussions doc-required Your PR changes impact docs and you will update later. Stale

Comments

@zbentley
Contributor

zbentley commented May 21, 2022

Describe the bug

If a KeyShared consumer has any unacknowledged messages, new KeyShared consumers on the same subscription after that point will not get any new messages (even messages with brand new keys) until the original consumer either disconnects or acks/nacks some indeterminate number of messages.

This is a really bad bug!

Bug scenario 1

  1. Create a non-partitioned topic.
  2. Create a KeyShared subscription on that topic.
  3. Produce some number of messages on the topic with a given key, say key1, using a KeyBased batching strategy.
  4. Start a consumer on the topic with the code below and ensure it prints "Press a key to acknowledge messages".
  5. Start a second consumer on the same topic; it may or may not print "Press a key to acknowledge messages" depending on whether key1 is assigned to it (but it will never print that if this bug is in effect).
  6. Produce some number (10 should be sufficient) of messages on the topic with unique keys not equal to key1; say key2, key3, and so on. The goal here is to get a key that hashes to the second consumer's range.
  7. Observe that the second consumer never gets a message.
  8. In the first consumer's terminal, press Enter.
  9. As the first consumer acks messages, observe that only then does the second consumer get any messages.

Bug scenario 2

  1. Create a non-partitioned topic.
  2. Create a KeyShared subscription on that topic.
  3. Produce 100 messages on that topic, each with a distinct partition key (e.g. key1, key2, through key100).
  4. Start a consumer on the topic with the code below and ensure it prints "Press a key to acknowledge messages".
  5. Start a second consumer on the topic with the code below.
  6. Observe that the second consumer does not receive any messages (i.e. it does not print "Press a key to acknowledge messages"), even though hash range redistribution should have allocated at least some of the 100 keys to the new consumer.
  7. In the first consumer's terminal, press Enter.
  8. As the first consumer acks messages, observe that only then does the second consumer get any messages.

Consumer code

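import os
from pulsar import Client, ConsumerType, Timeout

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'MYSUBSCRIPTION'

def recv(sub):
    # Poll until a message arrives, using a 100 ms receive timeout per attempt.
    while True:
        try:
            msg = sub.receive(timeout_millis=100)
            print("Got message", msg.data())
            return msg
        except Timeout:
            pass

def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.KeyShared,
        # Prefetch at most 2 messages so most of the backlog stays on the broker.
        receiver_queue_size=2,
        # Unique name per process so the two consumers are easy to tell apart.
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    # Hold the first message unacked until Enter is pressed.
    msg = recv(sub)
    input("Press a key to acknowledge messages")
    # Then ack each message and fetch the next one, indefinitely.
    while True:
        sub.acknowledge(msg)
        msg = recv(sub)


if __name__ == '__main__':
    main()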

Expected behavior
In scenario 1, the second consumer should receive at least some messages in step 7.
In scenario 2, the second consumer should receive messages as soon as it starts in step 5.

In short, I think hash range redistribution is not working right, or is not triggering message re-routing: when new KeyShared consumers arrive, two things should happen:

  1. New consumers should be allocated part of the hash range of their subscription.
  2. Any backlog messages for keys in that range should be sent to the new consumer.

In scenario 1, neither part is working.
In scenario 2, part 1 is working, but I think part 2 is not.
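Part 1 can be pictured with a toy selector. This is a minimal sketch, not Pulsar's implementation: it assumes a hash space of 65536 slots, uses `zlib.crc32` as a stand-in for the broker's real key hash, and assumes each arriving consumer takes half of the widest existing range. `AutoSplitSelector`, `key_hash`, and `HASH_RANGE_SIZE` are hypothetical names.

```python
import zlib
from typing import List, Tuple

# Assumption: a hash space of 65536 slots.
HASH_RANGE_SIZE = 65536


def key_hash(key: str) -> int:
    """Stand-in key hash (crc32, not the broker's real hash), reduced to the hash space."""
    return zlib.crc32(key.encode("utf-8")) % HASH_RANGE_SIZE


class AutoSplitSelector:
    """Toy selector: each new consumer takes the upper half of the widest range."""

    def __init__(self) -> None:
        # Each entry is (start, end_inclusive, consumer_name), kept in hash order.
        self.ranges: List[Tuple[int, int, str]] = []

    def add_consumer(self, name: str) -> None:
        if not self.ranges:
            # The first consumer owns the whole hash space.
            self.ranges.append((0, HASH_RANGE_SIZE - 1, name))
            return
        # Split the widest range; ties go to the lowest range.
        idx = max(range(len(self.ranges)),
                  key=lambda i: self.ranges[i][1] - self.ranges[i][0])
        start, end, owner = self.ranges[idx]
        mid = start + (end - start) // 2
        self.ranges[idx] = (start, mid, owner)
        self.ranges.insert(idx + 1, (mid + 1, end, name))

    def select(self, key: str) -> str:
        """Return the consumer whose range contains the key's hash."""
        h = key_hash(key)
        for start, end, owner in self.ranges:
            if start <= h <= end:
                return owner
        raise LookupError(f"no consumer owns hash {h}")


# Scenario 2 in miniature: 100 distinct keys, then a second consumer arrives.
selector = AutoSplitSelector()
selector.add_consumer("c1")
selector.add_consumer("c2")
owners = [selector.select(f"key{i}") for i in range(1, 101)]
print("c1 owns", owners.count("c1"), "keys; c2 owns", owners.count("c2"))
```

With two consumers each owning half of the hash space, a share of `key1` through `key100` maps to `c2`. The toy only models which consumer owns a key; it says nothing about when backlog for those keys is actually dispatched, which is part 2.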

Environment:

Same environment as apache/pulsar-client-python#190

@codelipenghui
Contributor

@zbentley

> If a KeyShared consumer has any unacknowledged messages, new KeyShared consumers on the same subscription after that point will not get any new messages (even messages with brand new keys) until the original consumer either disconnects or acks/nacks some indeterminate number of messages.

This is expected behavior: because the old consumer has unacked messages, dispatching the messages that come after them to a new consumer might break per-key dispatch order.

Here is more context about the Key_Shared subscription ordering guarantee:

#6554
#7106

@zbentley
Contributor Author

@codelipenghui if I'm reading that correctly, that's really concerning behavior.

If a topic has no consumers, and a backlog of message index:key pairs 0:a, 1:a, 2:a, 3:b, 4:b, 5:b, and a KeyShared consumer c1 joins with a receiver queue size of 1 and gets message 0, why would we prevent a new consumer c2 from joining and getting messages 3-5? That doesn't compromise key ordering in any way.
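The order-safety argument in this example can be checked mechanically. A minimal sketch, assuming per-key ordering only requires that no earlier message with the same key is still unacked by another consumer; it ignores hash ownership, so assume key `b` hashes to c2's range. `order_safe_positions` is a hypothetical helper, not a broker API.

```python
from typing import List, Set, Tuple


def order_safe_positions(backlog: List[Tuple[int, str]],
                         unacked_elsewhere: Set[int]) -> List[int]:
    """Positions a newly joined consumer could take without reordering any key.

    backlog: (position, key) pairs in publish order.
    unacked_elsewhere: positions delivered to other consumers and not yet acked.
    """
    # Any key with an in-flight message elsewhere must wait for that message.
    blocked_keys = {key for pos, key in backlog if pos in unacked_elsewhere}
    return [pos for pos, key in backlog
            if pos not in unacked_elsewhere and key not in blocked_keys]


# The example above: c1 holds message 0 (key "a") unacked.
backlog = [(0, "a"), (1, "a"), (2, "a"), (3, "b"), (4, "b"), (5, "b")]
print(order_safe_positions(backlog, {0}))  # [3, 4, 5]
```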

Am I interpreting it correctly that: a new key shared consumer that connects to the topic when the newest message has position X will not receive any messages until the oldest unacked message in the subscription is newer than or equal to X?

If that's the current behavior, it should be really prominently documented (potentially in a warning/highlighted way).

@codelipenghui
Copy link
Contributor

codelipenghui commented May 23, 2022

> If a topic has no consumers, and a backlog of message index:key pairs 0:a, 1:a, 2:a, 3:b, 4:b, 5:b, and a KeyShared consumer c1 joins with a receiver queue size of 1 and gets message 0, why would we prevent a new consumer c2 from joining and getting messages 3-5? That doesn't compromise key ordering in any way.

Yes, it would not break the Key_Shared semantics, but this is an implementation tradeoff: the current implementation avoids maintaining state for each key, since a topic might have a huge number of keys.

The behavior you described is expected for the current implementation (maybe not the best solution for now).

> Am I interpreting it correctly that: a new key shared consumer that connects to the topic when the newest message has position X will not receive any messages until the oldest unacked message in the subscription is newer than or equal to X?

Yes, correct.
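The rule confirmed here can be written as a one-line gate. This is a toy model of the behavior as described in this thread, not the broker's code: positions are plain integers, `join_position` is the position X from the question, and `JoinGate` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class JoinGate:
    """Toy gate for a recently joined Key_Shared consumer."""
    join_position: int  # X: position of the newest message when the consumer joined

    def is_open(self, unacked: Iterable[int]) -> bool:
        # The gate opens once the oldest unacked message is at or past X,
        # regardless of which keys the unacked messages carry.
        oldest: Optional[int] = min(unacked, default=None)
        return oldest is None or oldest >= self.join_position


# Same backlog as the example above: c2 joins when the newest message is 5.
gate = JoinGate(join_position=5)
print(gate.is_open({0}))  # False: c1 still holds message 0, so c2 gets nothing
print(gate.is_open({5}))  # True
```

The gate keeps c2 away from messages 3 to 5 even though none of them shares a key with message 0. That is the implementation tradeoff described above: no per-key state, at the cost of a stalled new consumer.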

> If that's the current behavior, it should be really prominently documented (potentially in a warning/highlighted way).

Yes, that makes sense; we will add the documentation.

@codelipenghui codelipenghui added doc-required Your PR changes impact docs and you will update later. deprecated/question Questions should happen in GitHub Discussions and removed type/bug The PR fixed a bug or issue reported a bug labels May 23, 2022
@Anonymitaet
Member

Hi @momo-jun, can you help add that note? Thanks!

@zbentley
Contributor Author

@codelipenghui thank you for the explanation; that makes sense.

Two clarifications:

  1. How does this apply to nacked messages? If a new KeyShared consumer c2 is blocked due to markDeletePosition not being caught up to the point where c2 joined, and an existing consumer c1 negatively acknowledges a message that hashes to c2, will the nacked message go to c2 or c1?
  2. To address this limitation, do you really need to track state for every key in the topic? I may be naïve here, but it seems to me that you would only need to track state for each key which has messages that have been dispatched to a consumer. That's still an O(N) state where there's currently not one, but it's a much smaller N. This might be getting into feature request territory now though.

@merlimat
Copy link
Contributor

> @codelipenghui thank you for the explanation; that makes sense.

> Two clarifications:

>   1. How does this apply to nacked messages? If a new KeyShared consumer c2 is blocked due to markDeletePosition not being caught up to the point where c2 joined, and an existing consumer c1 negatively acknowledges a message that hashes to c2, will the nacked message go to c2 or c1?

c2 will join and be marked so that it can only receive messages dispatched from the moment it joined.

In this example, a message nacked by c1 will still get redelivered to c1 (unless c1 disconnects), because the keys are not switched until everything that c1 has already received is acked.

Otherwise, we could get a nack on one message and then on another, and they could end up out of order, e.g. if c2 also goes away.
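The redelivery rule described in this reply can be sketched as a small router. Assumptions: ownership of a hash moves from `old_owner` to `new_owner` when c2 joins, and the move only takes effect for redeliveries once the old owner has acked everything it held at that moment, or has disconnected. `DrainingRouter` and its methods are hypothetical and only illustrate the rule as stated here.

```python
from typing import Dict, Iterable, Set


class DrainingRouter:
    """Toy router: a key's redeliveries stay with its previous owner until that
    owner has acked everything it held when ownership changed."""

    def __init__(self) -> None:
        # consumer -> positions it still held (unacked) when ownership changed
        self.draining: Dict[str, Set[int]] = {}

    def ownership_changed(self, old_owner: str, held: Iterable[int]) -> None:
        held_set = set(held)
        if held_set:
            self.draining[old_owner] = held_set

    def ack(self, consumer: str, position: int) -> None:
        held = self.draining.get(consumer)
        if held is not None:
            held.discard(position)
            if not held:
                # Everything from before the change is acked; the switch completes.
                del self.draining[consumer]

    def disconnect(self, consumer: str) -> None:
        # A departed consumer can no longer receive redeliveries.
        self.draining.pop(consumer, None)

    def route_nacked(self, old_owner: str, new_owner: str) -> str:
        # A nack does not count as an ack, so it never ends the draining phase.
        return old_owner if old_owner in self.draining else new_owner


router = DrainingRouter()
router.ownership_changed("c1", held=[0, 1])  # c2 joins while c1 holds 0 and 1
print(router.route_nacked("c1", "c2"))  # c1
```

zbentley reports later in the thread that nacking on c1 did let c2 start receiving messages, so treat this as a model of the stated rule, not of observed broker behavior.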

>   2. To address this limitation, do you really need to track state for every key in the topic? I may be naïve here, but it seems to me that you would only need to track state for each key which has messages that have been dispatched to a consumer. That's still an O(N) state where there's currently not one, but it's a much smaller N. This might be getting into feature request territory now though.

"each key which has messages that have been dispatched to a consumer." ... for which the worst case scenario is to track every key :)
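The per-key alternative proposed above, and the worst case raised in reply, can be sketched as a tracker that holds state only for keys with dispatched but unacked messages. This is a hypothetical design for illustration, not how the broker works as described in this thread; `InFlightKeyTracker` is an invented name.

```python
from collections import Counter
from typing import Dict


class InFlightKeyTracker:
    """Hypothetical per-key state: only keys with unacked messages are tracked."""

    def __init__(self) -> None:
        self.holder: Dict[str, str] = {}  # key -> consumer holding its unacked messages
        self.in_flight: Counter = Counter()  # key -> count of unacked messages

    def can_dispatch(self, key: str, consumer: str) -> bool:
        # Safe if no other consumer has an unacked message with this key.
        holder = self.holder.get(key)
        return holder is None or holder == consumer

    def dispatched(self, key: str, consumer: str) -> None:
        if not self.can_dispatch(key, consumer):
            raise ValueError(f"{key!r} is in flight on {self.holder[key]!r}")
        self.holder[key] = consumer
        self.in_flight[key] += 1

    def acked(self, key: str) -> None:
        if self.in_flight[key] <= 0:
            raise KeyError(key)
        self.in_flight[key] -= 1
        if self.in_flight[key] == 0:
            # Drop all state for the key once nothing is in flight.
            del self.in_flight[key]
            del self.holder[key]

    def state_size(self) -> int:
        return len(self.holder)


tracker = InFlightKeyTracker()
tracker.dispatched("a", "c1")           # c1 holds message 0 (key "a")
print(tracker.can_dispatch("b", "c2"))  # True: key "b" has nothing in flight
print(tracker.can_dispatch("a", "c2"))  # False until c1 acks
```

If every key has a message in flight at once, `state_size()` equals the number of distinct keys, which is the worst case pointed out above.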

@xuesongxs
Contributor

> erwise, we could get a nack on one message and then on another and they could end up being out of order, eg: if c2 al

Try changing your broker.conf file to:
subscriptionKeySharedUseConsistentHashing=true

@zbentley
Copy link
Contributor Author

@merlimat that's not accurate; if I start a KeyShared consumer c1 per the test setup here, wait until it receives its first message, then start another consumer c2, then trigger c1 to nack all of its messages, then c2 starts getting (some, not all, hash-based) messages.

Is that not supposed to happen? The docs seem to indicate that this is expected:

> Be aware that negative acknowledgments on ordered subscription types, such as Exclusive, Failover and Key_Shared, might cause failed messages being sent to consumers out of the original order.

Additionally, how does allowOutOfOrderDelivery work with the Python/C++ client? Is that on by default? Off by default?

Lastly, how does the setting that @xuesongxs mentioned (subscriptionKeySharedUseConsistentHashing) affect this behavior? I thought that setting only affected what keys new consumers assume ownership from when they arrive; does it also affect how those consumers get messages (nacked or backlogged) that were already in the topic when they joined?
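The documentation's warning about negative acknowledgments quoted in this comment can be illustrated with a toy delivery queue. Assumption: a nacked message is redelivered after the messages already queued behind it; real redelivery timing also depends on the client's negative-acknowledgment redelivery delay. `delivery_order` is a hypothetical helper.

```python
from collections import deque
from typing import Iterable, List, Set


def delivery_order(positions: Iterable[int], nack_once: Set[int]) -> List[int]:
    """Toy: each position in nack_once is nacked on first delivery and re-queued at the back."""
    queue = deque(positions)
    delivered: List[int] = []
    already_nacked: Set[int] = set()
    while queue:
        pos = queue.popleft()
        delivered.append(pos)
        if pos in nack_once and pos not in already_nacked:
            already_nacked.add(pos)
            queue.append(pos)  # redelivered after messages that followed it
    return delivered


# Three messages with the same key; the first is nacked once.
print(delivery_order([0, 1, 2], {0}))  # [0, 1, 2, 0]
```

Message 0 is processed again after 1 and 2, so a consumer that relies on per-key order sees that key's messages out of order.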

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@kuskmen

kuskmen commented Aug 14, 2023

Can outOfOrderDelivery mitigate consumers getting stuck, since we obviously don't care about ordering in this case?
