Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3321,8 +3321,11 @@ class ReplicaManager(val config: KafkaConfig,
* - LEO > seal: truncate down to the seal unless the local suffix [seal, LEO) is already
* materialized consolidated diskless data.
* - LEO == seal and HW < seal: advance HW to the seal so consumers can cross into diskless.
* - LEO < seal: fence offline. The classic prefix is incomplete, so serving it would hide a
* hole in acknowledged data.
* - LEO < seal: fence offline, unless this is a consolidating diskless topic with remote
* storage enabled. In that case the classic prefix [0, seal) lives in the remote tier and
* can be rebuilt, so the partition is left online for the ConsolidationReconciler to
* rebuild from remote (the inline comment below carries the mechanism). Fencing here would
* preempt that recovery: the reconciler only sees online partitions.
*
* Must run after makeLeader (so the log exists) and before any consolidation fetcher starts.
*/
Expand Down Expand Up @@ -3350,11 +3353,21 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Stale high watermark detected: advanced high watermark to seal offset $seal for " +
s"switched leader partition $tp")
} else if (log.logEndOffset < seal) {
// This is unreachable in normal operation
stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " +
s"classic-to-diskless start offset $seal; cannot catch up from another replica. " +
s"Marking the partition offline as its local log is corrupt below the committed seal.")
markPartitionOffline(tp)
if (isConsolidatingPartition(partition) && log.remoteLogEnabled()) {
Comment thread
viktorsomogyi marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personal note: this can be simplified when #678 lands -- not a blocker

// The leader's local classic prefix was lost (full local-storage wipe / DR).
// [0, seal) lives in the remote tier, so leave the partition online and let the
// ConsolidationReconciler arm consolidation at the current LEO: the first fetch
// lands below the diskless WAL start, answers OFFSET_MOVED_TO_TIERED_STORAGE, and
// the tier-state machine rebuilds the log from remote.
stateChangeLogger.warn(s"Switched leader partition $tp is below the classic-to-diskless " +
s"seal $seal at LEO ${log.logEndOffset} with remote storage enabled; leaving online " +
s"for consolidation to rebuild the classic prefix from the remote tier.")
} else {
stateChangeLogger.error(s"Leader partition $tp has LEO ${log.logEndOffset} below " +
s"classic-to-diskless start offset $seal; cannot catch up from another replica. " +
s"Marking the partition offline as its local log is corrupt below the committed seal.")
Comment thread
viktorsomogyi marked this conversation as resolved.
Outdated
markPartitionOffline(tp)
}
}
}
} catch {
Expand Down Expand Up @@ -3658,8 +3671,16 @@ class ReplicaManager(val config: KafkaConfig,
}

if (consolidatingDisklessPartitionsToStartFetching.nonEmpty) {
consolidationReconciler.foreach(_.startConsolidationFetchers(consolidatingDisklessPartitionsToStartFetching))
stateChangeLogger.info(s"Started consolidating diskless fetchers as part of become-leader for ${consolidatingDisklessPartitionsToStartFetching.size} partitions")
// maybeReconcileSwitchedLeader above may have fenced a below-seal leader whose local log
// cannot be rebuilt from remote. Skip any partition that is no longer online so the
// reconciler does not dereference a fenced partition's local log.
val onlineToStartFetching = consolidatingDisklessPartitionsToStartFetching.filter {
case (tp, _) => onlinePartition(tp).isDefined
}
if (onlineToStartFetching.nonEmpty) {
consolidationReconciler.foreach(_.startConsolidationFetchers(onlineToStartFetching))
stateChangeLogger.info(s"Started consolidating diskless fetchers as part of become-leader for ${onlineToStartFetching.size} partitions")
}
}
}

Expand Down
188 changes: 187 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kafka.server.share.DelayedShareFetch
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import kafka.log.LogManager
import kafka.log.LogTestUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.network.ListenerName
Expand Down Expand Up @@ -6381,7 +6382,11 @@ class ReplicaManagerInklessTest {
(disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled).toString
)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), defaultLogConfig.getOrElse(new LogConfig(new Properties())))
// Source remoteStorageSystemEnable from the built broker config so the LogManager tracks
// whatever the prop wiring above actually set, rather than mirroring the input flag.
val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)),
defaultLogConfig.getOrElse(new LogConfig(new Properties())),
remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
val sharedState = mock(classOf[SharedState], Answers.RETURNS_DEEP_STUBS)
when(sharedState.time()).thenReturn(Time.SYSTEM)
val inklessConfigMap = new util.HashMap[String, Object]()
Expand Down Expand Up @@ -6904,6 +6909,187 @@ class ReplicaManagerInklessTest {
}
}

