KAFKA-18545: Remove Zookeeper logic from LogManager (#18592)
Reviewers: Chia-Ping Tsai, Ismael Juma, Mickael Maison
m1a2st authored Feb 3, 2025
1 parent 4ca24a7 commit 272d947
Showing 2 changed files with 3 additions and 55 deletions.
core/src/main/scala/kafka/log/LogManager.scala (11 changes: 3 additions & 8 deletions)
@@ -353,8 +353,8 @@ class LogManager(logDirs: Seq[File],
         addStrayLog(topicPartition, log)
         warn(s"Loaded stray log: $logDir")
       } else if (isStray(log)) {
-        // Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted.
-        // A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
+        // We are unable to prevent a topic from being recreated before every replica has been deleted.
+        // Broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
         // and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
         // So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
         log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
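As context for the hunk above: cleaning up a stray replica amounts to renaming its on-disk directory so it is no longer loaded as a live log and can be removed later. A minimal sketch of that idea, with an illustrative suffix and helper name (the real naming logic lives in UnifiedLog.logStrayDirName):

```scala
import java.io.File
import java.util.UUID

// Illustrative sketch: rename "<topic>-<partition>" to
// "<topic>-<partition>.<uuid>-stray" so the next startup skips it.
// The suffix and helper name are assumptions, not Kafka's actual code.
def markDirAsStray(partitionDir: File): Boolean = {
  val uuid = UUID.randomUUID().toString.replace("-", "")
  val strayName = s"${partitionDir.getName}.$uuid-stray"
  partitionDir.renameTo(new File(partitionDir.getParentFile, strayName))
}
```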
@@ -950,7 +950,6 @@ class LogManager(logDirs: Seq[File],
                         wasRemoteLogEnabled: Boolean): Unit = {
     topicConfigUpdated(topic)
     val logs = logsByTopic(topic)
-    // Combine the default properties with the overrides in zk to create the new LogConfig
     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
     val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
     // We would like to validate the configuration no matter whether the logs have materialised on disk or not.
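The line removed in this hunk is only a comment that referenced ZK; the merge behaviour itself is unchanged: broker-wide defaults are applied first and per-topic overrides win. A minimal sketch of that precedence, using java.util.Properties and two topic configs purely as an example (not the actual LogConfig.fromProps implementation):

```scala
import java.util.Properties

// Broker-wide defaults.
val brokerDefaults = new Properties()
brokerDefaults.put("retention.ms", "604800000")   // 7 days (broker default)
brokerDefaults.put("cleanup.policy", "delete")

// Per-topic overrides.
val topicOverrides = new Properties()
topicOverrides.put("retention.ms", "86400000")    // 1 day (topic override)

// Defaults first, overrides last, so the override wins on conflicts.
val merged = new Properties()
merged.putAll(brokerDefaults)
merged.putAll(topicOverrides)

assert(merged.getProperty("retention.ms") == "86400000")   // override applied
assert(merged.getProperty("cleanup.policy") == "delete")   // default retained
```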
@@ -1079,11 +1078,7 @@ class LogManager(logDirs: Seq[File],
 
         log
       }
-      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
-      if (log.topicId.isEmpty) {
-        topicId.foreach(log.assignTopicId)
-      }
-
+
       // Ensure topic IDs are consistent
       topicId.foreach { topicId =>
        log.topicId.foreach { logTopicId =>
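With the ZK-era assignment above removed, the code that remains only verifies that a topic ID supplied for an existing log matches the ID already recorded for it. A rough sketch of that invariant, using java.util.UUID as a stand-in for Kafka's Uuid and a hypothetical helper name (in Kafka a mismatch typically surfaces as an InconsistentTopicIdException):

```scala
import java.util.UUID

// Hypothetical helper: succeed when either side has no ID yet,
// fail when both are present and disagree.
def verifyTopicId(logTopicId: Option[UUID], requestedTopicId: Option[UUID]): Unit =
  for {
    existing  <- logTopicId
    requested <- requestedTopicId
    if existing != requested
  } throw new IllegalStateException(
    s"Topic ID $requested does not match the ID $existing already recorded on disk")
```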
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (47 changes: 0 additions & 47 deletions)
@@ -4188,53 +4188,6 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testPartitionMetadataFileCreatedWithExistingLog(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
-    try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-
-      replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
-
-      assertTrue(replicaManager.getLog(topicPartition).isDefined)
-      var log = replicaManager.getLog(topicPartition).get
-      assertEquals(None, log.topicId)
-      assertFalse(log.partitionMetadataFile.get.exists())
-
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
-      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
-      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
-      val id = topicIds.get(topicPartition.topic())
-      log = replicaManager.localLog(topicPartition).get
-      assertTrue(log.partitionMetadataFile.get.exists())
-      val partitionMetadata = log.partitionMetadataFile.get.read()
-
-      // Current version of PartitionMetadataFile is 0.
-      assertEquals(0, partitionMetadata.version)
-      assertEquals(id, partitionMetadata.topicId)
-    } finally {
-      replicaManager.shutdown(checkpointHW = false)
-    }
-  }
-
   @Test
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
