Skip to content

Commit

Permalink
KAFKA-18654 [1/2]: Transaction Version 2 performance regression due t…
Browse files Browse the repository at this point in the history
…o early return (#18720)

https://issues.apache.org/jira/browse/KAFKA-18575 solved a critical race condition by returning with CONCURRENT_TRANSACTIONS early when the transaction was still completing.
In testing, it was discovered that this early return could cause performance regressions.

Prior to KIP-890, the AddPartitions call was a separate call from the producer. A previous change, https://issues.apache.org/jira/browse/KAFKA-5477, decreased the retry backoff to 20ms. With KIP-890, the call is made through the produce path, so we go back to the default retry backoff, which takes longer. Prior to KAFKA-18575, there was a slight delay when sending to the coordinator, so we were less likely to return quickly and get stuck in this backoff. However, based on results from produce benchmarks, we can still run into the default backoff in some scenarios.

This PR reverts KAFKA-18575: it no longer returns early, and instead waits for the coordinator to check whether a transaction is ongoing. To address the original race, it instead fixes the handling of the verification guard so we don't hit the edge condition.

Also cleans up some of the verification text that was unclear.

Reviewers: Jeff Kim <[email protected]>, Artem Livshits <[email protected]>
  • Loading branch information
jolshan authored Feb 3, 2025
1 parent 87b536d commit ab8ef87
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 103 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,9 @@ class Partition(val topicPartition: TopicPartition,

// Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the
// sentinel VerificationGuard.
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short, supportsEpochBump: Boolean): VerificationGuard = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch, supportsEpochBump)
case None => throw new NotLeaderOrFollowerException()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CoordinatorPartitionWriter(
): CompletableFuture[VerificationGuard] = {
val transactionSupportedOperation = AddPartitionsToTxnManager.txnOffsetCommitRequestVersionToTransactionSupportedOperation(apiVersion)
val future = new CompletableFuture[VerificationGuard]()
replicaManager.maybeStartTransactionVerificationForPartition(
replicaManager.maybeSendPartitionToTransactionCoordinator(
topicPartition = tp,
transactionalId = transactionalId,
producerId = producerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ private[group] class GroupCoordinator(
}
}
val transactionSupportedOperation = AddPartitionsToTxnManager.txnOffsetCommitRequestVersionToTransactionSupportedOperation(apiVersion)
groupManager.replicaManager.maybeStartTransactionVerificationForPartition(
groupManager.replicaManager.maybeSendPartitionToTransactionCoordinator(
topicPartition = offsetTopicPartition,
transactionalId,
producerId,
Expand Down
18 changes: 7 additions & 11 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -569,20 +569,21 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return the sentinel VerificationGuard.
*/
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short, supportsEpochBump: Boolean): VerificationGuard = lock synchronized {
if (hasOngoingTransaction(producerId, epoch))
VerificationGuard.SENTINEL
else
maybeCreateVerificationGuard(producerId, sequence, epoch)
maybeCreateVerificationGuard(producerId, sequence, epoch, supportsEpochBump)
}

/**
* Maybe create the VerificationStateEntry for the given producer ID -- always return the VerificationGuard
*/
private def maybeCreateVerificationGuard(producerId: Long,
sequence: Int,
epoch: Short): VerificationGuard = lock synchronized {
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
epoch: Short,
supportsEpochBump: Boolean): VerificationGuard = lock synchronized {
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch, supportsEpochBump).verificationGuard
}

/**
Expand All @@ -600,11 +601,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = lock synchronized {
val entry = producerStateManager.activeProducers.get(producerId)
// With transactions V2, if we see a future epoch, we are likely in the process of completing the previous transaction.
// Return early with ConcurrentTransactionsException until the transaction completes.
if (entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() < producerEpoch)
throw new ConcurrentTransactionsException("The producer attempted to update a transaction " +
"while another concurrent operation on the same transaction was ongoing.")
entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() == producerEpoch
}

Expand Down Expand Up @@ -1035,7 +1031,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !batch.isControlBatch && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}

Expand All @@ -1056,7 +1052,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
!verificationGuard(batch.producerId).verify(requestVerificationGuard)
}

Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ object AddPartitionsToTxnManager {
* genericErrorSupported: This maps to the case when the clients are updated to handle the TransactionAbortableException
* addPartition: This allows the partition to be added to the transactions inflight with the Produce and TxnOffsetCommit requests. Plus the behaviors in genericErrorSupported.
*/
sealed trait TransactionSupportedOperation
sealed trait TransactionSupportedOperation {
val supportsEpochBump = false;
}
case object defaultError extends TransactionSupportedOperation
case object genericErrorSupported extends TransactionSupportedOperation
case object addPartition extends TransactionSupportedOperation
case object addPartition extends TransactionSupportedOperation {
override val supportsEpochBump = true
}

/*
* Data structure to hold the transactional data to send to a node. Note -- at most one request per transactional ID
Expand Down Expand Up @@ -128,7 +132,7 @@ class AddPartitionsToTxnManager(
.setTransactionalId(transactionalId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setVerifyOnly(transactionSupportedOperation != addPartition)
.setVerifyOnly(!transactionSupportedOperation.supportsEpochBump)
.setTopics(topicCollection)

addTxnData(coordinatorNode.get, transactionData, callback, transactionSupportedOperation)
Expand Down
37 changes: 23 additions & 14 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ class ReplicaManager(val config: KafkaConfig,
Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
case Errors.CONCURRENT_TRANSACTIONS =>
if (transactionSupportedOperation != addPartition) {
if (!transactionSupportedOperation.supportsEpochBump) {
Some(new NotEnoughReplicasException(
s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
} else {
Expand Down Expand Up @@ -790,7 +790,7 @@ class ReplicaManager(val config: KafkaConfig,
return
}

maybeStartTransactionVerificationForPartitions(
maybeSendPartitionsToTransactionCoordinator(
topicPartitionBatchInfo,
transactionalId,
transactionalProducerInfo.head._1,
Expand Down Expand Up @@ -895,19 +895,23 @@ class ReplicaManager(val config: KafkaConfig,

/**
*
* @param topicPartition the topic partition to maybe verify
* @param topicPartition the topic partition to maybe verify or add
* @param transactionalId the transactional id for the transaction
* @param producerId the producer id for the producer writing to the transaction
* @param producerEpoch the epoch of the producer writing to the transaction
* @param baseSequence the base sequence of the first record in the batch we are trying to append
* @param callback the method to execute once the verification is either completed or returns an error
* @param transactionSupportedOperation determines the supported operation based on the client's Request API version
*
* When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
* If this is the first time a partition appears in a transaction, it must be verified or added to the transaction depending on the
* transactionSupportedOperation.
* If verifying, when the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
* This guard can not be used for verification and any appends that attempt to use it will fail.
*
* If adding, the callback will be supplied the error if it exists or Errors.NONE.
*/
def maybeStartTransactionVerificationForPartition(
def maybeSendPartitionToTransactionCoordinator(
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
Expand All @@ -924,7 +928,7 @@ class ReplicaManager(val config: KafkaConfig,
))
}

maybeStartTransactionVerificationForPartitions(
maybeSendPartitionsToTransactionCoordinator(
Map(topicPartition -> baseSequence),
transactionalId,
producerId,
Expand All @@ -936,19 +940,23 @@ class ReplicaManager(val config: KafkaConfig,

/**
*
* @param topicPartitionBatchInfo the topic partitions to maybe verify mapped to the base sequence of their first record batch
* @param topicPartitionBatchInfo the topic partitions to maybe verify or add mapped to the base sequence of their first record batch
* @param transactionalId the transactional id for the transaction
* @param producerId the producer id for the producer writing to the transaction
* @param producerEpoch the epoch of the producer writing to the transaction
* @param callback the method to execute once the verification is either completed or returns an error
* @param transactionSupportedOperation determines the supported operation based on the client's Request API version
*
* When the verification returns, the callback will be supplied the errors per topic partition if there were errors.
* If this is the first time the partitions appear in a transaction, they must be verified or added to the transaction depending on the
* transactionSupportedOperation.
* If verifying, when the verification returns, the callback will be supplied the errors per topic partition if there were errors.
* The callback will also be supplied the verification guards per partition if they exist. It is possible to have an
* error and a verification guard for a topic partition if the topic partition was unable to be verified by the transaction
* coordinator. Transaction coordinator errors are mapped to append-friendly errors.
*
* If adding, the callback will be supplied the errors per topic partition if there were errors.
*/
private def maybeStartTransactionVerificationForPartitions(
private def maybeSendPartitionsToTransactionCoordinator(
topicPartitionBatchInfo: Map[TopicPartition, Int],
transactionalId: String,
producerId: Long,
Expand All @@ -958,7 +966,7 @@ class ReplicaManager(val config: KafkaConfig,
): Unit = {
// Skip verification if the request is not transactional or transaction verification is disabled.
if (transactionalId == null ||
!config.transactionLogConfig.transactionPartitionVerificationEnable
(!config.transactionLogConfig.transactionPartitionVerificationEnable && !transactionSupportedOperation.supportsEpochBump)
|| addPartitionsToTxnManager.isEmpty
) {
callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))
Expand All @@ -973,7 +981,8 @@ class ReplicaManager(val config: KafkaConfig,
topicPartition,
producerId,
producerEpoch,
baseSequence
baseSequence,
transactionSupportedOperation.supportsEpochBump
)

errorOrGuard match {
Expand All @@ -983,7 +992,6 @@ class ReplicaManager(val config: KafkaConfig,
}
}

// No partitions require verification.
if (verificationGuards.isEmpty) {
callback((errors.toMap, Map.empty[TopicPartition, VerificationGuard]))
return
Expand All @@ -1010,11 +1018,12 @@ class ReplicaManager(val config: KafkaConfig,
topicPartition: TopicPartition,
producerId: Long,
producerEpoch: Short,
baseSequence: Int
baseSequence: Int,
supportsEpochBump: Boolean
): Either[Errors, VerificationGuard] = {
try {
val verificationGuard = getPartitionOrException(topicPartition)
.maybeStartTransactionVerification(producerId, baseSequence, producerEpoch)
.maybeStartTransactionVerification(producerId, baseSequence, producerEpoch, supportsEpochBump)
Right(verificationGuard)
} catch {
case e: Exception =>
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ class PartitionTest extends AbstractPartitionTest {
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L,
producerId = 2L)
val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0)
val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0, true)
partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching, verificationGuard)

def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = {
Expand Down Expand Up @@ -3561,20 +3561,20 @@ class PartitionTest extends AbstractPartitionTest {
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))

// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard.
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0, true)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)

// With the wrong VerificationGuard, append should fail.
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(),
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, new VerificationGuard()))

// We should return the same VerificationGuard when we still need to verify. Append should proceed.
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0)
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0, true)
assertEquals(verificationGuard, verificationGuard2)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)

