Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rare non-resumption of message delivery after connection interruption #4916

Open
6 of 7 tasks
tmcqueen-materials opened this issue Nov 27, 2024 · 3 comments
Open
6 of 7 tasks

Comments

@tmcqueen-materials
Copy link

tmcqueen-materials commented Nov 27, 2024

Description

In rare (~1/50) instances, a connection interruption causes librdkafka to no longer receive messages from topics in consume calls. It appears to be caused by a change in partition counts to 0, that then never gets reset, even after future resubscribe calls. Once in this state, a consumer fails to resume message consumption for at least hours (at which point we restart the consumer to fix the issue).

How to reproduce

This is a difficult issue to reproduce, as we have only ever seen it after interruption of network connectivity (e.g. by physically powering down a network switch, waiting some time, and then powering it back on), or on simultaneous broker restarts (or, e.g., by only using a single broker and restarting that apache kafka instance), and even then it only happens infrequently. So we have not yet been able to collect full debug logs during an event.

However, what we do know is that most of the time, the consumer automatically recovers. In cases where they recover, we see log lines like:

2024-11-24T22:39:36.023416770-05:00 stderr F WARNING:root:Processed 42200 messages
2024-11-24T22:41:06.122966280-05:00 stderr F INFO:kafkacrypto.confluent_kafka_wrapper:Executing Consumer subscribe with topics=[<bunch of topics>], pattern=None, listener=None
2024-11-24T22:43:36.517038922-05:00 stderr F %6|1732506216.516|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Disconnected (after 125971537ms in state UP)
2024-11-24T22:43:36.519298106-05:00 stderr F %6|1732506216.519|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Disconnected (after 133873ms in state UP, 1 identical error(s) suppressed)
2024-11-24T22:43:36.519473368-05:00 stderr F %3|1732506216.519|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 1ms in state SSL_HANDSHAKE)
2024-11-24T22:43:36.519756401-05:00 stderr F %3|1732506216.519|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT)
2024-11-24T22:43:36.521932645-05:00 stderr F %6|1732506216.521|FAIL|rdkafka#consumer-3| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Disconnected (after 125971955ms in state UP)
2024-11-24T22:43:36.522075526-05:00 stderr F %6|1732506216.522|FAIL|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Disconnected (after 125971863ms in state UP)
2024-11-24T22:43:36.522356949-05:00 stderr F %3|1732506216.522|FAIL|rdkafka#consumer-3| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT)
2024-11-24T22:43:36.522419880-05:00 stderr F %3|1732506216.522|FAIL|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT)
2024-11-24T22:43:36.672831046-05:00 stderr F %3|1732506216.672|FAIL|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:43:36.748973509-05:00 stderr F %3|1732506216.748|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT)
2024-11-24T22:43:36.782744613-05:00 stderr F %3|1732506216.782|FAIL|rdkafka#consumer-3| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:43:37.016636731-05:00 stderr F %3|1732506217.016|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT)
2024-11-24T22:43:37.900568331-05:00 stderr F %3|1732506217.900|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:43:38.016547205-05:00 stderr F %3|1732506218.016|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:44:41.372540811-05:00 stderr F WARNING:root:Processed 42300 messages
2024-11-24T22:46:06.458232020-05:00 stderr F INFO:kafkacrypto.confluent_kafka_wrapper:Executing Consumer subscribe with topics=[<bunch of topics>], pattern=None, listener=None

However, infrequently (we estimate 1 in 50 times) after a physical network interruption, the consumer permanently stops processing messages. In which case we see logs like:

2024-11-24T22:37:22.502358364-05:00 stderr F INFO:kafkacrypto.confluent_kafka_wrapper:Executing Consumer subscribe with topics=[<bunch of topics>]
2024-11-24T22:43:36.523590519-05:00 stderr F %6|1732506216.523|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Disconnected (after 126737394ms in state UP)
2024-11-24T22:43:36.528541128-05:00 stderr F %6|1732506216.528|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Disconnected (after 126737304ms in state UP)
2024-11-24T22:43:36.532508745-05:00 stderr F %3|1732506216.532|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 3ms in state CONNECT)
2024-11-24T22:43:36.532508745-05:00 stderr P %3|1732506216.532|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 8ms in state CONNECT)
2024-11-24T22:43:36.532531145-05:00 stderr F 
2024-11-24T22:43:36.675091934-05:00 stderr F %3|1732506216.675|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT)
2024-11-24T22:43:36.718871142-05:00 stderr F %3|1732506216.718|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: kafka-broker-dns-name:9094: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:43:36.822871371-05:00 stderr F %3|1732506216.822|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:43:37.675230645-05:00 stderr F %3|1732506217.675|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://kafka-broker-dns-name:9094/bootstrap]: sasl_ssl://kafka-broker-dns-name:9094/1: Connect to ipv4#kafka-broker-ip-address:9094 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.019|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic1.subs partition count changed from 2 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.019|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic2.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.019|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic3.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.019|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic4.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.019|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic5 partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic6.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic7.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic8.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic9.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic10.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic11.chunks.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020068014-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic12.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020124514-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic13.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020124514-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic14.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020124514-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic15.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020124514-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic16.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020124514-05:00 stderr F %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic17.subs partition count changed from 1 to 0
2024-11-24T22:43:52.020124514-05:00 stderr P %5|1732506232.020|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic18.subs partition count changed from 1 to 0
2024-11-24T22:47:22.785841881-05:00 stderr F INFO:kafkacrypto.confluent_kafka_wrapper:Executing Consumer subscribe with topics=[<bunch of topics]

I've left in here the application message log info just before and after, to show that clearly in most cases librdkafka reconnects and processing continues. HOWEVER, in rare cases the partition counts are set to 0, and then the consumer never receives further messages, even after a resubscribe is executed.

For each network event, which consumer(s) are affected appears random -- each consumer does not exhibit this issue most of the time, and those that do exhibit this behavior after a network outage is different each time. There is no apparent correlation which which topic(s) are subscribed to.

Speculations

This smells like some kind of race condition, but of what?

Additional Details

All of our producers and consumers are written in python, and use the confluent-kafka-python wrapper for librdkafka. This is being reported here because it looks like a bug in librdkafka itself, not an artifact from the wrapper library. In most cases, the consumers have a very simple structure:

while True:
    if (time() > last_subscribe_time+interval):
        resubscribe to topics
    msgs = consumer.poll(500 ms timeout, 8 max records)
    for msg in msgs:
        process messages
    (commit offset, if auto_offset_commit is disabled... usually it is not)

Since we see the application logging lines for the resubscription process, we infer that this is not a hang, but instead for some reason librdkafka does not bring topics set to 0 partitions back later (or maybe erroneously sets them to 0 partitions in the first place?).

Checklist

  • librdkafka version (release number or git tag): v2.5.3 (plus one observation on v2.6.1 as part of our internal attempts to debug)
  • Apache Kafka version: 3.90 (also happened on 3.6.1)
  • librdkafka client configuration: In most cases, no librdkafka parameters are changed from the defaults, beyond those required to connect to the broker (i.e. server, sasl username/password, ssl.ca.location).
  • Operating system: Various, including Rocky Linux 8.9, CentOS 7.9, Windows 10 (all on x64).
  • Provide logs (with debug=.. as necessary) from librdkafka; see above.
  • Provide broker log excerpts: we do not have easy access to these logs, but if we know what to look for, we can ask for thoe snippets.
  • Critical issue
@nrlulz
Copy link

nrlulz commented Dec 3, 2024

I believe we've just seen this happen too, due to a connection interruption during routine maintenance. I have never seen a partial failure like this, usually it's all or nothing. For what it's worth, we're using confluent-kafka-python / librdkafka 2.6.1 and AWS MSK 2.6.2. I have a topic with 48 partitions across 3 brokers, and 6 consumers of this topic. 3 of the consumers were affected while the other 3 happily continued processing messages from all subscribed partitions. After receiving a sum offset lag alert from my monitoring, I manually restarted the affected consumers, and things have been working fine since. But I do believe it should have automatically recovered from this.

In the 3 that were affected, I see the below message in the logs following a handful of (expected) "connection refused" messages. This message did not appear in the unaffected consumers' logs.

2024-12-03 14:08:32.151	%5|1733263712.151|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic topic_name partition count changed from 48 to 0

Also for what it's worth, here is the output of kafka-consumer-groups during the incident. Most of the partitions have a normal lag in single or double digits, but affected partitions have a lag in the thousands (and were climbing until restarted).

consumergroup.txt

@DainiusCerniauskasAngstrom

We have been seeing this for topics that have a replication factor of less than 2, been happening since upgrading to 1.8.2 afaik.
Increasing replication factor has stopped this issue re-occuring (as long as only one broker goes down at a time) for us.

@superhx
Copy link

superhx commented Dec 31, 2024

I also encounter this problem when there is only 1 replica for my topic and the broker restart(killed -9)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants