[BUG] It takes a few minutes for the KoP consumer to start consuming data #1379
Comments
Could you upload the broker logs? By the way, you can attach them as files instead of pasting massive logs.
Broker logs: click the link below.
It might be a bug on the Pulsar side, because the core issue is the reader on
The
    if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
        callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
                ctx);
        return;
    }
If
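The guard above parks at most one waiting read and fails any second caller immediately. The following minimal Java model shows that behavior in isolation. `SingleWaiterCursor`, `ReadCallback`, `onNewEntry`, and the use of `IllegalStateException` in place of `ManagedLedgerException` are hypothetical simplifications, not Pulsar's actual `ManagedCursorImpl` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Demo entry point: parks one read, then shows a second read being rejected.
class SingleWaiterDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SingleWaiterCursor cursor = new SingleWaiterCursor();
        cursor.asyncReadEntriesOrWait(recorder("first", events));  // no entries yet: parked
        cursor.asyncReadEntriesOrWait(recorder("second", events)); // slot taken: fails at once
        cursor.onNewEntry("entry-1");                               // wakes the parked "first" read
        return events;
    }

    // Builds a callback that records what happened to it.
    static SingleWaiterCursor.ReadCallback recorder(String name, List<String> events) {
        return new SingleWaiterCursor.ReadCallback() {
            @Override
            public void readEntriesComplete(String entry) {
                events.add(name + " got " + entry);
            }

            @Override
            public void readEntriesFailed(Exception e) {
                events.add(name + " failed: " + e.getMessage());
            }
        };
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical, simplified cursor that, like the snippet above, allows only one parked read.
class SingleWaiterCursor {
    interface ReadCallback {
        void readEntriesComplete(String entry);

        void readEntriesFailed(Exception e);
    }

    // Field updaters require the target field to be volatile and of the exact declared type.
    private static final AtomicReferenceFieldUpdater<SingleWaiterCursor, ReadCallback> WAITING_READ_OP_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(SingleWaiterCursor.class, ReadCallback.class, "waitingReadOp");

    private volatile ReadCallback waitingReadOp = null;

    // Called when no entries are available: park the callback unless one is already parked.
    void asyncReadEntriesOrWait(ReadCallback callback) {
        if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, callback)) {
            // Stand-in for the ManagedLedgerException used in the real code.
            callback.readEntriesFailed(new IllegalStateException("We can only have a single waiting callback"));
        }
    }

    // Called when a new entry is written: atomically take the parked callback and complete it.
    void onNewEntry(String entry) {
        ReadCallback callback = WAITING_READ_OP_UPDATER.getAndSet(this, null);
        if (callback != null) {
            callback.readEntriesComplete(entry);
        }
    }
}
```

Running it prints the rejection for the second read before the first read receives `entry-1`, which matches the "We can only have a single waiting callback" error discussed in this thread.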
OK, thank you. I am producing and consuming normally now, but the server logs the following KoP-related error. What could be the cause?
It's caused by the Kafka client disconnecting, because the Kafka client didn't receive the response after
…re entries ### Motivation Related issue: streamnative/kop#1379 KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like `Error reading entries at 928511:1 : We can only have a single waiting callback`. It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Add a `canNotReadMoreEntries` method to detect the case where there is already a pending read request while there are no more entries, then skip reading in `readMoreEntries`. In addition, this PR reverts the redundant synchronized block introduced in apache#14716.
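This first proposed fix avoids issuing the second read at all. Below is a single-threaded sketch of that idea under simplified assumptions: `GuardedCursor`, `GuardedDispatcher`, `publish`, and the `useGuard` flag are hypothetical names, and only the method name `canNotReadMoreEntries` comes from the commit message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Demo: the same two reads against an empty cursor, with and without the guard.
class GuardedReadDemo {
    static List<String> run(boolean useGuard) {
        List<String> events = new ArrayList<>();
        GuardedCursor cursor = new GuardedCursor();
        GuardedDispatcher dispatcher = new GuardedDispatcher(cursor, events, useGuard);
        dispatcher.readMoreEntries(); // nothing to read yet: the read is parked
        dispatcher.readMoreEntries(); // a second read while the first is still parked
        cursor.publish("entry-1");    // new data completes the parked read
        return events;
    }

    public static void main(String[] args) {
        System.out.println("without guard: " + run(false));
        System.out.println("with guard:    " + run(true));
    }
}

// Hypothetical single-threaded cursor model; not Pulsar's ManagedCursorImpl.
class GuardedCursor {
    private final Deque<String> entries = new ArrayDeque<>();
    private Consumer<String> waitingReadOp; // at most one parked read

    boolean hasMoreEntries() {
        return !entries.isEmpty();
    }

    // Mirrors the idea of canNotReadMoreEntries: a read is already parked and nothing new exists.
    boolean canNotReadMoreEntries() {
        return waitingReadOp != null && !hasMoreEntries();
    }

    void asyncReadEntriesOrWait(Consumer<String> onEntry, Consumer<String> onFailure) {
        if (hasMoreEntries()) {
            onEntry.accept(entries.poll());
        } else if (waitingReadOp != null) {
            onFailure.accept("We can only have a single waiting callback");
        } else {
            waitingReadOp = onEntry;
        }
    }

    // Delivers a new entry to the parked read if there is one, otherwise buffers it.
    void publish(String entry) {
        if (waitingReadOp != null) {
            Consumer<String> callback = waitingReadOp;
            waitingReadOp = null;
            callback.accept(entry);
        } else {
            entries.add(entry);
        }
    }
}

class GuardedDispatcher {
    private final GuardedCursor cursor;
    private final List<String> events;
    private final boolean useGuard;

    GuardedDispatcher(GuardedCursor cursor, List<String> events, boolean useGuard) {
        this.cursor = cursor;
        this.events = events;
        this.useGuard = useGuard;
    }

    void readMoreEntries() {
        if (useGuard && cursor.canNotReadMoreEntries()) {
            // Recorded only for visibility in this demo; the skip itself is silent.
            events.add("skipped: read already pending");
            return;
        }
        cursor.asyncReadEntriesOrWait(
                entry -> events.add("read " + entry),
                error -> events.add("error: " + error));
    }
}
```

Without the guard, the second read produces the error that flooded the broker logs; with it, the read is skipped and the parked read still completes once data arrives. The real code is concurrent, so treat this only as an illustration of the control flow.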
…re entries ### Motivation Related issue: streamnative/kop#1379 KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like `Error reading entries at 928511:1 : We can only have a single waiting callback`. It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when a waiting callback is already registered. Then check for this exception type and skip the subsequent steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
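The approach in this commit keeps the rejection but gives it a dedicated exception type so the dispatcher can recognize it. A minimal sketch under simplified assumptions: `LedgerReadException` is a hypothetical stand-in for `ManagedLedgerException`, the `ConcurrentWaitCallbackException` here is a local stand-in rather than Pulsar's class, and `SketchDispatcher#internalReadEntriesFailed` only records placeholder actions ("log error", "schedule read retry") instead of doing what the real dispatcher does.

```java
import java.util.ArrayList;
import java.util.List;

// Demo: one benign concurrent-wait failure and one ordinary read failure.
class ReadFailureDemo {
    public static void main(String[] args) {
        SketchDispatcher dispatcher = new SketchDispatcher();
        System.out.println(dispatcher.internalReadEntriesFailed(new ConcurrentWaitCallbackException()));
        System.out.println(dispatcher.internalReadEntriesFailed(new LedgerReadException("simulated read failure")));
    }
}

// Hypothetical stand-in for a ManagedLedgerException-like base type.
class LedgerReadException extends Exception {
    LedgerReadException(String message) {
        super(message);
    }
}

// Local stand-in for the dedicated "a waiting callback is already registered" type.
class ConcurrentWaitCallbackException extends LedgerReadException {
    ConcurrentWaitCallbackException() {
        super("We can only have a single waiting callback");
    }
}

class SketchDispatcher {
    // Returns the actions taken so the behavior is easy to inspect.
    List<String> internalReadEntriesFailed(LedgerReadException exception) {
        List<String> actions = new ArrayList<>();
        if (exception instanceof ConcurrentWaitCallbackException) {
            // A read is already parked and will complete when new entries arrive,
            // so the error logging and retry steps below are skipped.
            actions.add("ignored");
            return actions;
        }
        actions.add("log error: " + exception.getMessage());
        actions.add("schedule read retry");
        return actions;
    }
}
```

Using a subtype rather than matching on the message string keeps the check robust if the message wording ever changes.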
…re entries (#16400) ### Motivation Related issue: streamnative/kop#1379 KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like `Error reading entries at 928511:1 : We can only have a single waiting callback`. It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when a waiting callback is already registered. Then check for this exception type and skip the subsequent steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
…re entries (apache#16400) ### Motivation Related issue: streamnative/kop#1379 KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like `Error reading entries at 928511:1 : We can only have a single waiting callback`. It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when a waiting callback is already registered. Then check for this exception type and skip the subsequent steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`. (cherry picked from commit 5ec4e3d)
…re entries (apache#16400) ### Motivation Related issue: streamnative/kop#1379 KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like `Error reading entries at 928511:1 : We can only have a single waiting callback`. It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when a waiting callback is already registered. Then check for this exception type and skip the subsequent steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
…re entries (#16400) ### Motivation Related issue: streamnative/kop#1379 KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like `Error reading entries at 928511:1 : We can only have a single waiting callback`. It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when a waiting callback is already registered. Then check for this exception type and skip the subsequent steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`. (cherry picked from commit 5ec4e3d)
Describe the bug
When I started the Kafka consumer client locally, it took about four minutes (from 17:17 to 17:21) before it began consuming data.
To Reproduce
17:17:02.479 [main] INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
17:17:02.807 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1
17:17:02.809 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15
17:17:02.809 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1656926222803
17:17:02.812 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=kop_con, groupId=sub01] Subscribed to topic(s): persistent://kop-tenant/kop-test1/kop-topic1
17:17:20.177 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=kop_con, groupId=sub01] Bootstrap broker 10.101.129.68:9092 (id: -2 rack: null) disconnected
17:17:40.292 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=kop_con, groupId=sub01] Bootstrap broker 10.101.129.70:9092 (id: -3 rack: null) disconnected
17:18:00.404 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=kop_con, groupId=sub01] Bootstrap broker 10.101.129.75:9092 (id: -1 rack: null) disconnected
17:18:17.639 [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=kop_con, groupId=sub01] Cluster ID: pulsar_dev2
17:18:17.639 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Discovered group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null)
17:18:34.744 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] (Re-)joining group
17:18:34.751 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
17:18:34.753 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Rebalance failed.
org.apache.kafka.common.errors.DisconnectException: null
17:19:53.684 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Discovered group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null)
17:20:12.278 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] (Re-)joining group
17:20:15.321 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Successfully joined group with generation Generation{generationId=325, memberId='kop_con-cabb1ea0-a23e-4df2-bbc8-5a6f03214e76', protocol='range'}
17:20:15.323 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Finished assignment for group at generation 325: {kop_con-cabb1ea0-a23e-4df2-bbc8-5a6f03214e76=Assignment(partitions=[persistent://kop-tenant/kop-test1/kop-topic1-0, persistent://kop-tenant/kop-test1/kop-topic1-1, persistent://kop-tenant/kop-test1/kop-topic1-2, persistent://kop-tenant/kop-test1/kop-topic1-3])}
17:20:15.334 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Successfully synced group in generation Generation{generationId=325, memberId='kop_con-cabb1ea0-a23e-4df2-bbc8-5a6f03214e76', protocol='range'}
17:20:15.335 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Notifying assignor about the new Assignment(partitions=[persistent://kop-tenant/kop-test1/kop-topic1-0, persistent://kop-tenant/kop-test1/kop-topic1-1, persistent://kop-tenant/kop-test1/kop-topic1-2, persistent://kop-tenant/kop-test1/kop-topic1-3])
17:20:15.337 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Adding newly assigned partitions: persistent://kop-tenant/kop-test1/kop-topic1-0, persistent://kop-tenant/kop-test1/kop-topic1-1, persistent://kop-tenant/kop-test1/kop-topic1-2, persistent://kop-tenant/kop-test1/kop-topic1-3
17:20:15.356 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Setting offset for partition persistent://kop-tenant/kop-test1/kop-topic1-0 to the committed offset FetchPosition{offset=1180189, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.101.129.68:9092 (id: 2057388004 rack: null)], epoch=absent}}
17:20:15.356 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Setting offset for partition persistent://kop-tenant/kop-test1/kop-topic1-1 to the committed offset FetchPosition{offset=1113983, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.101.129.75:9092 (id: 483712620 rack: null)], epoch=absent}}
17:20:15.356 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Setting offset for partition persistent://kop-tenant/kop-test1/kop-topic1-2 to the committed offset FetchPosition{offset=1266502, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.101.129.70:9092 (id: 1126040666 rack: null)], epoch=absent}}
17:20:15.356 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=kop_con, groupId=sub01] Setting offset for partition persistent://kop-tenant/kop-test1/kop-topic1-3 to the committed offset FetchPosition{offset=1509272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.101.129.70:9092 (id: 1126040666 rack: null)], epoch=absent}}
17:20:41.447 [main] INFO o.a.k.clients.FetchSessionHandler - [Consumer clientId=kop_con, groupId=sub01] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1126040666:
org.apache.kafka.common.errors.DisconnectException: null
17:20:41.447 [main] INFO o.a.k.clients.FetchSessionHandler - [Consumer clientId=kop_con, groupId=sub01] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2057388004:
org.apache.kafka.common.errors.DisconnectException: null
17:20:41.448 [kafka-coordinator-heartbeat-thread | sub01] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted.
17:20:41.462 [main] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Discovered group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null)
17:21:01.665 [kafka-coordinator-heartbeat-thread | sub01] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=kop_con, groupId=sub01] Group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
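offset = 1171266, key = 4, value = kafka客户端发送的第4条消息 (English: "the 4th message sent by the Kafka client")
offset = 1171267, key = 16, value = kafka客户端发送的第16条消息 (English: "the 16th message sent by the Kafka client")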
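In the client log above, the three bootstrap disconnects arrive about 17 to 20 seconds apart, and the longest single stall is between the failed rebalance at 17:18:34 and the coordinator rediscovery at 17:19:53. To find such stalls in a longer log, a small Java sketch can parse the leading timestamps and report large gaps. `LogGapFinder` is a hypothetical helper; it assumes each relevant line starts with an `HH:mm:ss.SSS` timestamp and that the log does not cross midnight. The sample lines are excerpts from the log above with the logger prefix elided as `...`.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

class LogGapFinder {
    // Excerpt of the client log above; logger names and client prefixes are elided as "...".
    static final List<String> SAMPLE = List.of(
            "17:17:02.812 [main] INFO ... Subscribed to topic(s): persistent://kop-tenant/kop-test1/kop-topic1",
            "17:17:20.177 [main] WARN ... Bootstrap broker 10.101.129.68:9092 (id: -2 rack: null) disconnected",
            "17:17:40.292 [main] WARN ... Bootstrap broker 10.101.129.70:9092 (id: -3 rack: null) disconnected",
            "17:18:00.404 [main] WARN ... Bootstrap broker 10.101.129.75:9092 (id: -1 rack: null) disconnected",
            "17:18:17.639 [main] INFO ... Cluster ID: pulsar_dev2",
            "17:18:34.753 [main] INFO ... Rebalance failed.",
            "org.apache.kafka.common.errors.DisconnectException: null",
            "17:19:53.684 [main] INFO ... Discovered group coordinator 10.101.129.70:9092 (id: 1021442981 rack: null)",
            "17:20:15.334 [main] INFO ... Successfully synced group in generation Generation{generationId=325, ...}",
            "17:20:41.447 [main] INFO ... Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1126040666:",
            "17:21:01.665 [kafka-coordinator-heartbeat-thread | sub01] INFO ... Group coordinator 10.101.129.70:9092 is unavailable or invalid");

    // Parses the leading "HH:mm:ss.SSS" timestamp, or returns null for lines without one.
    static LocalTime timestampOf(String line) {
        if (line.length() < 12) {
            return null;
        }
        try {
            return LocalTime.parse(line.substring(0, 12));
        } catch (DateTimeParseException e) {
            return null; // e.g. exception lines printed without a timestamp
        }
    }

    // Time between the first and last timestamped lines (assumes a single day).
    static Duration totalElapsed(List<String> lines) {
        LocalTime first = null;
        LocalTime last = null;
        for (String line : lines) {
            LocalTime t = timestampOf(line);
            if (t == null) {
                continue;
            }
            if (first == null) {
                first = t;
            }
            last = t;
        }
        return first == null ? Duration.ZERO : Duration.between(first, last);
    }

    // Lists consecutive timestamped lines that are further apart than the threshold.
    static List<String> findGaps(List<String> lines, Duration threshold) {
        List<String> gaps = new ArrayList<>();
        LocalTime previous = null;
        String previousStamp = null;
        for (String line : lines) {
            LocalTime current = timestampOf(line);
            if (current == null) {
                continue;
            }
            if (previous != null) {
                Duration gap = Duration.between(previous, current);
                if (gap.compareTo(threshold) > 0) {
                    gaps.add(previousStamp + " -> " + line.substring(0, 12) + " (" + gap.toMillis() + " ms)");
                }
            }
            previous = current;
            previousStamp = line.substring(0, 12);
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println("total: " + totalElapsed(SAMPLE).toMillis() + " ms");
        findGaps(SAMPLE, Duration.ofSeconds(60)).forEach(System.out::println);
    }
}
```

On these lines it reports a total of 238853 ms (just under four minutes) and a single gap above 60 seconds, `17:18:34.753 -> 17:19:53.684 (78931 ms)`.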
broker.conf for KoP
There are three nodes in total. All nodes use the same configuration except for the IP address.
KoP configuration:
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
narExtractionDirectory=./protocols
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://ip1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
saslAllowedMechanisms=PLAIN
kafkaEnableMultiTenantMetadata=false
kafkaTransactionCoordinatorEnabled=true
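Since the three nodes share everything except the advertised address, one way to keep them consistent is to render each node's KoP settings from a single template. Here is a minimal Java sketch using `java.util.Properties`. `KopNodeConfigs` is a hypothetical helper, and it assumes that the three bootstrap addresses from the client log (10.101.129.68, 10.101.129.70, 10.101.129.75) are the three KoP nodes and that only `kafkaAdvertisedListeners` varies per node, as described above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class KopNodeConfigs {
    // Shared KoP settings copied from the issue; identical on every node.
    static final String SHARED = String.join("\n",
            "messagingProtocols=kafka",
            "protocolHandlerDirectory=./protocols",
            "narExtractionDirectory=./protocols",
            "kafkaListeners=PLAINTEXT://0.0.0.0:9092",
            "brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor",
            "saslAllowedMechanisms=PLAIN",
            "kafkaEnableMultiTenantMetadata=false",
            "kafkaTransactionCoordinatorEnabled=true");

    // Builds one Properties object per node; only kafkaAdvertisedListeners differs.
    static Map<String, Properties> render(List<String> nodeIps) {
        Map<String, Properties> configs = new LinkedHashMap<>();
        for (String ip : nodeIps) {
            Properties props = new Properties();
            try {
                props.load(new StringReader(SHARED));
            } catch (IOException e) {
                // Reading from a StringReader should not fail, but load() declares IOException.
                throw new UncheckedIOException(e);
            }
            props.setProperty("kafkaAdvertisedListeners", "PLAINTEXT://" + ip + ":9092");
            configs.put(ip, props);
        }
        return configs;
    }

    public static void main(String[] args) {
        // Assumption: these bootstrap addresses from the client log are the three KoP nodes.
        render(List.of("10.101.129.68", "10.101.129.70", "10.101.129.75"))
                .forEach((ip, props) -> System.out.println(ip + ": kafkaAdvertisedListeners="
                        + props.getProperty("kafkaAdvertisedListeners")));
    }
}
```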