Commit

PR feedback
seglo committed Oct 30, 2019
1 parent 90d2e2a commit 6f77477
Showing 10 changed files with 59 additions and 44 deletions.

@@ -109,7 +109,7 @@ private[kafka] final class CommittableSubSource[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
@@ -213,7 +213,7 @@ private class CommittableSubSourceStageLogic[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V],
@@ -30,16 +30,13 @@ import scala.util.{Failure, Success, Try}
private[kafka] class DefaultProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
val settings: ProducerSettings[K, V]
) extends GraphStage[FlowShape[IN, Future[OUT]]]
with ProducerStage[K, V, P, IN, OUT, Unit] {
with ProducerStage[K, V, P, IN, OUT] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new DefaultProducerStageLogic(this, inheritedAttributes) {
// TODO: do we need to eagerly resolve a producer here?
//override protected var producer: Producer[K, V] = producerProvider(())
}
new DefaultProducerStageLogic(this, inheritedAttributes)
}

object DefaultProducerStageLogic {
private[kafka] object DefaultProducerStageLogic {
sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
@@ -51,8 +48,8 @@ object DefaultProducerStageLogic {
*
* Used by [[DefaultProducerStage]], extended by [[TransactionalProducerStageLogic]].
*/
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P], S](
stage: ProducerStage[K, V, P, IN, OUT, S],
private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
stage: ProducerStage[K, V, P, IN, OUT],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
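The hunks above narrow DefaultProducerStageLogic to private[kafka] and keep the producer-assignment states as a sealed trait. Below is a standalone sketch of how such a sealed lifecycle is typically tracked and matched exhaustively; the Assigned state and the transitions shown are assumptions made for illustration, not the stage's actual logic.

// Standalone sketch, not the stage's actual logic. `Assigned` and the transition
// points are illustrative assumptions.
object ProducerLifecycleSketch extends App {
  sealed trait ProducerAssignmentLifecycle
  case object Unassigned extends ProducerAssignmentLifecycle
  case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
  case object Assigned extends ProducerAssignmentLifecycle

  var producerAssignmentLifecycle: ProducerAssignmentLifecycle = Unassigned

  // Because the trait is sealed, the compiler warns if a case is missing here.
  def describe(state: ProducerAssignmentLifecycle): String = state match {
    case Unassigned             => "no producer yet"
    case AsyncCreateRequestSent => "waiting for the asynchronous producer factory"
    case Assigned               => "producer available, elements can be produced"
  }

  producerAssignmentLifecycle = AsyncCreateRequestSent
  println(describe(producerAssignmentLifecycle))
  producerAssignmentLifecycle = Assigned
  println(describe(producerAssignmentLifecycle))
}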
32 changes: 19 additions & 13 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala

@@ -46,7 +46,7 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB

def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit

val fromPartitionedSource: Boolean
def fromPartitionedSource: Boolean
}

/** Internal API */
@@ -55,13 +55,16 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
extends TransactionalMessageBuilderBase[K, V, TransactionalMessage[K, V]] {
override def createMessage(rec: ConsumerRecord[K, V]): TransactionalMessage[K, V] = {
onMessage(rec)
val groupTopicPartition = GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
val offset = PartitionOffsetCommittedMarker(
GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
),
offset = rec.offset,
committedMarker,
fromPartitionedSource
)
val offset =
PartitionOffsetCommittedMarker(groupTopicPartition, offset = rec.offset, committedMarker, fromPartitionedSource)
ConsumerMessage.TransactionalMessage(rec, offset)
}
}
@@ -72,13 +75,16 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V]
extends TransactionalMessageBuilderBase[K, V, (ConsumerRecord[K, V], PartitionOffset)] {
override def createMessage(rec: ConsumerRecord[K, V]): (ConsumerRecord[K, V], PartitionOffset) = {
onMessage(rec)
val groupTopicPartition = GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
val offset = PartitionOffsetCommittedMarker(
GroupTopicPartition(
groupId = groupId,
topic = rec.topic,
partition = rec.partition
),
offset = rec.offset,
committedMarker,
fromPartitionedSource
)
val offset =
PartitionOffsetCommittedMarker(groupTopicPartition, offset = rec.offset, committedMarker, fromPartitionedSource)
(rec, offset)
}
}
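Besides inlining the PartitionOffsetCommittedMarker construction, this file changes the abstract member fromPartitionedSource from a val to a def. An abstract def leaves the implementation strategy open: implementors may satisfy it with a def, a val, or a lazy val, and it avoids trait-initialization-order pitfalls. A minimal sketch with made-up stand-in types (not Alpakka code):

// Standalone illustration of the `val` vs `def` choice for abstract trait members;
// the trait and class names are illustrative stand-ins.
trait MessageBuilderLike {
  def fromPartitionedSource: Boolean // abstract def: implementors choose how to provide it
}

class PartitionedBuilder extends MessageBuilderLike {
  override val fromPartitionedSource: Boolean = true // a val may still implement a def
}

class PlainBuilder extends MessageBuilderLike {
  override def fromPartitionedSource: Boolean = false
}

object ValVsDefSketch extends App {
  println(new PartitionedBuilder().fromPartitionedSource) // true
  println(new PlainBuilder().fromPartitionedSource)       // false
}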
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/internal/PlainSources.scala

@@ -57,7 +57,7 @@ private[kafka] final class PlainSubSource[K, V](
shape: SourceShape[ConsumerRecord[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, ConsumerRecord[K, V]] =
@@ -20,7 +20,7 @@ import scala.concurrent.Future
* Implemented by [[DefaultProducerStage]] and [[TransactionalProducerStage]].
*/
@InternalApi
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P], S] {
private[internal] trait ProducerStage[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]] {
val settings: ProducerSettings[K, V]

val in: Inlet[IN] = Inlet[IN]("messages")
34 changes: 22 additions & 12 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala

@@ -68,7 +68,7 @@ private class SubSourceLogic[K, V, Msg](

/** We have created a source for these partitions, but it has not started up and is not in subSources yet. */
var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty
var subSources: Map[TopicPartition, (Control, ActorRef)] = immutable.Map.empty
var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty

/** Kafka has signalled these partitions are revoked, but some may be re-assigned just after revoking. */
var partitionsToRevoke: Set[TopicPartition] = Set.empty
@@ -179,7 +179,7 @@ private class SubSourceLogic[K, V, Msg](
onRevoke(partitionsToRevoke)
pendingPartitions --= partitionsToRevoke
partitionsInStartup --= partitionsToRevoke
partitionsToRevoke.flatMap(subSources.get).map(_._1).foreach(_.shutdown())
partitionsToRevoke.flatMap(subSources.get).map(_.control).foreach(_.shutdown())
subSources --= partitionsToRevoke
partitionsToRevoke = Set.empty
}
@@ -200,9 +200,9 @@ private class SubSourceLogic[K, V, Msg](
}
}

val subsourceStartedCB: AsyncCallback[(TopicPartition, (Control, ActorRef))] =
getAsyncCallback[(TopicPartition, (Control, ActorRef))] {
case (tp, value @ (control, actorRef)) =>
val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
getAsyncCallback[(TopicPartition, ControlAndStageActor)] {
case (tp, value @ ControlAndStageActor(control, actorRef)) =>
if (!partitionsInStartup.contains(tp)) {
// Partition was revoked while
// starting up. Kill!
@@ -253,7 +253,7 @@ private class SubSourceLogic[K, V, Msg](
override def performStop(): Unit = {
setKeepGoing(true)
subSources.foreach {
case (_, (control, _)) => control.stop()
case (_, ControlAndStageActor(control, _)) => control.stop()
}
complete(shape.out)
onStop()
@@ -263,7 +263,7 @@ private class SubSourceLogic[K, V, Msg](
setKeepGoing(true)
//todo we should wait for subsources to be shutdown and next shutdown main stage
subSources.foreach {
case (_, (control, _)) => control.shutdown()
case (_, ControlAndStageActor(control, _)) => control.shutdown()
}

if (!isClosed(shape.out)) {
@@ -301,7 +301,7 @@ private object SubSourceLogic {
private final class SubSourceStage[K, V, Msg](
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int,
subSourceStageLogicFactory: SubSourceStageLogicFactory[K, V, Msg]
@@ -314,6 +314,15 @@ private final class SubSourceStage[K, V, Msg](
subSourceStageLogicFactory.create(shape, tp, consumerActor, subSourceStartedCb, subSourceCancelledCb, actorNumber)
}



/** Internal API
*
* SubSourceStageLogic [[akka.kafka.scaladsl.Consumer.Control]] and the stage actor [[ActorRef]]
*/
@InternalApi
private final case class ControlAndStageActor(control: Control, stageActor: ActorRef)

/** Internal API
*
* Encapsulates a factory method to create a `SubSourceStageLogic` within `SubSourceLogic` where the context
@@ -325,12 +334,13 @@ private trait SubSourceStageLogicFactory[K, V, Msg] {
shape: SourceShape[Msg],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, Msg]
}


/** Internal API
*
* A `SubSourceStageLogic` is the `GraphStageLogic` of a SubSourceStage.
@@ -341,7 +351,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
val shape: SourceShape[Msg],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
) extends GraphStageLogic(shape)
@@ -351,7 +361,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
with StageLogging {
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor)
val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
var requested = false
var subSourceActor: StageActor = _
var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty
@@ -362,7 +372,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
subSourceActor = getStageActor(messageHandling)
subSourceActor.watch(consumerActor)
consumerActor.tell(RegisterSubStage, subSourceActor.ref)
val controlAndActor = (this.asInstanceOf[Control], subSourceActor.ref)
val controlAndActor = ControlAndStageActor(this.asInstanceOf[Control], subSourceActor.ref)
subSourceStartedCb.invoke(tp -> controlAndActor)
}

@@ -30,7 +30,7 @@ private[kafka] final class TransactionalProducerStage[K, V, P](
val settings: ProducerSettings[K, V],
transactionalId: String
) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]]
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P], String] {
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TransactionalProducerStageLogic(this, transactionalId, inheritedAttributes)
@@ -101,7 +101,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
stage: TransactionalProducerStage[K, V, P],
transactionalId: String,
inheritedAttributes: Attributes
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P], String](stage, inheritedAttributes)
) extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, inheritedAttributes)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {
@@ -265,6 +265,6 @@ private final class TransactionalProducerStageLogic[K, V, P](

private def abortTransaction(): Unit = {
log.debug("Aborting transaction")
Option(producer).foreach(_.abortTransaction())
if (producer != null) producer.abortTransaction()
}
}
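
The last hunk swaps Option(producer).foreach(_.abortTransaction()) for an explicit null check. Both guard against an unassigned producer, but the null check avoids wrapping the reference in a Some on every call. A minimal sketch with a stand-in producer type:

// Standalone sketch; KafkaProducerStub is a stand-in, not the real Kafka producer.
object NullGuardSketch extends App {
  final class KafkaProducerStub {
    def abortTransaction(): Unit = println("transaction aborted")
  }

  var producer: KafkaProducerStub = null

  // Before this commit: allocates a Some wrapper each time the guard runs.
  Option(producer).foreach(_.abortTransaction()) // no-op while producer is null

  // After this commit: same behaviour without the allocation.
  if (producer != null) producer.abortTransaction() // still a no-op here

  producer = new KafkaProducerStub
  if (producer != null) producer.abortTransaction() // prints "transaction aborted"
}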
@@ -216,7 +216,7 @@ private[kafka] final class TransactionalSubSource[K, V](
shape: SourceShape[TransactionalMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int
): SubSourceStageLogic[K, V, TransactionalMessage[K, V]] =
@@ -241,7 +241,7 @@ private[kafka] final class TransactionalSubSource[K, V](
override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
if (revokedTps.isEmpty) ()
else if (waitForDraining(revokedTps)) {
subSources.values.map(_._2).foreach(_ ! Revoked(revokedTps.toList))
subSources.values.map(_.stageActor).foreach(_ ! Revoked(revokedTps.toList))
} else {
sourceActor.ref ! Status.Failure(new Error("Timeout while draining"))
consumerActor ! KafkaConsumerActor.Internal.Stop
@@ -256,7 +256,7 @@ private[kafka] final class TransactionalSubSource[K, V](
import akka.pattern.ask
implicit val timeout = Timeout(txConsumerSettings.commitTimeout)
try {
val drainCommandFutures = subSources.values.map(_._2).map(ask(_, Drain(partitions, None, Drained)))
val drainCommandFutures = subSources.values.map(_.stageActor).map(ask(_, Drain(partitions, None, Drained)))
implicit val ec = executionContext
Await.result(Future.sequence(drainCommandFutures), timeout.duration)
true
@@ -342,7 +342,7 @@ private class TransactionalSubSourceStageLogic[K, V](
shape: SourceShape[TransactionalMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, (Control, ActorRef))],
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, Option[ConsumerRecord[K, V]])],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V]
1 change: 1 addition & 0 deletions core/src/main/scala/akka/kafka/javadsl/Transactional.scala

@@ -65,6 +65,7 @@ object Transactional {
* By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run
* without having to manually assign partitions to each instance.
*/
@ApiMayChange
def partitionedSource[K, V](
consumerSettings: ConsumerSettings[K, V],
subscription: AutoSubscription
@@ -62,6 +62,7 @@ object Transactional {
* By generating the `transactionalId` from the [[TopicPartition]], multiple instances of your application can run
* without having to manually assign partitions to each instance.
*/
@ApiMayChange
def partitionedSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription
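
Both the javadsl and scaladsl partitionedSource factories gain the @ApiMayChange annotation, which flags an API that may still change in minor releases and is excluded from binary-compatibility guarantees. A minimal sketch of annotating a method this way, assuming akka-actor (which provides akka.annotation.ApiMayChange) is on the classpath; the object and method names are made up for illustration:

import akka.annotation.ApiMayChange

// Standalone sketch: the object and method below are illustrative, not Alpakka API.
object ExperimentalHelpers {
  /** Marked so users know this signature may change before it is frozen. */
  @ApiMayChange
  def partitionedPipelineHint: String =
    "transactional partitioned sources are still evolving"
}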