Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3321,8 +3321,12 @@ class ReplicaManager(val config: KafkaConfig,
* - LEO > seal: truncate down to the seal unless the local suffix [seal, LEO) is already
* materialized consolidated diskless data.
* - LEO == seal and HW < seal: advance HW to the seal so consumers can cross into diskless.
* - LEO < seal: fence offline. The classic prefix is incomplete, so serving it would hide a
* hole in acknowledged data.
* - LEO < seal: fence offline, unless this is a consolidating diskless topic with remote
* storage enabled. In that case the classic prefix [0, seal) lives in the remote tier and
* can be rebuilt, so the partition is left online for the ConsolidationReconciler to arm
* consolidation at the current LEO (the first fetch answers
* OFFSET_MOVED_TO_TIERED_STORAGE and the tier-state machine rebuilds from remote).
* Fencing here would preempt that recovery: the reconciler only sees online partitions.
*
* Must run after makeLeader (so the log exists) and before any consolidation fetcher starts.
*/
Expand Down Expand Up @@ -3350,11 +3354,22 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Stale high watermark detected: advanced high watermark to seal offset $seal for " +
s"switched leader partition $tp")
} else if (log.logEndOffset < seal) {
// This is unreachable in normal operation
stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " +
s"classic-to-diskless start offset $seal; cannot catch up from another replica. " +
s"Marking the partition offline as its local log is corrupt below the committed seal.")
markPartitionOffline(tp)
if (isConsolidatingPartition(partition) && log.remoteLogEnabled()) {
// The leader's local classic prefix was lost (full local-storage wipe / DR).
// [0, seal) lives in the remote tier, so leave the partition online and let the
// ConsolidationReconciler arm consolidation at the current LEO: the first fetch
// lands below the diskless WAL start, answers OFFSET_MOVED_TO_TIERED_STORAGE, and
// the tier-state machine rebuilds the log from remote.
stateChangeLogger.warn(s"Switched leader partition $tp is below the " +
s"classic-to-diskless seal $seal at LEO ${log.logEndOffset} with remote storage " +
s"enabled; leaving online for consolidation to rebuild the classic prefix from " +
s"the remote tier.")
} else {
stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " +
s"classic-to-diskless start offset $seal; cannot catch up from another replica. " +
s"Marking the partition offline as its local log is corrupt below the committed seal.")
markPartitionOffline(tp)
}
}
}
} catch {
Expand Down
117 changes: 116 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kafka.server.share.DelayedShareFetch
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import kafka.log.LogManager
import kafka.log.LogTestUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.network.ListenerName
Expand Down Expand Up @@ -6381,7 +6382,11 @@ class ReplicaManagerInklessTest {
(disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled).toString
)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), defaultLogConfig.getOrElse(new LogConfig(new Properties())))
// remoteStorageSystemEnable must mirror the broker config (set above when consolidation is
// enabled) so UnifiedLog.remoteLogEnabled() can be true for consolidating-diskless logs.
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)),
defaultLogConfig.getOrElse(new LogConfig(new Properties())),
remoteStorageSystemEnable = disklessRemoteStorageConsolidationEnabled)
val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)
when(sharedState.time()).thenReturn(Time.SYSTEM)
val inklessConfigMap = new util.HashMap[String, Object]()
Expand Down Expand Up @@ -6904,6 +6909,116 @@ class ReplicaManagerInklessTest {
}
}

