Skip to content

Commit

Permalink
Sprinkle final, private, protected
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Nov 29, 2019
1 parent 75d99a2 commit 5dd9017
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A
}

@InternalApi
private class CommittableSubSourceStageLogic[K, V](
private final class CommittableSubSourceStageLogic[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
failStage(ex)
}

def postSend(msg: Envelope[K, V, P]): Unit = ()
protected def postSend(msg: Envelope[K, V, P]): Unit = ()

override protected def producerAssigned(): Unit = resumeDemand()

Expand All @@ -123,7 +123,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
}

// factored out of suspendDemand because logging is not permitted when called from the stage logic constructor
def suspendDemandOutHandler(): Unit = {
private def suspendDemandOutHandler(): Unit = {
setHandler(
stage.out,
new OutHandler {
Expand All @@ -139,7 +139,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
suspendDemandOutHandler()
initialInHandler()

def produce(in: Envelope[K, V, P]): Unit =
protected def produce(in: Envelope[K, V, P]): Unit =
in match {
case msg: Message[K, V, P] =>
val r = Promise[Result[K, V, P]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private[kafka] trait DeferredProducer[K, V] {
}
}

protected def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = {
private def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = {
val oldState = producerAssignmentLifecycle
producerAssignmentLifecycle = state
log.debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", oldState, state)
Expand Down
28 changes: 14 additions & 14 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,22 @@ private class SubSourceLogic[K, V, Msg](
with StageLogging {
import SubSourceLogic._

val consumerPromise = Promise[ActorRef]
private val consumerPromise = Promise[ActorRef]
final val actorNumber = KafkaConsumerActor.Internal.nextNumber()
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = consumerPromise.future
var consumerActor: ActorRef = _
var sourceActor: StageActor = _
protected var consumerActor: ActorRef = _
protected var sourceActor: StageActor = _

/** Kafka has notified us that we have these partitions assigned, but we have not created a source for them yet. */
var pendingPartitions: immutable.Set[TopicPartition] = immutable.Set.empty
private var pendingPartitions: immutable.Set[TopicPartition] = immutable.Set.empty

/** We have created a source for these partitions, but it has not started up and is not in subSources yet. */
var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty
var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty
private var partitionsInStartup: immutable.Set[TopicPartition] = immutable.Set.empty
protected var subSources: Map[TopicPartition, ControlAndStageActor] = immutable.Map.empty

/** Kafka has signalled these partitions are revoked, but some may be re-assigned just after revoking. */
var partitionsToRevoke: Set[TopicPartition] = Set.empty
private var partitionsToRevoke: Set[TopicPartition] = Set.empty

override def preStart(): Unit = {
super.preStart()
Expand Down Expand Up @@ -121,7 +121,7 @@ private class SubSourceLogic[K, V, Msg](
failStage(ex)
}

val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
private val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
val formerlyUnknown = assigned -- partitionsToRevoke

if (log.isDebugEnabled && formerlyUnknown.nonEmpty) {
Expand Down Expand Up @@ -170,7 +170,7 @@ private class SubSourceLogic[K, V, Msg](
}
}

val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revoked =>
private val partitionRevokedCB = getAsyncCallback[Set[TopicPartition]] { revoked =>
partitionsToRevoke ++= revoked
scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition)
}
Expand All @@ -188,7 +188,7 @@ private class SubSourceLogic[K, V, Msg](
partitionsToRevoke = Set.empty
}

val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] =
private val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] =
getAsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] {
case (tp, cancellationStrategy: SubSourceCancellationStrategy) =>
subSources -= tp
Expand All @@ -210,7 +210,7 @@ private class SubSourceLogic[K, V, Msg](
}
}

val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
private val subsourceStartedCB: AsyncCallback[(TopicPartition, ControlAndStageActor)] =
getAsyncCallback[(TopicPartition, ControlAndStageActor)] {
case (tp, value @ ControlAndStageActor(control, _)) =>
if (!partitionsInStartup.contains(tp)) {
Expand Down Expand Up @@ -380,9 +380,9 @@ private abstract class SubSourceStageLogic[K, V, Msg](
override def executionContext: ExecutionContext = materializer.executionContext
override def consumerFuture: Future[ActorRef] = Future.successful(consumerActor)
private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, Set(tp))
var requested = false
var subSourceActor: StageActor = _
var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty
private var requested = false
protected var subSourceActor: StageActor = _
private var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty

override def preStart(): Unit = {
log.debug("#{} Starting SubSource for partition {}", actorNumber, tp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,21 @@ private final class TransactionalProducerStageLogic[K, V, P](
* When using partitioned sources we extract the transactional id, group id, and topic partition information from
* the first message in order to define a `transacitonal.id` before constructing the [[org.apache.kafka.clients.producer.KafkaProducer]]
*/
def parseFirstMessage(msg: Envelope[K, V, P]): Boolean =
private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean =
producerAssignmentLifecycle match {
case Assigned => true
case Unassigned if firstMessage.nonEmpty =>
// this should never happen because demand should be suspended until the producer is assigned
throw new IllegalStateException("Cannot reapply first message")
case Unassigned =>
if (firstMessage.nonEmpty) {
// this should never happen because demand should be suspended until the producer is assigned
throw new IllegalStateException("Cannot reapply first message")
} else {
// stash the first message so it can be sent after the producer is assigned
firstMessage = Some(msg)
// initiate async async producer request _after_ first message is stashed in case future eagerly resolves
// instead of asynccallback
resolveProducer(generatedTransactionalConfig(msg))
// suspend demand after we receive the first message until the producer is assigned
suspendDemand()
false
}
// stash the first message so it can be sent after the producer is assigned
firstMessage = Some(msg)
// initiate async async producer request _after_ first message is stashed in case future eagerly resolves
// instead of asynccallback
resolveProducer(generatedTransactionalConfig(msg))
// suspend demand after we receive the first message until the producer is assigned
suspendDemand()
false
case AsyncCreateRequestSent =>
throw new IllegalStateException(
s"Should never receive new messages while in producer assignment state '$AsyncCreateRequestSent'"
Expand All @@ -220,7 +218,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
)
}

override def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match {
override protected def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match {
case o: ConsumerMessage.PartitionOffsetCommittedMarker => batchOffsets = batchOffsets.updated(o)
}

Expand Down Expand Up @@ -262,7 +260,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
}
}

val onInternalCommitAckCb: AsyncCallback[Unit] = {
private val onInternalCommitAckCb: AsyncCallback[Unit] = {
getAsyncCallback[Unit](
_ => scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ private object TransactionalSourceLogic {
type Offset = Long

case object Drained
case class Drain[T](partitions: Set[TopicPartition],
drainedConfirmationRef: Option[ActorRef],
drainedConfirmationMsg: T)
case class Committed(offsets: Map[TopicPartition, OffsetAndMetadata])
final case class Drain[T](partitions: Set[TopicPartition],
drainedConfirmationRef: Option[ActorRef],
drainedConfirmationMsg: T)
final case class Committed(offsets: Map[TopicPartition, OffsetAndMetadata])
case object CommittingFailure

private[internal] final case class CommittedMarkerRef(sourceActor: ActorRef, commitTimeout: FiniteDuration)(
Expand Down Expand Up @@ -338,7 +338,7 @@ private object TransactionalSourceLogic {
}

@InternalApi
private class TransactionalSubSourceStageLogic[K, V](
private final class TransactionalSubSourceStageLogic[K, V](
shape: SourceShape[TransactionalMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
Expand All @@ -356,7 +356,7 @@ private class TransactionalSubSourceStageLogic[K, V](

import TransactionalSourceLogic._

val inFlightRecords = InFlightRecords.empty
private val inFlightRecords = InFlightRecords.empty

override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)

Expand All @@ -373,7 +373,7 @@ private class TransactionalSubSourceStageLogic[K, V](

override protected def onDownstreamFinishSubSourceCancellationStrategy(): SubSourceCancellationStrategy = DoNothing

def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] =
private def shuttingDownReceive: PartialFunction[(ActorRef, Any), Unit] =
drainHandling
.orElse {
case (_, Status.Failure(e)) =>
Expand All @@ -392,10 +392,10 @@ private class TransactionalSubSourceStageLogic[K, V](
drainAndComplete()
}

def drainAndComplete(): Unit =
private def drainAndComplete(): Unit =
subSourceActor.ref.tell(Drain(inFlightRecords.assigned(), None, DrainingComplete), subSourceActor.ref)

def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (sender, Committed(offsets)) =>
inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap)
sender ! Done
Expand Down

0 comments on commit 5dd9017

Please sign in to comment.