Skip to content

Commit

Permalink
Transactional partitioned: Adapt to current state, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Nov 29, 2019
1 parent 84ab62c commit 367edfb
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 214 deletions.
6 changes: 1 addition & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:

- stage: integration
env: CMD="tests/it:test"
name: "Run multi-broker integration tests"
name: "Run multi-broker and long running integration tests"
- env: CMD="benchmarks/it:compile"
name: "Compile benchmark tests"

Expand All @@ -75,10 +75,6 @@ jobs:
name: "Publish API and reference documentation"

stages:
# runs on master commits and PRs
- name: debug
if: NOT tag =~ ^v

# runs on master commits and PRs
- name: check
if: NOT tag =~ ^v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A
}

@InternalApi
private class CommittableSubSourceStageLogic[K, V](
private final class CommittableSubSourceStageLogic[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
Expand Down
61 changes: 30 additions & 31 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:

final override val producerSettings: ProducerSettings[K, V] = stage.settings

protected class DefaultInHandler extends InHandler {
override def onPush(): Unit = produce(grab(stage.in))

override def onUpstreamFinish(): Unit = {
inIsClosed = true
completionState = Some(Success(Done))
checkForCompletion()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
}
}

override def preStart(): Unit = {
super.preStart()
resolveProducer(stage.settings)
Expand Down Expand Up @@ -86,9 +102,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
failStage(ex)
}

def filterSend(msg: Envelope[K, V, P]): Boolean = true

def postSend(msg: Envelope[K, V, P]): Unit = ()
protected def postSend(msg: Envelope[K, V, P]): Unit = ()

override protected def producerAssigned(): Unit = resumeDemand()

Expand All @@ -103,9 +117,13 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
}
}

protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = {
// not permitted to access stage logic members from constructor
if (!fromStageLogicConstructor) log.debug("Suspend demand")
protected def suspendDemand(): Unit = {
log.debug("Suspend demand")
suspendDemandOutHandler()
}

// factored out of suspendDemand because logging is not permitted when called from the stage logic constructor
private def suspendDemandOutHandler(): Unit = {
setHandler(
stage.out,
new OutHandler {
Expand All @@ -114,33 +132,14 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
)
}

// suspend demand until a Producer has been created
suspendDemand(fromStageLogicConstructor = true)

setHandler(
stage.in,
new InHandler {
override def onPush(): Unit = {
val msg = grab(stage.in)
if (filterSend(msg))
produce(msg)
}

override def onUpstreamFinish(): Unit = {
inIsClosed = true
completionState = Some(Success(Done))
checkForCompletion()
}
protected def initialInHandler(): Unit = producingInHandler()
protected def producingInHandler(): Unit = setHandler(stage.in, new DefaultInHandler())

override def onUpstreamFailure(ex: Throwable): Unit = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
}
}
)
// suspend demand until a Producer has been created
suspendDemandOutHandler()
initialInHandler()

def produce(in: Envelope[K, V, P]): Unit =
protected def produce(in: Envelope[K, V, P]): Unit =
in match {
case msg: Message[K, V, P] =>
val r = Promise[Result[K, V, P]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.{Failure, Success}
private[kafka] object DeferredProducer {

/**
* The [[ProducerAssignmentLifecycle]] allows us to change track the status of the aynchronous producer assignment
* The [[ProducerAssignmentLifecycle]] allows us to track the state of the asynchronous producer assignment
* within the stage. This is useful when we need to manage different behavior during the assignment process. For
* example, in [[TransactionalProducerStageLogic]] we match on the lifecycle when extracting the transactional.id
* of the first message received from a partitioned source.
Expand Down Expand Up @@ -76,7 +76,7 @@ private[kafka] trait DeferredProducer[K, V] {
}
}

protected def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = {
private def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = {
val oldState = producerAssignmentLifecycle
producerAssignmentLifecycle = state
log.debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", oldState, state)
Expand Down
28 changes: 14 additions & 14 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,22 @@ private class SubSourceLogic[K, V, Msg](
with StageIdLogging {
import SubSourceLogic._

val consumerPromise = Promise[ActorRef]
private val consumerPromise = Promise[ActorRef]
final val actorNumber = KafkaConsumerActor.Internal.nextNumber()
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = consumerPromise.future
var consumerActor: ActorRef = _
var sourceActor: StageActor = _
protected var consumerActor: ActorRef = _
protected var sourceActor: StageActor = _

/** Kafka has notified us that we have these partitions assigned, but we have not created a source for them yet. */
var pendingPartitions: immutable.Set[TopicPartition] = immutable.Set.empty
private var pendingPartitions: immutable.Set[TopicPartition] = immutable.Set.empty

/** We have created a source for these partitions, but it has not started up and is not in subSources yet. */
var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty
var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty
private var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty
protected var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty

/** Kafka has signalled these partitions are revoked, but some may be re-assigned just after revoking. */
var partitionsToRevoke: Set[TopicPartition] = Set.empty
private var partitionsToRevoke: Set[TopicPartition] = Set.empty

override def preStart(): Unit = {
super.preStart()
Expand Down Expand Up @@ -121,7 +121,7 @@ private class SubSourceLogic[K, V, Msg](
failStage(ex)
}

val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
private val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
val formerlyUnknown = assigned -- partitionsToRevoke

if (log.isDebugEnabled && formerlyUnknown.nonEmpty) {
Expand Down Expand Up @@ -170,7 +170,7 @@ private class SubSourceLogic[K, V, Msg](
}
}

val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revoked =>
private val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revoked =>
partitionsToRevoke ++= revoked
scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition)
}
Expand All @@ -188,7 +188,7 @@ private class SubSourceLogic[K, V, Msg](
partitionsToRevoke = Set.empty
}

val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] =
private val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] =
getAsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] {
case (tp, cancellationStrategy: SubSourceCancellationStrategy) =>
subSources -= tp
Expand All @@ -210,7 +210,7 @@ private class SubSourceLogic[K, V, Msg](
}
}

