Travis test debug: repeat flakey tests
seglo committed Nov 2, 2019
1 parent c6f3bf3 commit 6364a0a
Showing 12 changed files with 581 additions and 256 deletions.
17 changes: 12 additions & 5 deletions .travis.yml
@@ -5,11 +5,6 @@ services:
   - docker
 
 before_install:
-  # upgrade to a later docker-compose which supports services.kafka.scale
-  - sudo rm /usr/local/bin/docker-compose
-  - curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > docker-compose
-  - chmod +x docker-compose
-  - sudo mv docker-compose /usr/local/bin
   # fetch full history for correct current and previous version detection
   - git fetch --unshallow
   # using jabba for custom jdk management
@@ -25,6 +20,14 @@ script:
 
 jobs:
   include:
+    - stage: debug
+      name: "debug test: must support copy stream with merging and multi message x50"
+      script: sbt "tests/testOnly *.TransactionsSpec -- -z \"must support copy stream with merging and multi message\" -DtimesToRepeat=50"
+    - name: "debug test: must provide consistency when multiple transactional streams are being restarted x10"
+      script: sbt "tests/it:testOnly *.TransactionsSourceSpec -- -z \"must provide consistency when multiple transactional streams are being restarted\" -DtimesToRepeat=10"
+    - name: "debug test: must provide consistency when multiple partitioned transactional streams are being restarted x10"
+      script: sbt "tests/it:testOnly *.TransactionsSourceSpec -- -z \"must provide consistency when multiple partitioned transactional streams are being restarted\" -DtimesToRepeat=10"
+
     - stage: check
       script: sbt verifyCodeStyle
       name: "Code style check. Run locally with: sbt verifyCodeStyle"
@@ -80,6 +83,10 @@ jobs:
       name: "Publish API and reference documentation"
 
 stages:
+  # runs on master commits and PRs
+  - name: debug
+    if: NOT tag =~ ^v
+
   # runs on master commits and PRs
   - name: check
    if: NOT tag =~ ^v
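The debug jobs above rerun a single test many times by passing the `-DtimesToRepeat` system property to the test runner; the specs pick it up through the `Repeated` trait they mix in (see TransactionsPartitionedSourceSpec below). As a rough illustration, such a trait can be built on ScalaTest's stackable `TestSuiteMixin` — the names `Repeated` and `timesToRepeat` come from this commit, but the body below is a sketch, not the committed implementation:

import org.scalatest.{Args, Status, SucceededStatus, TestSuite, TestSuiteMixin}

trait Repeated extends TestSuiteMixin { this: TestSuite =>
  abstract override protected def runTest(testName: String, args: Args): Status = {
    // -DtimesToRepeat=N reruns the selected test N times; default is a single run
    val times = sys.props.get("timesToRepeat").map(_.toInt).getOrElse(1)
    (1 to times).foldLeft(SucceededStatus: Status) { (status, _) =>
      // stop once a repetition fails, so the failing run is the last one in the log
      if (status.succeeds()) super.runTest(testName, args) else status
    }
  }
}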
@@ -234,7 +234,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
   override def postStop(): Unit = {
     log.debug("Stage completed")
 
-    if (stage.settings.closeProducerOnStop && producer != null) {
+    if (stage.settings.closeProducerOnStop && producerAssignmentLifecycle == Assigned) {
       try {
         // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
         producer.flush()
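The guard change replaces a null check with an explicit state check: whether a producer exists is now tracked by `producerAssignmentLifecycle` rather than by the nullability of the `producer` field. Judging from the states visible in this diff (`Unassigned` and `Assigned`), the lifecycle is a small ADT along these lines — a sketch in which the intermediate in-flight state name is assumed, not taken from the commit:

sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle // assumed name
case object Assigned extends ProducerAssignmentLifecycle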
4 changes: 2 additions & 2 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
@@ -205,7 +205,7 @@ private class SubSourceLogic[K, V, Msg](
 
   val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
     getAsyncCallback[(TopicPartition, ControlAndStageActor)] {
-      case (tp, value @ ControlAndStageActor(control, actorRef)) =>
+      case (tp, value @ ControlAndStageActor(control, _)) =>
         if (!partitionsInStartup.contains(tp)) {
           // Partition was revoked while
           // starting up. Kill!
@@ -371,9 +371,9 @@ private abstract class SubSourceStageLogic[K, V, Msg](
     super.preStart()
     subSourceActor = getStageActor(messageHandling)
     subSourceActor.watch(consumerActor)
-    consumerActor.tell(RegisterSubStage, subSourceActor.ref)
     val controlAndActor = ControlAndStageActor(this.asInstanceOf[Control], subSourceActor.ref)
     subSourceStartedCb.invoke(tp -> controlAndActor)
+    consumerActor.tell(RegisterSubStage, subSourceActor.ref)
   }
 
   protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
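Note the reordering in `preStart`: `RegisterSubStage` is now sent to the consumer actor only after `subSourceStartedCb.invoke(...)`, so the outer stage learns the sub-source's `Control` and stage actor before the consumer actor can start interacting with the freshly registered sub-stage.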
@@ -132,12 +132,13 @@ private final class TransactionalProducerStageLogic[K, V, P](
       scheduleOnce(commitSchedulerKey, stage.settings.eosCommitInterval)
     }
 
-  private def produceFirstMessage(): Unit =
-    firstMessage = firstMessage.flatMap { msg =>
-      produce(msg)
-      None
-    }
+  // produce first message before resuming demand
+  private def produceFirstMessage(): Unit = firstMessage match {
+    case Some(msg) =>
+      produce(msg)
+      firstMessage = None
+    case _ =>
+      throw new IllegalStateException("Should never attempt to produce first message if it does not exist.")
+  }
 
   override protected def resumeDemand(tryToPull: Boolean = true): Unit = {
     super.resumeDemand(tryToPull)
@@ -171,13 +172,15 @@ private final class TransactionalProducerStageLogic[K, V, P](
     producerAssignmentLifecycle match {
       case Assigned => true
       case Unassigned =>
-        assignProducerRequest(generatedTransactionalConfig(msg))
         if (firstMessage.nonEmpty) {
           // this should never happen because demand should be suspended until the producer is assigned
           throw new IllegalStateException("Cannot reapply first message")
         }
         // stash the first message so it can be sent after the producer is assigned
         firstMessage = Some(msg)
+        // initiate the async producer request _after_ the first message is stashed, in case the future
+        // eagerly resolves instead of going through the AsyncCallback
+        assignProducerRequest(generatedTransactionalConfig(msg))
         // suspend demand after we receive the first message until the producer is assigned
         suspendDemand()
         false
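The comment added here points at a subtle ordering hazard: if the producer future is already completed, a callback registered on it may run immediately on the registering thread, in which case producing the first message would happen before the message had been stashed. A hypothetical, stand-alone demonstration of that eager-completion behaviour (not code from this commit):

import scala.concurrent.{ExecutionContext, Future}

object EagerCompletionDemo extends App {
  // a synchronous ExecutionContext: callbacks run on the registering thread
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  var firstMessage: Option[String] = None

  // the future is already complete, so the callback runs inside `foreach`,
  // before the stashing line below is reached
  Future.successful(()).foreach(_ => println(s"callback sees: $firstMessage"))
  firstMessage = Some("msg-1") // too late: the callback printed None
}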
@@ -190,7 +193,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
       case committedMarker: PartitionOffsetCommittedMarker if committedMarker.fromPartitionedSource =>
         val gtp = committedMarker.key
         val txId = s"$transactionalId-${gtp.groupId}-${gtp.topic}-${gtp.partition}"
-        log.debug("Generated transactional id from partitioned source ''", txId)
+        log.debug("Generated transactional id from partitioned source '{}'", txId)
         txId
       case _ => transactionalId
     }
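The removed line was missing the `{}` placeholder, so the `txId` argument was never rendered into the debug message.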
@@ -267,6 +270,6 @@ private final class TransactionalProducerStageLogic[K, V, P](
 
   private def abortTransaction(): Unit = {
     log.debug("Aborting transaction")
-    if (producer != null) producer.abortTransaction()
+    if (producerAssignmentLifecycle == Assigned) producer.abortTransaction()
   }
 }
@@ -270,6 +270,8 @@ private[kafka] final class TransactionalSubSource[K, V](
 /** Internal API */
 @InternalApi
 private object TransactionalSourceLogic {
+  type Offset = Long
+
   case object Drained
   case class Drain[T](partitions: Set[TopicPartition],
                       drainedConfirmationRef: Option[ActorRef],
@@ -291,8 +293,6 @@ private object TransactionalSourceLogic {
         sourceActor ! CommittingFailure
     }
 
-  type Offset = Long
-
   private[internal] trait InFlightRecords {
     // Assumes that offsets per topic partition are added in the increasing order
     // The assumption is true for Kafka consumer that guarantees that elements are emitted
2 changes: 1 addition & 1 deletion tests/src/it/resources/logback-test.xml
@@ -29,6 +29,6 @@
 
   <root level="DEBUG">
     <appender-ref ref="FILE" />
-    <appender-ref ref="STDOUT" />
+    <!--appender-ref ref="STDOUT" /-->
   </root>
 </configuration>
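Commenting out the STDOUT appender leaves DEBUG output going only to the FILE appender, presumably to keep the noisy per-test logging out of the Travis console while still capturing it on disk for inspection.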
142 changes: 142 additions & 0 deletions tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala
@@ -0,0 +1,142 @@
package akka.kafka

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.kafka.scaladsl.SpecBase
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import akka.stream._
import akka.stream.scaladsl.{Flow, Keep, RestartSource, Sink}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, TimeoutException}
import scala.util.{Failure, Success}

class TransactionsPartitionedSourceSpec extends SpecBase
    with TestcontainersKafkaPerClassLike
    with WordSpecLike
    with ScalaFutures
    with Matchers
    with TransactionsOps
    with Repeated {

  implicit val pc = PatienceConfig(45.seconds, 1.second)

  override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system)
    .withNumBrokers(3)
    .withInternalTopicsReplicationFactor(2)

  "A multi-broker consume-transform-produce cycle" must {
    "provide consistency when multiple partitioned transactional streams are being restarted" in assertAllStagesStopped {
      val sourcePartitions = 10
      val destinationPartitions = 4
      val consumers = 3
      val replication = 2

      val sourceTopic = createTopic(1, sourcePartitions, replication)
      val sinkTopic = createTopic(2, destinationPartitions, replication)
      val group = createGroupId(1)

      val elements = 100 * 1000
      val restartAfter = 10 * 1000 / sourcePartitions

      val partitionSize = elements / sourcePartitions
      val producers: immutable.Seq[Future[Done]] =
        (0 until sourcePartitions).map(
          part => produce(sourceTopic, ((part * partitionSize) + 1) to (partitionSize * (part + 1)), part)
        )

      Await.result(Future.sequence(producers), 1.minute)

      val consumerSettings = consumerDefaults.withGroupId(group)

      val completedCopy = new AtomicInteger(0)
      val completedWithTimeout = new AtomicInteger(0)

      def runStream(id: String): UniqueKillSwitch =
        RestartSource
          .onFailuresWithBackoff(10.millis, 100.millis, 0.2)(
            () => {
              val transactionId = s"$group-$id"
              transactionalPartitionedCopyStream(
                consumerSettings,
                txProducerDefaults,
                sourceTopic,
                sinkTopic,
                transactionId,
                idleTimeout = 10.seconds,
                maxPartitions = sourcePartitions,
                restartAfter = Some(restartAfter)
              )
                .recover {
                  case e: TimeoutException =>
                    if (completedWithTimeout.incrementAndGet() > 10)
                      "no more messages to copy"
                    else
                      throw new Error("Continue restarting copy stream")
                }
            }
          )
          .viaMat(KillSwitches.single)(Keep.right)
          .toMat(Sink.onComplete {
            case Success(_) =>
              completedCopy.incrementAndGet()
            case Failure(_) => // restart
          })(Keep.left)
          .run()

      val controls: Seq[UniqueKillSwitch] = (0 until consumers)
        .map(_.toString)
        .map(runStream)

      val probeConsumerGroup = createGroupId(2)

      while (completedCopy.get() < consumers) {
        Thread.sleep(2000)
      }

      val consumer = offsetValueSource(probeConsumerSettings(probeConsumerGroup), sinkTopic)
        .take(elements.toLong)
        .idleTimeout(30.seconds)
        .alsoTo(
          Flow[(Long, String)]
            .scan(0) { case (count, _) => count + 1 }
            .filter(_ % 100 == 0)
            .log("received")
            .to(Sink.ignore)
        )
        .recover {
          case t => (0L, "no-more-elements")
        }
        .filter(_._2 != "no-more-elements")
        .runWith(Sink.seq)

      val values = Await.result(consumer, 10.minutes)

      val expected = (1 to elements).map(_.toString)

      log.debug("Expected elements: {}, actual elements: {}", elements, values.length)

      //println(s"Actual elements:\n$values")

      checkForMissing(values, expected)
      checkForDuplicates(values, expected)

      controls.foreach(_.shutdown())
    }
  }

  private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] =
    withProbeConsumerSettings(consumerDefaults, groupId)

  override def producerDefaults: ProducerSettings[String, String] =
    withTestProducerSettings(super.producerDefaults)

  def txProducerDefaults: ProducerSettings[String, String] =
    withTransactionalProducerSettings(super.producerDefaults)
}
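With the `Repeated` mixin in place, the new spec can be repeated locally using the same pattern as the Travis debug jobs above, along these lines (the `-z` selector is the test name from the spec):

sbt "tests/it:testOnly *.TransactionsPartitionedSourceSpec -- -z \"must provide consistency when multiple partitioned transactional streams are being restarted\" -DtimesToRepeat=10"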