@Test
def testApplyDeltaDoesNotFenceConsolidatingLeaderBelowSealWhenRemoteEnabled(): Unit = {
// A consolidating diskless leader whose local classic prefix [0, seal) was lost (full
// local-storage wipe / DR) but lives in the remote tier must stay online and let the
// ConsolidationReconciler arm consolidation at the current LEO to rebuild from remote,
// rather than being fenced offline.
val topicName = "switched-consolidating-topic"
val topicId = Uuid.randomUuid()
val tp = new TopicPartition(topicName, 0)
val brokerId = 1
val otherReplica = 2
val sealOffset = 10L
val localLeo = 5L

val mockFetcherManager = mock(classOf[ReplicaFetcherManager])
when(mockFetcherManager.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState])

val ctorInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = {
case (mockCfm, _) =>
when(mockCfm.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
}
val consolidationCtor = mockConstruction(classOf[ConsolidationFetcherManager], ctorInit)
try {
// disklessRemoteStorageConsolidationEnabled + consolidatingDisklessTopics mark the topic as
// consolidating, so applyLocalLeadersDelta routes both deltas through the new-partition
// consolidating branch (makeLeader + arm consolidation), not the classic switch branch.
// defaultLogConfig with remoteLogStorageEnable=true makes log.remoteLogEnabled() true so the
// recovery branch (not the fence) fires.
val replicaManager = spy(createReplicaManager(
Seq(topicName),
disklessRemoteStorageConsolidationEnabled = true,
consolidatingDisklessTopics = Set(topicName),
mockReplicaFetcherManager = Some(mockFetcherManager),
defaultLogConfig = Some(LogTestUtils.createLogConfig(remoteLogStorageEnable = true)),
))
try {
val pendingImage = applyPendingSwitchDelta(replicaManager, topicName, topicId, tp, brokerId, otherReplica, localLeo)
assertTrue(replicaManager.localLogOrException(tp).remoteLogEnabled(),
"Test log must have remote storage enabled for the recovery branch to fire")
commitSealDelta(replicaManager, pendingImage, topicId, tp, brokerId, sealOffset)

// The leader is below the committed seal, but the classic prefix is recoverable from the
// remote tier, so the partition stays online for the ConsolidationReconciler to rebuild.
val partition = replicaManager.getPartition(tp)
assertTrue(partition.isInstanceOf[HostedPartition.Online],
s"Consolidating leader below the seal with remote enabled must stay online for remote " +
s"rebuild, but was $partition")

// Consolidation was armed at the current LEO so the first fetch lands below the diskless
// WAL start and triggers the rebuild via OFFSET_MOVED_TO_TIERED_STORAGE. (The PENDING delta
// also calls addFetcherForPartitions with an empty map -- the reconciler Retries while the
// seal is still pending -- so match the specific armed call rather than counting invocations.)
val mockCfm = consolidationCtor.constructed().get(0)
verify(mockCfm, atLeastOnce()).addFetcherForPartitions(
Map(tp -> InitialFetchState(
topicId = Some(topicId),
leader = new BrokerEndPoint(-1, "diskless", -1),
currentLeaderEpoch = 0,
initOffset = localLeo
))
)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
} finally {
consolidationCtor.close()
}
}

@Test
def testApplyDeltaFencesConsolidatingLeaderBelowSealWhenRemoteDisabled(): Unit = {
// Third cell of the 2x2: a consolidating diskless topic whose per-log remoteLogEnabled() is
// false. There is no remote copy to rebuild the classic prefix from, so a leader below the
// seal must still be fenced offline. Guards the log.remoteLogEnabled() conjunct in
// maybeReconcileSwitchedLeader: dropping it would turn this into an always-skip for
// consolidating topics and no existing test would catch it.
val topicName = "switched-consolidating-remote-disabled-topic"
val topicId = Uuid.randomUuid()
val tp = new TopicPartition(topicName, 0)
val brokerId = 1
val otherReplica = 2
val sealOffset = 10L
val localLeo = 5L

val mockFetcherManager = mock(classOf[ReplicaFetcherManager])
when(mockFetcherManager.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState])

val ctorInit: MockedConstruction.MockInitializer[ConsolidationFetcherManager] = {
case (mockCfm, _) =>
when(mockCfm.removeFetcherForPartitions(any())).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
}
val consolidationCtor = mockConstruction(classOf[ConsolidationFetcherManager], ctorInit)
try {
// Consolidating topic, but the per-log remote storage flag is false, so log.remoteLogEnabled()
// is false even though the broker-level remote-storage-system flag is on.
val replicaManager = spy(createReplicaManager(
Seq(topicName),
disklessRemoteStorageConsolidationEnabled = true,
consolidatingDisklessTopics = Set(topicName),
mockReplicaFetcherManager = Some(mockFetcherManager),
defaultLogConfig = Some(LogTestUtils.createLogConfig(remoteLogStorageEnable = false)),
))
try {
val pendingImage = applyPendingSwitchDelta(replicaManager, topicName, topicId, tp, brokerId, otherReplica, localLeo)
assertFalse(replicaManager.localLogOrException(tp).remoteLogEnabled(),
"Test log must have remote storage disabled for the fence branch to fire")
commitSealDelta(replicaManager, pendingImage, topicId, tp, brokerId, sealOffset)

// No remote copy to rebuild from, so the leader below the seal is fenced offline.
val partition = replicaManager.getPartition(tp)
assertTrue(partition.isInstanceOf[HostedPartition.Offline],
s"Consolidating leader below the seal with remote disabled must be fenced offline, " +
s"but was $partition")
} finally {
replicaManager.shutdown(checkpointHW = false)
}
} finally {
consolidationCtor.close()
}
}