val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
private val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
getAsyncCallback[(TopicPartition, ControlAndStageActor)] {
case (tp, value @ ControlAndStageActor(control, _)) =>
if (!partitionsInStartup.contains(tp)) {
Expand Down Expand Up @@ -380,9 +380,9 @@ private abstract class SubSourceStageLogic[K, V, Msg](
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor)
private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
var requested = false
var subSourceActor: StageActor = _
var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty
private var requested = false
protected var subSourceActor: StageActor = _
private var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty

override def preStart(): Unit = {
log.debug("#{} Starting SubSource for partition {}", actorNumber, tp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}

/**
* INTERNAL API
Expand Down Expand Up @@ -123,6 +124,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
override def preStart(): Unit = resumeDemand()

override protected def producerAssigned(): Unit = {
producingInHandler()
initTransactions()
beginTransaction()
produceFirstMessage()
Expand All @@ -143,11 +145,16 @@ private final class TransactionalProducerStageLogic[K, V, P](
demandSuspended = false
}

override protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = {
if (!demandSuspended) super.suspendDemand(fromStageLogicConstructor)
override protected def suspendDemand(): Unit = {
if (!demandSuspended) super.suspendDemand()
demandSuspended = true
}

override protected def initialInHandler(): Unit =
setHandler(stage.in, new DefaultInHandler {
override def onPush(): Unit = parseFirstMessage(grab(stage.in))
})

override protected def onTimer(timerKey: Any): Unit =
if (timerKey == commitSchedulerKey) {
maybeCommitTransaction()
Expand All @@ -160,8 +167,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
case batch: NonemptyTransactionBatch if awaitingConf == 0 =>
commitTransaction(batch, beginNewTransaction)
case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete =>
log.debug("Aborting empty transaction because we're completing.")
abortTransaction()
abortTransaction("Transaction is empty and stage is completing")
case _ if awaitingConf > 0 =>
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
Expand All @@ -170,14 +176,17 @@ private final class TransactionalProducerStageLogic[K, V, P](
}
}

override def filterSend(msg: Envelope[K, V, P]): Boolean =
/**
* When using partitioned sources we extract the transactional id, group id, and topic partition information from
* the first message in order to define a `transacitonal.id` before constructing the [[org.apache.kafka.clients.producer.KafkaProducer]]
*/
private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean =
producerAssignmentLifecycle match {
case Assigned => true
case Unassigned if firstMessage.nonEmpty =>
// this should never happen because demand should be suspended until the producer is assigned
throw new IllegalStateException("Cannot reapply first message")
case Unassigned =>
if (firstMessage.nonEmpty) {
// this should never happen because demand should be suspended until the producer is assigned
throw new IllegalStateException("Cannot reapply first message")
}
// stash the first message so it can be sent after the producer is assigned
firstMessage = Some(msg)
// initiate async async producer request _after_ first message is stashed in case future eagerly resolves
Expand All @@ -187,7 +196,9 @@ private final class TransactionalProducerStageLogic[K, V, P](
suspendDemand()
false
case AsyncCreateRequestSent =>
throw new IllegalStateException(s"Should never receive new messages while in state '$AsyncCreateRequestSent'")
throw new IllegalStateException(
s"Should never receive new messages while in producer assignment state '$AsyncCreateRequestSent'"
)
}

private def generatedTransactionalConfig(msg: Envelope[K, V, P]): ProducerSettings[K, V] = {
Expand All @@ -200,18 +211,14 @@ private final class TransactionalProducerStageLogic[K, V, P](
case _ => transactionalId
}

stage.settings.withEnrichAsync { settings =>
Future.successful(
settings.withProperties(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString,
ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString
)
)
}
stage.settings.withProperties(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString,
ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString
)
}

override def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match {
override protected def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match {
case o: ConsumerMessage.PartitionOffsetCommittedMarker => batchOffsets = batchOffsets.updated(o)
}

Expand All @@ -223,8 +230,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
}

override def onCompletionFailure(ex: Throwable): Unit = {
log.debug("Aborting transaction due to stage failure")
abortTransaction()
abortTransaction("Stage failure")
batchOffsets.committingFailed()
super.onCompletionFailure(ex)
}
Expand Down Expand Up @@ -254,7 +260,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
}
}

val onInternalCommitAckCb: AsyncCallback[Unit] = {
private val onInternalCommitAckCb: AsyncCallback[Unit] = {
getAsyncCallback[Unit](
_ => scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
)
Expand All @@ -270,8 +276,8 @@ private final class TransactionalProducerStageLogic[K, V, P](
producer.beginTransaction()
}

private def abortTransaction(): Unit = {
log.debug("Aborting transaction")
private def abortTransaction(reason: String): Unit = {
log.debug("Aborting transaction: {}", reason)
if (producerAssignmentLifecycle == Assigned) producer.abortTransaction()
}
}
Loading

0 comments on commit 367edfb

Please sign in to comment.