From 272d947f96b531fc6a637cd2e1448389b71bb653 Mon Sep 17 00:00:00 2001
From: Ken Huang
Date: Tue, 4 Feb 2025 01:16:35 +0800
Subject: [PATCH] KAFKA-18545: Remove Zookeeper logic from LogManager (#18592)

Reviewers: Chia-Ping Tsai, Ismael Juma, Mickael Maison
---
 .../src/main/scala/kafka/log/LogManager.scala | 11 ++---
 .../kafka/server/ReplicaManagerTest.scala     | 47 -------------------
 2 files changed, 3 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d9e75d3cea5ca..572cd5d7b8b9a 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -353,8 +353,8 @@ class LogManager(logDirs: Seq[File],
       addStrayLog(topicPartition, log)
       warn(s"Loaded stray log: $logDir")
     } else if (isStray(log)) {
-      // Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted.
-      // A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
+      // We are unable to prevent a topic from being recreated before every replica has been deleted.
+      // Broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
       // and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
       // So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
       log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
@@ -950,7 +950,6 @@ class LogManager(logDirs: Seq[File],
                                      wasRemoteLogEnabled: Boolean): Unit = {
     topicConfigUpdated(topic)
     val logs = logsByTopic(topic)
-    // Combine the default properties with the overrides in zk to create the new LogConfig
     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
     val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
     // We would like to validate the configuration no matter whether the logs have materialised on disk or not.
@@ -1079,11 +1078,7 @@ class LogManager(logDirs: Seq[File],
         log
       }
 
-      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
-      if (log.topicId.isEmpty) {
-        topicId.foreach(log.assignTopicId)
-      }
-
+      // Ensure topic IDs are consistent
       topicId.foreach { topicId =>
        log.topicId.foreach { logTopicId =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ba6b6f80a27e1..34a2c7d5c0060 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4188,53 +4188,6 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testPartitionMetadataFileCreatedWithExistingLog(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
-    try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-
-      replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
-
-      assertTrue(replicaManager.getLog(topicPartition).isDefined)
-      var log = replicaManager.getLog(topicPartition).get
-      assertEquals(None, log.topicId)
-      assertFalse(log.partitionMetadataFile.get.exists())
-
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
-      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
-      assertFalse(replicaManager.localLog(topicPartition).isEmpty)
-      val id = topicIds.get(topicPartition.topic())
-      log = replicaManager.localLog(topicPartition).get
-      assertTrue(log.partitionMetadataFile.get.exists())
-      val partitionMetadata = log.partitionMetadataFile.get.read()
-
-      // Current version of PartitionMetadataFile is 0.
-      assertEquals(0, partitionMetadata.version)
-      assertEquals(id, partitionMetadata.topicId)
-    } finally {
-      replicaManager.shutdown(checkpointHW = false)
-    }
-  }
-
   @Test
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
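Note on the getOrCreateLog hunk above: with the ZK-era assignment branch removed, the remaining responsibility is only to verify that a controller-supplied topic ID agrees with whatever ID the local log already carries. The following is a minimal, self-contained sketch of that check, illustrative only and not the actual LogManager code; plain Scala types stand in for UnifiedLog state, and a generic IllegalStateException stands in for the Kafka-specific exception type.

object TopicIdConsistencyCheck {
  // Hypothetical stand-in for the bits of UnifiedLog state this check needs.
  final case class LogState(topicPartition: String, topicId: Option[String])

  // Fails if the requested topic ID disagrees with the ID already recorded for the log.
  def ensureConsistentTopicId(log: LogState, requestedTopicId: Option[String]): Unit =
    requestedTopicId.foreach { topicId =>
      log.topicId.foreach { logTopicId =>
        if (topicId != logTopicId)
          throw new IllegalStateException(
            s"Tried to assign topic ID $topicId to ${log.topicPartition}, " +
              s"but the log already contains topic ID $logTopicId")
      }
    }

  def main(args: Array[String]): Unit = {
    ensureConsistentTopicId(LogState("foo-0", Some("id-1")), Some("id-1")) // matches: no-op
    ensureConsistentTopicId(LogState("foo-0", None), Some("id-1"))         // no local ID yet: no-op
    // ensureConsistentTopicId(LogState("foo-0", Some("id-1")), Some("id-2")) // would throw
  }
}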