@Test
def testApplyDeltaDoesNotFenceConsolidatingLeaderBelowSealWhenRemoteEnabled(): Unit = {
  // Counterpart of testApplyDeltaFencesLeaderWithLocalLogBelowCommittedSeal for a consolidating
  // diskless topic whose log has remote storage enabled. The leader's local LEO is below the
  // committed seal (its local classic prefix [0, seal) was lost, e.g. a full local-storage wipe
  // / DR), but that prefix is expected to live in the remote tier. The partition must therefore
  // remain online, with consolidation armed at the current LEO, instead of being fenced offline.
  val topic = "switched-consolidating-topic"
  val id = Uuid.randomUuid()
  val tp = new TopicPartition(topic, 0)
  val localBroker = 1
  val peerBroker = 2
  val committedSeal = 10L
  val leoBelowSeal = 5L
  val noFetchState = Map.empty[TopicPartition, PartitionFetchState]

  val classicFetchers = mock(classOf[ReplicaFetcherManager])
  when(classicFetchers.removeFetcherForPartitions(any())).thenReturn(noFetchState)

  // Every ConsolidationFetcherManager constructed while the mock construction is open gets the
  // same removeFetcherForPartitions stubbing as the classic fetcher manager above.
  val stubConstructedCfm: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = {
    case (constructedCfm, _) =>
      when(constructedCfm.removeFetcherForPartitions(any())).thenReturn(noFetchState)
  }
  val cfmConstruction = mockConstruction(classOf[ConsolidationFetcherManager], stubConstructedCfm)
  try {
    // Flags and their purpose:
    //  - disklessManagedReplicasEnabled + initDisklessLogManager: allow the PENDING switch branch
    //    to run;
    //  - disklessRemoteStorageConsolidationEnabled + consolidatingDisklessTopics: classify the
    //    topic as consolidating;
    //  - defaultLogConfig with remoteLogStorageEnable = true: makes log.remoteLogEnabled() true,
    //    so the recovery branch is taken rather than the fence.
    val remoteEnabledLogConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
    val replicaManager = spy(createReplicaManager(
      Seq(topic),
      disklessManagedReplicasEnabled = true,
      disklessRemoteStorageConsolidationEnabled = true,
      consolidatingDisklessTopics = Set(topic),
      mockReplicaFetcherManager = Some(classicFetchers),
      initDisklessLogManager = Some(mock(classOf[InitDisklessLogManager])),
      defaultLogConfig = Some(remoteEnabledLogConfig)
    ))
    try {
      val log = replicaManager.logManager.getOrCreateLog(tp, isNew = true, topicId = Optional.of(id))
      populateLocalLogAtLeoAndCheckpointedHwm(replicaManager, tp, log, leo = leoBelowSeal, hw = leoBelowSeal)
      assertTrue(log.remoteLogEnabled(), "Test log must have remote storage enabled for the recovery branch to fire")

      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp))
        .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING)

      // Step 1: the partition comes online as a sealed leader while the switch is still PENDING.
      val pendingDelta = new TopicsDelta(TopicsImage.EMPTY)
      pendingDelta.replay(new TopicRecord().setName(topic).setTopicId(id))
      val pendingRecord = new PartitionRecord()
        .setPartitionId(0)
        .setTopicId(id)
        .setReplicas(util.Arrays.asList(localBroker, peerBroker))
        .setIsr(util.Arrays.asList(localBroker, peerBroker))
        .setLeader(localBroker)
        .setLeaderEpoch(0)
        .setPartitionEpoch(0)
      val pendingTag = InitDisklessLogFields.encodeClassicToDisklessStartOffset(
        PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING)
      pendingRecord.unknownTaggedFields().add(pendingTag)
      pendingDelta.replay(pendingRecord)

      val pendingImage = imageFromTopics(pendingDelta.apply())
      replicaManager.applyDelta(pendingDelta, pendingImage)

      val hostedBefore = replicaManager.getPartition(tp)
      val leaderBefore = hostedBefore.asInstanceOf[HostedPartition.Online].partition
      assertTrue(leaderBefore.isLeader, "Partition should be the leader before the seal commit")
      assertEquals(leoBelowSeal, replicaManager.localLogOrException(tp).logEndOffset)

      // Step 2: the seal is committed above the leader's local LEO, which is what a wiped and
      // restarted leader would observe.
      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(committedSeal)
      val sealBaseTopics = imageFromTopics(pendingDelta.apply()).topics()
      val sealDelta = new TopicsDelta(sealBaseTopics)
      val sealRecord = new PartitionChangeRecord()
        .setTopicId(id)
        .setPartitionId(0)
      val sealTag = InitDisklessLogFields.encodeClassicToDisklessStartOffset(committedSeal)
      sealRecord.unknownTaggedFields().add(sealTag)
      sealDelta.replay(sealRecord)

      // The seal change must be reported as a local leader change for this broker.
      assertTrue(sealDelta.localChanges(localBroker).leaders.containsKey(tp))

      val sealImage = imageFromTopics(sealDelta.apply())
      replicaManager.applyDelta(sealDelta, sealImage)

      // Although the leader sits below the committed seal, the classic prefix can be recovered
      // from the remote tier, so the partition has to remain online for the rebuild.
      val partition = replicaManager.getPartition(tp)
      assertTrue(partition.isInstanceOf[HostedPartition.Online],
        s"Consolidating leader below the seal with remote enabled must stay online for remote " +
          s"rebuild, but was $partition")

      // Consolidation must have been armed at the current LEO, so that the first fetch lands
      // below the diskless WAL start and kicks off the rebuild via
      // OFFSET_MOVED_TO_TIERED_STORAGE. The PENDING delta also invokes addFetcherForPartitions
      // (with an empty map, as the reconciler retries while the seal is pending), hence the
      // verification targets the specific armed call instead of an invocation count.
      val armedAtLocalLeo = Map(tp -> InitialFetchState(
        topicId = Some(id),
        leader = new BrokerEndPoint(-1, "diskless", -1),
        currentLeaderEpoch = 0,
        initOffset = leoBelowSeal
      ))
      val constructedCfm = cfmConstruction.constructed().get(0)
      verify(constructedCfm, atLeastOnce()).addFetcherForPartitions(armedAtLocalLeo)
    } finally {
      replicaManager.shutdown(checkpointHW = false)
    }
  } finally {
    cfmConstruction.close()
  }
}

@Test
def testSwitchedConsolidatingFollowerBelowSealStaysOnClassicFetcher(): Unit = {
// A switched consolidating follower whose local log is still below the committed seal must
Expand Down
Loading