/**
* Populates the local log to localLeo, then applies a PENDING classic-to-consolidated switch
* delta so the partition comes online as the leader with the seal still uncommitted. Returns the
* resulting metadata image for the caller to base the seal-commit delta on (see commitSealDelta).
*/
private def applyPendingSwitchDelta(
replicaManager: ReplicaManager,
topicName: String, topicId: Uuid, tp: TopicPartition,
brokerId: Int, otherReplica: Int, localLeo: Long): MetadataImage = {
val log = replicaManager.logManager.getOrCreateLog(tp, isNew = true, topicId = Optional.of(topicId))
populateLocalLogAtLeoAndCheckpointedHwm(replicaManager, tp, log, leo = localLeo, hw = localLeo)

when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp))
.thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING)

val pendingDelta = new TopicsDelta(TopicsImage.EMPTY)
pendingDelta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
val pendingRecord = new PartitionRecord()
.setPartitionId(tp.partition)
.setTopicId(topicId)
.setReplicas(util.Arrays.asList(brokerId, otherReplica))
.setIsr(util.Arrays.asList(brokerId, otherReplica))
.setLeader(brokerId)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
pendingRecord.unknownTaggedFields().add(
InitDisklessLogFields.encodeClassicToDisklessStartOffset(
PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING))
pendingDelta.replay(pendingRecord)

val pendingImage = imageFromTopics(pendingDelta.apply())
replicaManager.applyDelta(pendingDelta, pendingImage)

val leaderBefore = replicaManager.getPartition(tp).asInstanceOf[HostedPartition.Online].partition
assertTrue(leaderBefore.isLeader, "Partition should be the leader before the seal commit")
assertEquals(localLeo, replicaManager.localLogOrException(tp).logEndOffset)
pendingImage
}

/**
* Commits the classic-to-consolidated seal at sealOffset via a partition-change delta, modeling
* the controller fixing classicToDisklessStartOffset above the leader's local LEO.
*/
private def commitSealDelta(
replicaManager: ReplicaManager, pendingImage: MetadataImage,
topicId: Uuid, tp: TopicPartition, brokerId: Int, sealOffset: Long): Unit = {
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(sealOffset)
val sealDelta = new TopicsDelta(pendingImage.topics())
val sealRecord = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(tp.partition)
sealRecord.unknownTaggedFields().add(
InitDisklessLogFields.encodeClassicToDisklessStartOffset(sealOffset))
sealDelta.replay(sealRecord)

val localChanges = sealDelta.localChanges(brokerId)
assertTrue(localChanges.leaders.containsKey(tp))
replicaManager.applyDelta(sealDelta, imageFromTopics(sealDelta.apply()))
}

@Test
def testSwitchedConsolidatingFollowerBelowSealStaysOnClassicFetcher(): Unit = {
// A switched consolidating follower whose local log is still below the committed seal must
Expand Down
Loading