Transactional partitioned sources consistency testing
seglo committed Nov 5, 2019
1 parent b5de307 commit 44cb47d
Showing 11 changed files with 297 additions and 140 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -21,8 +21,8 @@ script:
 jobs:
   include:
     - stage: debug
-      name: "debug test: must support copy stream with merging and multi message x50"
-      script: sbt "tests/testOnly *.TransactionsSpec -- -z \"must support copy stream with merging and multi message\" -DtimesToRepeat=50"
+      name: "debug test: TransactionsSpec x50"
+      script: sbt "tests/testOnly *.TransactionsSpec -DtimesToRepeat=50"
     - name: "debug test: must provide consistency when multiple transactional streams are being restarted x10"
       script: sbt "tests/it:testOnly *.TransactionsSourceSpec -- -z \"must provide consistency when multiple transactional streams are being restarted\" -DtimesToRepeat=10"
     - name: "debug test: must provide consistency when multiple partitioned transactional streams are being restarted x10"
@@ -110,7 +110,7 @@ private[kafka] final class CommittableSubSource[K, V](
       tp: TopicPartition,
       consumerActor: ActorRef,
       subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-      subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+      subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
       actorNumber: Int
   ): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
     new CommittableSubSourceStageLogic(shape,
@@ -214,7 +214,7 @@ private class CommittableSubSourceStageLogic[K, V](
     tp: TopicPartition,
     consumerActor: ActorRef,
     subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-    subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
     actorNumber: Int,
     consumerSettings: ConsumerSettings[K, V],
     _metadataFromRecord: ConsumerRecord[K, V] => String = CommittableMessageBuilder.NoMetadataFromRecord
10 changes: 7 additions & 3 deletions core/src/main/scala/akka/kafka/internal/PlainSources.scala
@@ -58,11 +58,15 @@ private[kafka] final class PlainSubSource[K, V](
       tp: TopicPartition,
       consumerActor: ActorRef,
       subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-      subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+      subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
       actorNumber: Int
   ): SubSourceStageLogic[K, V, ConsumerRecord[K, V]] =
-    new SubSourceStageLogic(shape, tp, consumerActor, subSourceStartedCb, subSourceCancelledCb, actorNumber)
-    with PlainMessageBuilder[K, V]
+    new SubSourceStageLogic[K, V, ConsumerRecord[K, V]](shape,
+                                                        tp,
+                                                        consumerActor,
+                                                        subSourceStartedCb,
+                                                        subSourceCancelledCb,
+                                                        actorNumber) with PlainMessageBuilder[K, V]
 }

 new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape,
65 changes: 39 additions & 26 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
@@ -187,19 +187,25 @@ private class SubSourceLogic[K, V, Msg](
     partitionsToRevoke = Set.empty
   }

-  val subsourceCancelledCB: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])] =
-    getAsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])] {
-      case (tp, firstUnconsumed) =>
+  val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] =
+    getAsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] {
+      case (tp, cancellationStrategy: SubSourceCancellationStrategy) =>
         subSources -= tp
         partitionsInStartup -= tp
-        pendingPartitions += tp
-        firstUnconsumed match {
-          case Some(record) =>
+
+        cancellationStrategy match {
+          case SeekToOffsetAndReEmit(offset) =>
+            // re-add this partition to pending partitions so it can be re-emitted
+            pendingPartitions += tp
             if (log.isDebugEnabled) {
-              log.debug("#{} Seeking {} to {} after partition SubSource cancelled", actorNumber, tp, record.offset())
+              log.debug("#{} Seeking {} to {} after partition SubSource cancelled", actorNumber, tp, offset)
             }
-            seekAndEmitSubSources(formerlyUnknown = Set.empty, Map(tp -> record.offset()))
-          case None => emitSubSourcesForPendingPartitions()
+            seekAndEmitSubSources(formerlyUnknown = Set.empty, Map(tp -> offset))
+          case ReEmit =>
+            // re-add this partition to pending partitions so it can be re-emitted
+            pendingPartitions += tp
+            emitSubSourcesForPendingPartitions()
+          case DoNothing =>
         }
     }

@@ -216,12 +222,14 @@
     }
   }

-  setHandler(shape.out, new OutHandler {
-    override def onPull(): Unit =
-      emitSubSourcesForPendingPartitions()
-    override def onDownstreamFinish(): Unit =
-      performShutdown()
-  })
+  setHandler(
+    shape.out,
+    new OutHandler {
+      override def onPull(): Unit =
+        emitSubSourcesForPendingPartitions()
+      override def onDownstreamFinish(): Unit = performShutdown()
+    }
+  )

   private def updatePendingPartitionsAndEmitSubSources(formerlyUnknownPartitions: Set[TopicPartition]): Unit = {
     pendingPartitions ++= formerlyUnknownPartitions.filter(!partitionsInStartup.contains(_))
@@ -268,7 +276,6 @@
     subSources.foreach {
       case (_, ControlAndStageActor(control, _)) => control.shutdown()
     }
-
     if (!isClosed(shape.out)) {
       complete(shape.out)
     }
@@ -305,7 +312,7 @@ private final class SubSourceStage[K, V, Msg](
     tp: TopicPartition,
     consumerActor: ActorRef,
     subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-    subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
     actorNumber: Int,
     subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg]
 ) extends GraphStage[SourceShape[Msg]] { stage =>
@@ -324,6 +331,11 @@
 @InternalApi
 private final case class ControlAndStageActor(control: Control, stageActor: ActorRef)

+sealed trait SubSourceCancellationStrategy
+final case class SeekToOffsetAndReEmit(offset: Long) extends SubSourceCancellationStrategy
+case object ReEmit extends SubSourceCancellationStrategy
+case object DoNothing extends SubSourceCancellationStrategy
+
 /** Internal API
  *
  * Encapsulates a factory method to create a `SubSourceStageLogic` within `SubSourceLogic` where the context
@@ -336,7 +348,7 @@ private trait SubSourceStageLogicFactory[K, V, Msg] {
       tp: TopicPartition,
       consumerActor: ActorRef,
       subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-      subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+      subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
       actorNumber: Int
   ): SubSourceStageLogic[K, V, Msg]
 }
@@ -352,7 +364,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
     tp: TopicPartition,
     consumerActor: ActorRef,
     subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-    subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
     actorNumber: Int
 ) extends GraphStageLogic(shape)
     with PromiseControl
@@ -392,6 +404,13 @@
     failStage(new ConsumerFailed)
   }

+  protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy =
+    if (buffer.hasNext) {
+      SeekToOffsetAndReEmit(buffer.next().offset())
+    } else {
+      ReEmit
+    }
+
   override def postStop(): Unit = {
     onShutdown()
     super.postStop()
@@ -404,13 +423,7 @@
       pump()

   override def onDownstreamFinish(): Unit = {
-    val firstUnconsumed = if (buffer.hasNext) {
-      Some(buffer.next())
-    } else {
-      None
-    }
-
-    subSourceCancelledCb.invoke(tp -> firstUnconsumed)
+    subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy())
     super.onDownstreamFinish()
   }
 }
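Taken together, the SubSourceLogic.scala changes replace the old Option[ConsumerRecord[K, V]] callback payload, which could only express "seek to this record" or "plain re-emit", with the explicit SubSourceCancellationStrategy ADT. A minimal sketch of the resulting dispatch, with the Akka stage machinery stripped away (the strategy names mirror the diff; PartitionId, the mutable set, and the println bodies are illustrative stand-ins):

sealed trait SubSourceCancellationStrategy
final case class SeekToOffsetAndReEmit(offset: Long) extends SubSourceCancellationStrategy
case object ReEmit extends SubSourceCancellationStrategy
case object DoNothing extends SubSourceCancellationStrategy

object CancellationDispatchSketch {
  type PartitionId = String // stand-in for org.apache.kafka.common.TopicPartition
  private var pendingPartitions = Set.empty[PartitionId]

  def onSubSourceCancelled(tp: PartitionId, strategy: SubSourceCancellationStrategy): Unit =
    strategy match {
      case SeekToOffsetAndReEmit(offset) =>
        pendingPartitions += tp // partition becomes pending again
        println(s"seek $tp to $offset, then re-emit a sub-source for it")
      case ReEmit =>
        pendingPartitions += tp
        println(s"re-emit a sub-source for $tp at its current position")
      case DoNothing =>
        () // transactional sub-sources drain in-flight records instead; see TransactionalSources.scala below
    }
}

The third case is the point of the refactoring: with an Option there was no way for a cancelled sub-source to opt out of being re-emitted by its parent.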
17 changes: 13 additions & 4 deletions core/src/main/scala/akka/kafka/internal/TransactionalSources.scala
@@ -215,7 +215,7 @@ private[kafka] final class TransactionalSubSource[K, V](
       tp: TopicPartition,
       consumerActor: ActorRef,
       subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-      subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+      subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
       actorNumber: Int
   ): SubSourceStageLogic[K, V, TransactionalMessage[K, V]] =
     new TransactionalSubSourceStageLogic(shape,
@@ -341,11 +341,17 @@ private class TransactionalSubSourceStageLogic[K, V](
     tp: TopicPartition,
     consumerActor: ActorRef,
     subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
-    subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
+    subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
     actorNumber: Int,
     consumerSettings: ConsumerSettings[K, V]
-) extends SubSourceStageLogic(shape, tp, consumerActor, subSourceStartedCb, subSourceCancelledCb, actorNumber)
+) extends SubSourceStageLogic[K, V, TransactionalMessage[K, V]](shape,
+                                                                tp,
+                                                                consumerActor,
+                                                                subSourceStartedCb,
+                                                                subSourceCancelledCb,
+                                                                actorNumber)
     with TransactionalMessageBuilder[K, V] {
+
   import TransactionalSourceLogic._

   val inFlightRecords = InFlightRecords.empty
@@ -363,6 +369,8 @@
     inFlightRecords.revoke(tps.toSet)
   }

+  override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = DoNothing
+
   def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] =
     drainHandling
       .orElse {
@@ -373,9 +381,10 @@
       }

   override def performShutdown(): Unit = {
+    log.debug("#{} Completing SubSource for partition {}", actorNumber, tp)
     setKeepGoing(true)
     if (!isClosed(shape.out)) {
-      complete(shape.out)
+      complete(shape.out) // initiate shutdown of SubSource
     }
     subSourceActor.become(shuttingDownReceive)
     drainAndComplete()
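The transactional sub-source provides the only override of the new onDownstreamFinishSubSourceCancellationStrategy() hook: it answers DoNothing, so, as far as the diff shows, cancellation defers to the drain-and-complete shutdown path above rather than to a seek from the parent stage. A schematic of the template method, reusing the sealed trait from the previous sketch (class skeletons and bodies are illustrative, not the stage code):

abstract class SubSourceSketch {
  // Offset of the first unconsumed buffered record, if any (stand-in for the stage's buffer).
  protected def firstBufferedOffset: Option[Long]

  // Default hook, as added to SubSourceStageLogic above.
  protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy =
    firstBufferedOffset.fold[SubSourceCancellationStrategy](ReEmit)(SeekToOffsetAndReEmit)

  final def onDownstreamFinish(): Unit =
    notifyParent(onDownstreamFinishSubSourceCancellationStrategy())

  protected def notifyParent(strategy: SubSourceCancellationStrategy): Unit = () // stands in for the async callback
}

final class TransactionalSubSourceSketch extends SubSourceSketch {
  protected def firstBufferedOffset: Option[Long] = None
  // Transactional override: drain handling, not a seek, decides where consumption resumes.
  override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy =
    DoNothing
}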
@@ -7,7 +7,7 @@ import akka.kafka.scaladsl.SpecBase
 import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
 import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
 import akka.stream._
-import akka.stream.scaladsl.{Flow, Keep, RestartSource, Sink}
+import akka.stream.scaladsl.{Keep, RestartSource, Sink}
 import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.{Matchers, WordSpecLike}
@@ -25,33 +25,35 @@ class TransactionsPartitionedSourceSpec extends SpecBase
     with TransactionsOps
     with Repeated {

+  val replicationFactor = 2
+
   implicit val pc = PatienceConfig(45.seconds, 1.second)

   override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system)
     .withNumBrokers(3)
-    .withInternalTopicsReplicationFactor(2)
+    .withInternalTopicsReplicationFactor(replicationFactor)

   "A multi-broker consume-transform-produce cycle" must {
     "provide consistency when multiple partitioned transactional streams are being restarted" in assertAllStagesStopped {
-      val sourcePartitions = 10
+      val sourcePartitions = 4
       val destinationPartitions = 4
       val consumers = 3
-      val replication = 2
+      val replication = replicationFactor

       val sourceTopic = createTopic(1, sourcePartitions, replication)
       val sinkTopic = createTopic(2, destinationPartitions, replication)
       val group = createGroupId(1)
+      val transactionalId = createTransactionalId()

-      val elements = 100 * 1000
-      val restartAfter = 10 * 1000 / sourcePartitions
+      val elements = 100 * 1000 // 100 * 1,000 = 100,000
+      val restartAfter = (10 * 1000) / sourcePartitions // (10 * 1,000) / 4 = 2,500

-      val partitionSize = elements / sourcePartitions
       val producers: immutable.Seq[Future[Done]] =
-        (0 until sourcePartitions).map(
-          part => produce(sourceTopic, ((part * partitionSize) + 1) to (partitionSize * (part + 1)), part)
-        )
+        (0 until sourcePartitions).map { part =>
+          produce(sourceTopic, range = 1 to elements, partition = part)
+        }

-      Await.result(Future.sequence(producers), 1.minute)
+      Await.result(Future.sequence(producers), 4.minute)

       val consumerSettings = consumerDefaults.withGroupId(group)
@@ -62,13 +64,12 @@ class TransactionsPartitionedSourceSpec extends SpecBase
           RestartSource
             .onFailuresWithBackoff(10.millis, 100.millis, 0.2)(
               () => {
-                val transactionId = s"$group-$id"
                 transactionalPartitionedCopyStream(
                   consumerSettings,
                   txProducerDefaults,
                   sourceTopic,
                   sinkTopic,
-                  transactionId,
+                  transactionalId,
                   idleTimeout = 10.seconds,
                   maxPartitions = sourcePartitions,
                   restartAfter = Some(restartAfter)
@@ -94,38 +95,21 @@ class TransactionsPartitionedSourceSpec extends SpecBase
         .map(_.toString)
         .map(runStream)

-      val probeConsumerGroup = createGroupId(2)
-
       while (completedCopy.get() < consumers) {
         Thread.sleep(2000)
       }

-      val consumer = offsetValueSource(probeConsumerSettings(probeConsumerGroup), sinkTopic)
-        .take(elements.toLong)
-        .idleTimeout(30.seconds)
-        .alsoTo(
-          Flow[(Long, String)]
-            .scan(0) { case (count, _) => count + 1 }
-            .filter(_ % 100 == 0)
-            .log("received")
-            .to(Sink.ignore)
-        )
-        .recover {
-          case t => (0L, "no-more-elements")
-        }
-        .filter(_._2 != "no-more-elements")
-        .runWith(Sink.seq)
-
-      val values = Await.result(consumer, 10.minutes)
-
-      val expected = (1 to elements).map(_.toString)
-
-      log.debug("Expected elements: {}, actual elements: {}", elements, values.length)
-
-      //println(s"Actual elements:\n$values")
-
-      checkForMissing(values, expected)
-      checkForDuplicates(values, expected)
+      val consumer = consumePartitionOffsetValues(
+        probeConsumerSettings(createGroupId(2)),
+        sinkTopic,
+        elementsToTake = (elements * destinationPartitions).toLong
+      )
+
+      val actualValues = Await.result(consumer, 10.minutes)
+
+      log.debug("Expected elements: {}, actual elements: {}", elements, actualValues.length)
+
+      assertPartitionedConsistency(elements, destinationPartitions, actualValues)

       controls.foreach(_.shutdown())
     }
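As a cross-check of the new test sizing: each of the 4 source partitions is fed the full 1 to 100,000 range, the copy streams are induced to restart after every restartAfter messages, and the probe consumer now takes elements * destinationPartitions records rather than elements. A worked restatement of the constants (a sketch; totalProduced and the assert only spell out arithmetic already implied by the diff):

object PartitionedSpecSizing extends App {
  val sourcePartitions = 4
  val destinationPartitions = 4
  val elements = 100 * 1000 // 100,000 values per source partition
  val restartAfter = (10 * 1000) / sourcePartitions // 2,500 messages between induced restarts
  val totalProduced = sourcePartitions * elements // 400,000 records written to the source topic
  val elementsToTake = (elements * destinationPartitions).toLong // 400,000 records expected at the probe
  assert(totalProduced.toLong == elementsToTake) // produce and consume sides agree
  println(s"restartAfter=$restartAfter, totalProduced=$totalProduced, elementsToTake=$elementsToTake")
}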
2 changes: 1 addition & 1 deletion tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala
@@ -113,7 +113,7 @@ class TransactionsSourceSpec extends SpecBase

       val expected = (1 to elements).map(_.toString)

-      log.debug("Expected elements: {}, actual elements: {}", elements, values.length)
+      log.info("Expected elements: {}, actual elements: {}", elements, values.length)

       checkForMissing(values, expected)
       checkForDuplicates(values, expected)
2 changes: 1 addition & 1 deletion tests/src/test/resources/logback-test.xml
@@ -43,7 +43,7 @@
   <logger name="com.github.dockerjava" level="WARN"/>

   <root level="DEBUG">
-    <!--<appender-ref ref="STDOUT" /-->
+    <!-- appender-ref ref="STDOUT" /-->
     <appender-ref ref="FILE" />
   </root>