// We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed.
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0, true)
assertEquals(VerificationGuard.SENTINEL, verificationGuard3)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ object AbstractCoordinatorConcurrencyTest {

@volatile var logs: mutable.Map[TopicPartition, (UnifiedLog, Long)] = _

override def maybeStartTransactionVerificationForPartition(
override def maybeSendPartitionToTransactionCoordinator(
topicPartition: TopicPartition,
transactionalId: String,
producerId: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class CoordinatorPartitionWriterTest {
val callbackCapture: ArgumentCaptor[((Errors, VerificationGuard)) => Unit] =
ArgumentCaptor.forClass(classOf[((Errors, VerificationGuard)) => Unit])

when(replicaManager.maybeStartTransactionVerificationForPartition(
when(replicaManager.maybeSendPartitionToTransactionCoordinator(
ArgumentMatchers.eq(tp),
ArgumentMatchers.eq("transactional-id"),
ArgumentMatchers.eq(10L),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4163,7 +4163,7 @@ class GroupCoordinatorTest {
// a non request handler thread. Set this to avoid error.
KafkaRequestHandler.setBypassThreadCheck(true)

when(replicaManager.maybeStartTransactionVerificationForPartition(
when(replicaManager.maybeSendPartitionToTransactionCoordinator(
ArgumentMatchers.eq(offsetTopicPartition),
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
Expand Down
Loading

0 comments on commit ab8ef87

Please sign in to comment.