Implement draining logic for transactional flow #757
Conversation
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
Some afterthoughts:
core/src/main/scala/akka/kafka/internal/TransactionalSource.scala
(force-pushed from c0e92db to 9c04dcf)
Rebased on top of the latest master, and also fixed a bug where a wrong
@@ -151,9 +151,9 @@ private final class TransactionalProducerStageLogic[K, V, P](stage: Transactiona
   private def maybeCommitTransaction(beginNewTransaction: Boolean = true): Unit = {
     val awaitingConf = awaitingConfirmation.get
     batchOffsets match {
-      case batch: NonemptyTransactionBatch if awaitingConf == 0 =>
+      case batch: NonemptyTransactionBatch if awaitingConf == 0 && !hasBeenPulled(stage.in) =>
Can you explain why we need this logic?
With this PR, the transaction commit is split into two synchronous blocks in the Producer stage. That means that in between, a message could be sent to the stage which would be sent to the Kafka producer, but the transaction has not yet been started.
So this guard was needed to be sure that no message comes in between the committing of the transaction and the beginning of the new one.
Still didn't quite get it. I was hoping that suspending demand when transaction is in progress will be enough.
Also, I'm wondering if we shouldn't just block waiting for the internal commit. kafkaProducer.commitTransaction() is already blocking the whole stage waiting on a network call, and internalCommit is just waiting for a local memory change, so it shouldn't add much...
That would simplify the code a bit.
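For illustration, the kind of simplification being suggested might look roughly like this (a sketch only; the method name, the internalCommit hook, and the timeout are illustrative, not the PR's actual code):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import org.apache.kafka.clients.producer.Producer

// Rough sketch of the suggestion: block on the internal bookkeeping right after
// the (already blocking) producer commit instead of splitting the commit across
// two async callbacks.
def commitAndBegin[K, V](producer: Producer[K, V], internalCommit: () => Future[Unit]): Unit = {
  producer.commitTransaction()               // blocking network call either way
  Await.result(internalCommit(), 15.seconds) // local memory update, should return quickly
  producer.beginTransaction()
}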
Making internalCommit just send a message would also be possible. Seems like TransactionalProducerStage does not have to wait for the SourceActor to acknowledge getting the message, right?
I was hoping that suspending demand when transaction is in progress will be enough.
Calling suspendDemand does make sure that stage.in is not going to be pulled anymore from that point on, but it could have been pulled before that point.
Seems like TransactionalProducerStage does not have to wait for the SourceActor to acknowledge getting the message, right?
Yes, currently it looks like that's the case. I'll need to think about it a bit more.
Thanks, @2m
def waitForDraining(): Unit = {
  import akka.pattern.ask
  implicit val timeout = Timeout(5.seconds)
  Await.result(ask(stageActor.ref, Drain(stageActor.ref, Drained)), 10.seconds)
I'm afraid the behaviour in case of a timeout may not be correct.
IIUC a TimeoutException is thrown and it's caught in Kafka code by ConsumerCoordinator, here:
try {
  listener.onPartitionsRevoked(revoked);
} catch (WakeupException | InterruptException e) {
  throw e;
} catch (Exception e) {
  log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
}
So due to catch (Exception e) we won't fail the stream; we'll just continue as if nothing happened and the stream was drained.
Therefore, if it takes too long to drain, we still allow messages to remain in the stream. In the meantime some other consumer might start processing data from the same partition, because we just released it.
You're right @jotek7. In this case we should fail the stream starting from the producer, which is similar to what we should do generally in the failure case (see my comments).
Yea, the stage needs to be stopped in that scenario. But I am still not sure if the stopping should start from the consumer: see #757 (comment)
case (_, Drain(ack, msg)) =>
  if (inFlightRecords.empty()) {
    log.debug("Source drained")
    ack ! msg
This should also send an answer back to the sender, otherwise the ask on line 139 will not be completed.
Good catch, fixed in the next commit.
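For reference, one way the handler could also complete the ask (a sketch based on the snippet quoted above, not necessarily the actual commit):

case (sender, Drain(ack, msg)) =>
  if (inFlightRecords.empty()) {
    log.debug("Source drained")
    ack ! msg
    sender ! msg // reply to the asker as well, so the ask in waitForDraining completes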
I am not sure if this is a problem. If the stream is being torn down, then after the blocking
I added a commit where it fails the consumer stage if the draining timeout happens.
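A hedged sketch of that shape: waitForDraining returns a Boolean so the caller can fail the stage explicitly on timeout instead of relying on the swallowed exception (the parameter names are illustrative; the actual change may differ):

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration.FiniteDuration

// Sketch only: `drainCommand` stands in for the Drain(...) message and
// `drainTimeout` for the consumer's commit timeout.
def waitForDraining(stageActorRef: ActorRef, drainCommand: Any, drainTimeout: FiniteDuration): Boolean = {
  implicit val timeout: Timeout = Timeout(drainTimeout)
  try {
    Await.result(stageActorRef ? drainCommand, drainTimeout)
    true
  } catch {
    case _: TimeoutException => false // caller then fails the stage instead of releasing partitions
  }
}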
@@ -266,7 +266,7 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
   "provide consistency when multiple transactional streams are being restarted" in {
     val sourcePartitions = 10
     val destinationPartitions = 4
-    val consumers = 3
+    val consumers = 1
why did you need to change this here?
I noticed that this test-case does not finish when running with 3 consumers. The stopping of the stage on drain timeout does not work nicely with how this test uses the RestartSource: it gets restarted all the time and the test never finishes. I also noticed that in Travis it did not finish even with 1 consumer. I need to improve that test-case.
There is still a chance that the consumer will be closed before the producer finishes
What are the requirements for binary compatibility? After turning
Yes, otherwise we will not be able to include this in a patch release.
Last travis run reports: Tests for
Looks like the test Can we make the test case smaller, @2m?
Yes, let's decrease the number of messages to 100k and see how long that takes.
def waitForDraining(): Boolean = {
  import akka.pattern.ask
  // Shall we use commitTimeout here?
Yes, it should be at least the commit timeout, possibly a bit larger. Can you refactor this from a hardcoded constant to a configuration value?
After some thought I think using commitTimeout is just fine, will use that.
log.debug(s"Draining partitions {}", inFlightRecords) | ||
// TODO: This timeout should be somehow related to the committing interval. | ||
// For instance having 10.millis makes no sense if we commit transaction every second | ||
materializer.scheduleOnce(30.millis, new Runnable { |
Would be good to move this to configuration as well.
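For example, something along these lines (the key name mirrors the reference.conf entry added later in this PR; the helper itself is illustrative):

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import com.typesafe.config.Config

// Illustrative helper: read the check interval from the consumer config section
// instead of hardcoding 30.millis in the stage.
def drainingCheckInterval(consumerConfig: Config): FiniteDuration =
  Duration.fromNanos(consumerConfig.getDuration("draining-check-interval", TimeUnit.NANOSECONDS))

// the stage would then schedule roughly as:
// materializer.scheduleOnce(drainingCheckInterval(config), runnable)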
Two of the other tests in the TransactionsSpec started failing. Could be the new timing configuration not playing nicely with the test-cases.
These tests actually catch a valid issue now:
If the producer is closed after committing a transaction we can safely close the consumer.
Looks like
TBH there is no reason why this test should work reliably. This means that after a rebalancing consumers pick up the latest element from the queue. If more elements are written to a ktp during the rebalancing, these elements are never read.
Can you run tests again?
Great catch. That was the reason I saw the multiple "waiting for draining" messages in the logs when the stage was stopping because of failure.
I do not see it failing anymore, but if we see that again, let's create an issue and fix it in a separate PR.
👍
What are the next steps with the PR?
I am currently running the test with
(force-pushed from 91fa31f to 488baca)
With multiple consumers the test fails with missing messages.
I left some minor comments. I think the only missing piece might be some updates to the documentation to explain the draining configuration and its purpose during the commit process. Though maybe that work could be deferred until after PRs for the synchronous rebalance listeners and partitioned transaction sources are merged, which will make distributed transactional workloads more robust.
I'm really happy to see this feature get used and hardened. Thanks a lot for all the hard work!
@@ -93,6 +93,10 @@ akka.kafka.consumer {
   # This value is used instead of Kafka's default from `default.api.timeout.ms`
   # which is 1 minute.
   metadata-request-timeout = 5s

+  # Interval for cheking that transaction was completed before closing the consumer.
I don't see any more documentation for the end user about this new functionality. I suggest updating the Transactions section in the docs.
[Minor] Typo on "checking".
}(materializer.executionContext)
}

val onInternallCommitAckCb: AsyncCallback[Boolean] = {
[Minor] Typo onInternalCommitAckCb
@@ -159,21 +190,35 @@ private final class TransactionalProducerStageLogic[K, V, P](stage: Transactiona
   override def onCompletionFailure(ex: Throwable): Unit = {
     log.debug("Aborting transaction due to stage failure")
     abortTransaction()
     batchOffsets.committingFailed()
     super.onCompletionFailure(ex)
   }

   private def commitTransaction(batch: NonemptyTransactionBatch, beginNewTransaction: Boolean): Unit = {
     val group = batch.group
     log.debug("Committing transaction for consumer group '{}' with offsets: {}", group, batch.offsetMap())
[Minor] Use the local variable offsetMap so we don't have to build it for each log line?
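i.e. roughly (a sketch of the suggestion):

val group = batch.group
val offsetMap = batch.offsetMap()
log.debug("Committing transaction for consumer group '{}' with offsets: {}", group, offsetMap)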
Great work, I left a few minor comments.
tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
# Interval for checking that transaction was completed before closing the consumer.
# Used in the transactional flow.
draining-check-interval = 30ms
I wonder if the setting name should contain eos or something. There are so many settings now it is hard to see which belong together.
tbh... eos is cryptic even for me...
but we may have some distinct section for transactional processing?
The test for the failure case is still failing, showing both duplication and data loss. My intuition about what happens is as follows:
I'm also seeing some problems with the draining mechanism; specifically, it sometimes waits for a last message that is never drained.
Also, setting a higher backoff for restarts increases the probability of test success (from what I observed).
I only noticed lost messages when running that test-case with multiple consumers. Investigating that right now.
I have not seen the test-case failing because of duplication for quite some time now. The duplication check comes first, and at least in Travis it passes for quite some time before the test-case fails on the missing-elements check.
Pushed two more commits that improve the failing test-case in a couple of ways. When it comes to duplicates, it could very well be that we get duplicated messages when a rebalance happens while consuming messages after transactional processing. For that I added offset tracking which ignores duplicates that have the same offsets. We should only care about duplicates with different offsets, because those would come from the transactions. Also, I moved the place where the transactional processing flow is terminated in the test stream, so the stream is always terminated with an error (from upstream) instead of a cancellation (from downstream). Let's see what Travis thinks about that when it comes to missing messages.
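The offset-based duplicate check could look roughly like this (a simplified sketch with made-up types, not the test's actual code):

// Only values seen at more than one (partition, offset) count as real duplicates;
// re-reads of the same offset are expected across rebalances.
final case class Consumed(partition: Int, offset: Long, value: String)

def realDuplicates(consumed: Seq[Consumed]): Map[String, Seq[Consumed]] =
  consumed
    .groupBy(_.value)
    .filter { case (_, records) => records.map(r => (r.partition, r.offset)).distinct.size > 1 }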
I'm looking into it too, and what worries me now is seeing
Also, when the timeout happens, we should make sure to close the producer before closing the consumer. This doesn't happen, right?
Not explicitly, but there is a I see that we are overriding
Yea, noticed those timeouts as well. But I also see
That was another idea I had!
After increasing the parallelism of sending messages to Kafka we don't have to wait for messages to be flushed. As a result we are not failing stream draining that often, which gives a very good chance of the test passing (I didn't see a single failure on my machine after changing that). It also speeds up the test a lot!
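Presumably that means raising the producer's send parallelism, along these lines (illustrative settings, not the test's actual values):

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

// A higher parallelism lets more sends be in flight before the stage waits on
// confirmations; 100 is an arbitrary illustrative value.
def producerSettings(system: ActorSystem): ProducerSettings[String, String] =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withParallelism(100)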
Pushed a revert of those two commits. I'll do a docs update and then this will be good to go.
Yeap, happy to see this simplified again!
Some nitpicks.
core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala
(force-pushed from 58046cb to 7203516)
Rebased on top of master and addressed the latest feedback.
* extract rebalancing handling and stopping consumer to separate methods
* introduce CommittedMarker and hook it up to TransactionalMessage
* add marking committed offsets logic to TransactionalProducerStage
* add draining mechanism to TransactionalSource
* add draining-check-interval to the reference config
* handle committing failure when draining the stream
* add test for transactions and multi messages
* get continuous blocks for missing messages
* distinguish duplicates that happen during counting from transactions
* update documentation
(force-pushed from 7203516 to 5101c54)
Pull Request Checklist
Fixes
Duplications in a transactional flow that occur when either the stream is being closed or partitions are revoked by Kafka.
Purpose
This PR introduces draining logic for TransactionalSource. Specifically, for each partition managed by the source, we keep track of the message offsets emitted by the source. When closing the stage or revoking partitions, we make sure that all the offsets are committed back to Kafka by the producer.
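A minimal sketch of that per-partition tracking (the PR's actual in-flight bookkeeping is internal and may look different):

import org.apache.kafka.common.TopicPartition

// Track the latest emitted offset per partition and clear entries once the
// producer reports them committed; draining waits until this is empty.
final class InFlightOffsets {
  private var latest = Map.empty[TopicPartition, Long]
  def emitted(tp: TopicPartition, offset: Long): Unit = latest += tp -> offset
  def committed(offsets: Map[TopicPartition, Long]): Unit =
    latest = latest.filterNot { case (tp, o) => offsets.get(tp).exists(_ >= o) }
  def empty(): Boolean = latest.isEmpty
}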
To know that an offset was committed by the producer, we attach a CommittedMarker to each message produced by the stream. TransactionalProducerStage uses this marker to tell the source that an offset was committed.
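The marker could be shaped roughly like this (a hypothetical sketch; the real trait is internal to the library and its signature may differ):

import akka.Done
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future

trait CommittedMarker {
  // Invoked by the producer stage once these offsets were committed as part of the transaction.
  def committed(offsets: Map[TopicPartition, OffsetAndMetadata]): Future[Done]
  // Invoked when committing failed, so the source can stop waiting for these offsets.
  def failed(): Unit
}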
Background Context
In a transactional stream processing partition P, we need to wait for all commits to be acknowledged by Kafka before closing the consumer. If we don't, it's possible that another consumer is assigned P, which triggers fetching an offset that can be stale or overwritten later. This causes data duplication.
References
#758
#756