From e526d6482fb581cbbf0d4e2e90f03ffd8837c72d Mon Sep 17 00:00:00 2001 From: Viktor Somogyi-Vass Date: Tue, 30 Jun 2026 16:46:44 +0200 Subject: [PATCH 1/2] fix(inkless:consolidation): don't fence a consolidating leader below the seal maybeReconcileSwitchedLeader fenced a switched leader offline whenever its LEO fell below the committed seal. It runs during applyLocalLeadersDelta, before the ConsolidationReconciler, so the wiped-leader recovery from #673 never ran. The reconciler only sees online partitions, and a wiped consolidating leader (LEO 0 < seal) was offline before it could rebuild. Every fetch and offset request then failed with KAFKA_STORAGE_ERROR. For a consolidating diskless topic with remote storage enabled, [0, seal) is in the remote tier and can be rebuilt, so leave the partition online and let the reconciler arm consolidation at the current LEO. Non-consolidating topics, and consolidating topics whose log has remote storage disabled, still fence, since their classic prefix has no remote copy and LEO < seal is unrecoverable corruption. Co-authored-by: Cursor --- .../scala/kafka/server/ReplicaManager.scala | 39 +++- .../server/ReplicaManagerInklessTest.scala | 188 +++++++++++++++++- 2 files changed, 217 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6ee4dc0dfd..696a50cf43 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -3321,8 +3321,11 @@ class ReplicaManager(val config: KafkaConfig, * - LEO > seal: truncate down to the seal unless the local suffix [seal, LEO) is already * materialized consolidated diskless data. * - LEO == seal and HW < seal: advance HW to the seal so consumers can cross into diskless. - * - LEO < seal: fence offline. The classic prefix is incomplete, so serving it would hide a - * hole in acknowledged data.
+ * - LEO < seal: fence offline, unless this is a consolidating diskless topic with remote + * storage enabled. In that case the classic prefix [0, seal) lives in the remote tier and + * can be rebuilt, so the partition is left online for the ConsolidationReconciler to + * rebuild from remote (the inline comment below carries the mechanism). Fencing here would + * preempt that recovery: the reconciler only sees online partitions. * * Must run after makeLeader (so the log exists) and before any consolidation fetcher starts. */ @@ -3350,11 +3353,21 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Stale high watermark detected: advanced high watermark to seal offset $seal for " + s"switched leader partition $tp") } else if (log.logEndOffset < seal) { - // This is unreachable in normal operation - stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " + - s"classic-to-diskless start offset $seal; cannot catch up from another replica. " + - s"Marking the partition offline as its local log is corrupt below the committed seal.") - markPartitionOffline(tp) + if (isConsolidatingPartition(partition) && log.remoteLogEnabled()) { + // The leader's local classic prefix was lost (full local-storage wipe / DR). + // [0, seal) lives in the remote tier, so leave the partition online and let the + // ConsolidationReconciler arm consolidation at the current LEO: the first fetch + // lands below the diskless WAL start, answers OFFSET_MOVED_TO_TIERED_STORAGE, and + // the tier-state machine rebuilds the log from remote. 
+ stateChangeLogger.warn(s"Switched leader partition $tp is below the classic-to-diskless " + + s"seal $seal at LEO ${log.logEndOffset} with remote storage enabled; leaving online " + + s"for consolidation to rebuild the classic prefix from the remote tier.") + } else { + stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " + + s"classic-to-diskless start offset $seal; cannot catch up from another replica. " + + s"Marking the partition offline as its local log is corrupt below the committed seal.") + markPartitionOffline(tp) + } } } } catch { @@ -3658,8 +3671,16 @@ class ReplicaManager(val config: KafkaConfig, } if (consolidatingDisklessPartitionsToStartFetching.nonEmpty) { - consolidationReconciler.foreach(_.startConsolidationFetchers(consolidatingDisklessPartitionsToStartFetching)) - stateChangeLogger.info(s"Started consolidating diskless fetchers as part of become-leader for ${consolidatingDisklessPartitionsToStartFetching.size} partitions") + // maybeReconcileSwitchedLeader above may have fenced a below-seal leader whose local log + // cannot be rebuilt from remote. Skip any partition that is no longer online so the + // reconciler does not dereference a fenced partition's local log. 
+ val onlineToStartFetching = consolidatingDisklessPartitionsToStartFetching.filter { + case (tp, _) => onlinePartition(tp).isDefined + } + if (onlineToStartFetching.nonEmpty) { + consolidationReconciler.foreach(_.startConsolidationFetchers(onlineToStartFetching)) + stateChangeLogger.info(s"Started consolidating diskless fetchers as part of become-leader for ${onlineToStartFetching.size} partitions") + } } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index f992f1d50e..6c67b19315 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -30,6 +30,7 @@ import kafka.server.share.DelayedShareFetch import kafka.utils.TestUtils import kafka.utils.TestUtils.waitUntilTrue import kafka.log.LogManager +import kafka.log.LogTestUtils import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.network.ListenerName @@ -6381,7 +6382,11 @@ class ReplicaManagerInklessTest { (disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled).toString ) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), defaultLogConfig.getOrElse(new LogConfig(new Properties()))) + // Source remoteStorageSystemEnable from the built broker config so the LogManager tracks + // whatever the prop wiring above actually set, rather than mirroring the input flag. 
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), + defaultLogConfig.getOrElse(new LogConfig(new Properties())), + remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS) when(sharedState.time()).thenReturn(Time.SYSTEM) val inklessConfigMap = new util.HashMap[String, Object]() @@ -6904,6 +6909,187 @@ class ReplicaManagerInklessTest { } } + @Test + def testApplyDeltaDoesNotFenceConsolidatingLeaderBelowSealWhenRemoteEnabled(): Unit = { + // A consolidating diskless leader whose local classic prefix [0, seal) was lost (full + // local-storage wipe / DR) but lives in the remote tier must stay online and let the + // ConsolidationReconciler arm consolidation at the current LEO to rebuild from remote, + // rather than being fenced offline. + val topicName = "switched-consolidating-topic" + val topicId = Uuid.randomUuid() + val tp = new TopicPartition(topicName, 0) + val brokerId = 1 + val otherReplica = 2 + val sealOffset = 10L + val localLeo = 5L + + val mockFetcherManager = mock(classOf[ReplicaFetcherManager]) + when(mockFetcherManager.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + + val ctorInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = { + case (mockCfm, _) => + when(mockCfm.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + } + val consolidationCtor = mockConstruction(classOf[ConsolidationFetcherManager], ctorInit) + try { + // disklessRemoteStorageConsolidationEnabled + consolidatingDisklessTopics mark the topic as + // consolidating, so applyLocalLeadersDelta routes both deltas through the new-partition + // consolidating branch (makeLeader + arm consolidation), not the classic switch branch. 
+ // defaultLogConfig with remoteLogStorageEnable=true makes log.remoteLogEnabled() true so the + // recovery branch (not the fence) fires. + val replicaManager = spy(createReplicaManager( + Seq(topicName), + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(topicName), + mockReplicaFetcherManager = Some(mockFetcherManager), + defaultLogConfig = Some(LogTestUtils.createLogConfig(remoteLogStorageEnable = true)), + )) + try { + val pendingImage = applyPendingSwitchDelta(replicaManager, topicName, topicId, tp, brokerId, otherReplica, localLeo) + assertTrue(replicaManager.localLogOrException(tp).remoteLogEnabled(), + "Test log must have remote storage enabled for the recovery branch to fire") + commitSealDelta(replicaManager, pendingImage, topicId, tp, brokerId, sealOffset) + + // The leader is below the committed seal, but the classic prefix is recoverable from the + // remote tier, so the partition stays online for the ConsolidationReconciler to rebuild. + val partition = replicaManager.getPartition(tp) + assertTrue(partition.isInstanceOf[HostedPartition.Online], + s"Consolidating leader below the seal with remote enabled must stay online for remote " + + s"rebuild, but was $partition") + + // Consolidation was armed at the current LEO so the first fetch lands below the diskless + // WAL start and triggers the rebuild via OFFSET_MOVED_TO_TIERED_STORAGE. (The PENDING delta + // also calls addFetcherForPartitions with an empty map -- the reconciler Retries while the + // seal is still pending -- so match the specific armed call rather than counting invocations.) 
+ val mockCfm = consolidationCtor.constructed().get(0) + verify(mockCfm, atLeastOnce()).addFetcherForPartitions( + Map(tp -> InitialFetchState( + topicId = Some(topicId), + leader = new BrokerEndPoint(-1, "diskless", -1), + currentLeaderEpoch = 0, + initOffset = localLeo + )) + ) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } finally { + consolidationCtor.close() + } + } + + @Test + def testApplyDeltaFencesConsolidatingLeaderBelowSealWhenRemoteDisabled(): Unit = { + // Third cell of the 2x2: a consolidating diskless topic whose per-log remoteLogEnabled() is + // false. There is no remote copy to rebuild the classic prefix from, so a leader below the + // seal must still be fenced offline. Guards the log.remoteLogEnabled() conjunct in + // maybeReconcileSwitchedLeader: dropping it would turn this into an always-skip for + // consolidating topics and no existing test would catch it. + val topicName = "switched-consolidating-remote-disabled-topic" + val topicId = Uuid.randomUuid() + val tp = new TopicPartition(topicName, 0) + val brokerId = 1 + val otherReplica = 2 + val sealOffset = 10L + val localLeo = 5L + + val mockFetcherManager = mock(classOf[ReplicaFetcherManager]) + when(mockFetcherManager.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + + val ctorInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = { + case (mockCfm, _) => + when(mockCfm.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + } + val consolidationCtor = mockConstruction(classOf[ConsolidationFetcherManager], ctorInit) + try { + // Consolidating topic, but the per-log remote storage flag is false, so log.remoteLogEnabled() + // is false even though the broker-level remote-storage-system flag is on. 
+ val replicaManager = spy(createReplicaManager( + Seq(topicName), + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(topicName), + mockReplicaFetcherManager = Some(mockFetcherManager), + defaultLogConfig = Some(LogTestUtils.createLogConfig(remoteLogStorageEnable = false)), + )) + try { + val pendingImage = applyPendingSwitchDelta(replicaManager, topicName, topicId, tp, brokerId, otherReplica, localLeo) + assertFalse(replicaManager.localLogOrException(tp).remoteLogEnabled(), + "Test log must have remote storage disabled for the fence branch to fire") + commitSealDelta(replicaManager, pendingImage, topicId, tp, brokerId, sealOffset) + + // No remote copy to rebuild from, so the leader below the seal is fenced offline. + val partition = replicaManager.getPartition(tp) + assertTrue(partition.isInstanceOf[HostedPartition.Offline], + s"Consolidating leader below the seal with remote disabled must be fenced offline, " + + s"but was $partition") + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } finally { + consolidationCtor.close() + } + } + + /** + * Populates the local log to localLeo, then applies a PENDING classic-to-consolidated switch + * delta so the partition comes online as the leader with the seal still uncommitted. Returns the + * resulting metadata image for the caller to base the seal-commit delta on (see commitSealDelta). 
+ */ + private def applyPendingSwitchDelta( + replicaManager: ReplicaManager, + topicName: String, topicId: Uuid, tp: TopicPartition, + brokerId: Int, otherReplica: Int, localLeo: Long): MetadataImage = { + val log = replicaManager.logManager.getOrCreateLog(tp, isNew = true, topicId = Optional.of(topicId)) + populateLocalLogAtLeoAndCheckpointedHwm(replicaManager, tp, log, leo = localLeo, hw = localLeo) + + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)) + .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING) + + val pendingDelta = new TopicsDelta(TopicsImage.EMPTY) + pendingDelta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + val pendingRecord = new PartitionRecord() + .setPartitionId(tp.partition) + .setTopicId(topicId) + .setReplicas(util.Arrays.asList(brokerId, otherReplica)) + .setIsr(util.Arrays.asList(brokerId, otherReplica)) + .setLeader(brokerId) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + pendingRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset( + PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING)) + pendingDelta.replay(pendingRecord) + + val pendingImage = imageFromTopics(pendingDelta.apply()) + replicaManager.applyDelta(pendingDelta, pendingImage) + + val leaderBefore = replicaManager.getPartition(tp).asInstanceOf[HostedPartition.Online].partition + assertTrue(leaderBefore.isLeader, "Partition should be the leader before the seal commit") + assertEquals(localLeo, replicaManager.localLogOrException(tp).logEndOffset) + pendingImage + } + + /** + * Commits the classic-to-consolidated seal at sealOffset via a partition-change delta, modeling + * the controller fixing classicToDisklessStartOffset above the leader's local LEO. 
+ */ + private def commitSealDelta( + replicaManager: ReplicaManager, pendingImage: MetadataImage, + topicId: Uuid, tp: TopicPartition, brokerId: Int, sealOffset: Long): Unit = { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(sealOffset) + val sealDelta = new TopicsDelta(pendingImage.topics()) + val sealRecord = new PartitionChangeRecord() + .setTopicId(topicId) + .setPartitionId(tp.partition) + sealRecord.unknownTaggedFields().add( + InitDisklessLogFields.encodeClassicToDisklessStartOffset(sealOffset)) + sealDelta.replay(sealRecord) + + val localChanges = sealDelta.localChanges(brokerId) + assertTrue(localChanges.leaders.containsKey(tp)) + replicaManager.applyDelta(sealDelta, imageFromTopics(sealDelta.apply())) + } + @Test def testSwitchedConsolidatingFollowerBelowSealStaysOnClassicFetcher(): Unit = { // A switched consolidating follower whose local log is still below the committed seal must From 394be5c3cbb061fb2432ad11500a3af8f9da9fc5 Mon Sep 17 00:00:00 2001 From: Viktor Somogyi-Vass Date: Wed, 1 Jul 2026 15:33:10 +0200 Subject: [PATCH 2/2] fixup(inkless:consolidation): don't fence a consolidating leader below the seal --- core/src/main/scala/kafka/server/ReplicaManager.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 696a50cf43..3635bf0e34 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -3363,9 +3363,10 @@ class ReplicaManager(val config: KafkaConfig, s"seal $seal at LEO ${log.logEndOffset} with remote storage enabled; leaving online " + s"for consolidation to rebuild the classic prefix from the remote tier.") } else { - stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " + - s"classic-to-diskless start offset $seal; cannot catch up from another replica.
" + - s"Marking the partition offline as its local log is corrupt below the committed seal.") + stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below the " + + s"classic-to-diskless seal $seal and the classic prefix [0, $seal) is locally incomplete " + + s"and not recoverable from remote (topic is not consolidating or remote storage is disabled for its log); " + + s"marking the partition offline. Cannot catch up from another replica.") markPartitionOffline(tp) } }