Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Oct 30, 2019
1 parent 90d2e2a commit 405687b
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 50 deletions.
6 changes: 0 additions & 6 deletions core/src/main/mima-filters/1.1.0-RC2.backwards.excludes

This file was deleted.

6 changes: 6 additions & 0 deletions core/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# allow more parameters to private constructors
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ProducerSettings.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerSettings.this")
# PR #930 Transactional.partitionedSource
# https://github.com/akka/alpakka-kafka/pull/930
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.ConsumerMessage$PartitionOffsetCommittedMarker$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.withCommittedMarker")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.unapply")
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private[kafka] final class CommittableSubSource[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
Expand Down Expand Up @@ -213,7 +213,7 @@ private class CommittableSubSourceStageLogic[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ import scala.util.{Failure, Success, Try}
private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[IN, Future[OUT]]]
with ProducerStage[K, V, P, IN, OUT, Unit] {
with ProducerStage[K, V, P, IN, OUT] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new DefaultProducerStageLogic(this, inheritedAttributes) {
// TODO: do we need to eagerly resolve a producer here?
//override protected var producer: Producer[K, V] = producerProvider(())
}
new DefaultProducerStageLogic(this, inheritedAttributes)
}

object DefaultProducerStageLogic {
private[kafka] object DefaultProducerStageLogic {
sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
Expand All @@ -51,8 +48,8 @@ object DefaultProducerStageLogic {
*
* Used by [[DefaultProducerStage]], extended by [[TransactionalProducerStageLogic]].
*/
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P], S](
stage: ProducerStage[K, V, P, IN, OUT, S],
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
stage: ProducerStage[K, V, P, IN, OUT],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
Expand Down
32 changes: 19 additions & 13 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB

def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit

val fromPartitionedSource: Boolean
def fromPartitionedSource: Boolean
}

/** Internal API */
Expand All @@ -55,13 +55,16 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
extends TransactionalMessageBuilderBase[K, V, TransactionalMessage[K, V]] {
override def createMessage(rec: ConsumerRecord[K, V]): TransactionalMessage[K, V] = {
onMessage(rec)
val groupTopicPartition = GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
val offset = PartitionOffsetCommittedMarker(
GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
),
offset = rec.offset,
committedMarker,
fromPartitionedSource
)
val offset =
PartitionOffsetCommittedMarker(groupTopicPartition, offset = rec.offset, committedMarker, fromPartitionedSource)
ConsumerMessage.TransactionalMessage(rec, offset)
}
}
Expand All @@ -72,13 +75,16 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V]
extends TransactionalMessageBuilderBase[K, V, (ConsumerRecord[K, V], PartitionOffset)] {
override def createMessage(rec: ConsumerRecord[K, V]): (ConsumerRecord[K, V], PartitionOffset) = {
onMessage(rec)
val groupTopicPartition = GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
val offset = PartitionOffsetCommittedMarker(
GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
),
offset = rec.offset,
committedMarker,
fromPartitionedSource
)
val offset =
PartitionOffsetCommittedMarker(groupTopicPartition, offset = rec.offset, committedMarker, fromPartitionedSource)
(rec, offset)
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/internal/PlainSources.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private[kafka] final class PlainSubSource[K, V](
shape: SourceShape[ConsumerRecord[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, ConsumerRecord[K, V]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.concurrent.Future
* Implemented by [[DefaultProducerStage]] and [[TransactionalProducerStage]].
*/
@InternalApi
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P], S] {
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]] {
val settings: ProducerSettings[K, V]

val in: Inlet[IN] = Inlet[IN]("messages")
Expand Down
31 changes: 19 additions & 12 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private class SubSourceLogic[K, V, Msg](

/** We have created a source for these partitions, but it has not started up and is not in subSources yet. */
var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty
var subSources: Map[TopicPartition, (Control, ActorRef)] = immutable.Map.empty
var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty

/** Kafka has signalled these partitions are revoked, but some may be re-assigned just after revoking. */
var partitionsToRevoke: Set[TopicPartition] = Set.empty
Expand Down Expand Up @@ -179,7 +179,7 @@ private class SubSourceLogic[K, V, Msg](
onRevoke(partitionsToRevoke)
pendingPartitions --= partitionsToRevoke
partitionsInStartup --= partitionsToRevoke
partitionsToRevoke.flatMap(subSources.get).map(_._1).foreach(_.shutdown())
partitionsToRevoke.flatMap(subSources.get).map(_.control).foreach(_.shutdown())
subSources --= partitionsToRevoke
partitionsToRevoke = Set.empty
}
Expand All @@ -200,9 +200,9 @@ private class SubSourceLogic[K, V, Msg](
}
}

val subsourceStartedCB: AsyncCallback[(TopicPartition, (Control, ActorRef))] =
getAsyncCallback[(TopicPartition, (Control, ActorRef))] {
case (tp, value @ (control, actorRef)) =>
val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
getAsyncCallback[(TopicPartition, ControlAndStageActor)] {
case (tp, value @ ControlAndStageActor(control, actorRef)) =>
if (!partitionsInStartup.contains(tp)) {
// Partition was revoked while
// starting up. Kill!
Expand Down Expand Up @@ -253,7 +253,7 @@ private class SubSourceLogic[K, V, Msg](
override def performStop(): Unit = {
setKeepGoing(true)
subSources.foreach {
case (_, (control, _)) => control.stop()
case (_, ControlAndStageActor(control, _)) => control.stop()
}
complete(shape.out)
onStop()
Expand All @@ -263,7 +263,7 @@ private class SubSourceLogic[K, V, Msg](
setKeepGoing(true)
//todo we should wait for subsources to be shutdown and next shutdown main stage
subSources.foreach {
case (_, (control, _)) => control.shutdown()
case (_, ControlAndStageActor(control, _)) => control.shutdown()
}

if (!isClosed(shape.out)) {
Expand Down Expand Up @@ -301,7 +301,7 @@ private object SubSourceLogic {
private final class SubSourceStage[K, V, Msg](
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int,
subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg]
Expand All @@ -314,6 +314,13 @@ private final class SubSourceStage[K, V, Msg](
subSourceStageLogicFactory.create(shape, tp, consumerActor, subSourceStartedCb, subSourceCancelledCb, actorNumber)
}

/** Internal API
*
* SubSourceStageLogic [[akka.kafka.scaladsl.Consumer.Control]] and the stage actor [[ActorRef]]
*/
@InternalApi
private final case class ControlAndStageActor(control: Control, stageActor: ActorRef)

/** Internal API
*
* Encapsulates a factory method to create a `SubSourceStageLogic` within `SubSourceLogic` where the context
Expand All @@ -325,7 +332,7 @@ private trait SubSourceStageLogicFactory[K, V, Msg] {
shape: SourceShape[Msg],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, Msg]
Expand All @@ -341,7 +348,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
val shape: SourceShape[Msg],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
) extends GraphStageLogic(shape)
Expand All @@ -351,7 +358,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
with StageLogging {
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor)
val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
var requested = false
var subSourceActor: StageActor = _
var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty
Expand All @@ -362,7 +369,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
subSourceActor = getStageActor(messageHandling)
subSourceActor.watch(consumerActor)
consumerActor.tell(RegisterSubStage, subSourceActor.ref)
val controlAndActor = (this.asInstanceOf[Control], subSourceActor.ref)
val controlAndActor = ControlAndStageActor(this.asInstanceOf[Control], subSourceActor.ref)
subSourceStartedCb.invoke(tp -> controlAndActor)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[kafka] final class TransactionalProducerStage[K, V, P](
val settings: ProducerSettings[K, V],
transactionalId: String
) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]]
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P], String] {
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TransactionalProducerStageLogic(this, transactionalId, inheritedAttributes)
Expand Down Expand Up @@ -101,7 +101,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
stage: TransactionalProducerStage[K, V, P],
transactionalId: String,
inheritedAttributes: Attributes
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P], String](stage, inheritedAttributes)
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, inheritedAttributes)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {
Expand Down Expand Up @@ -265,6 +265,6 @@ private final class TransactionalProducerStageLogic[K, V, P](

private def abortTransaction(): Unit = {
log.debug("Aborting transaction")
Option(producer).foreach(_.abortTransaction())
if (producer != null) producer.abortTransaction()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private[kafka] final class TransactionalSubSource[K, V](
shape: SourceShape[TransactionalMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, TransactionalMessage[K, V]] =
Expand All @@ -241,7 +241,7 @@ private[kafka] final class TransactionalSubSource[K, V](
override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
if (revokedTps.isEmpty) ()
else if (waitForDraining(revokedTps)) {
subSources.values.map(_._2).foreach(_ ! Revoked(revokedTps.toList))
subSources.values.map(_.stageActor).foreach(_ ! Revoked(revokedTps.toList))
} else {
sourceActor.ref ! Status.Failure(new Error("Timeout while draining"))
consumerActor ! KafkaConsumerActor.Internal.Stop
Expand All @@ -256,7 +256,7 @@ private[kafka] final class TransactionalSubSource[K, V](
import akka.pattern.ask
implicit val timeout = Timeout(txConsumerSettings.commitTimeout)
try {
val drainCommandFutures = subSources.values.map(_._2).map(ask(_, Drain(partitions, None, Drained)))
val drainCommandFutures = subSources.values.map(_.stageActor).map(ask(_, Drain(partitions, None, Drained)))
implicit val ec = executionContext
Await.result(Future.sequence(drainCommandFutures), timeout.duration)
true
Expand Down Expand Up @@ -342,7 +342,7 @@ private class TransactionalSubSourceStageLogic[K, V](
shape: SourceShape[TransactionalMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V]
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/akka/kafka/javadsl/Transactional.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ object Transactional {
* By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run
* without having to manually assign partitions to each instance.
*/
@ApiMayChange
def partitionedSource[K, V](
consumerSettings: ConsumerSettings[K, V],
subscription: AutoSubscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object Transactional {
* By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run
* without having to manually assign partitions to each instance.
*/
@ApiMayChange
def partitionedSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription
Expand Down

0 comments on commit 405687b

Please sign in to comment.