Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,11 @@ class ConsolidationReconciler(replicaManager: ReplicaManager,
val consolidatingPartitionAndOffsets: mutable.HashMap[TopicPartition, InitialFetchState] =
initConsolidatingPartitionFetching(consolidatingPartitions)

// Mark topics throttled BEFORE starting fetchers: addFetcherForPartitions starts the
// fetcher threads immediately, and ReplicaFetcherThread only records bytes to the quota
// when the partition is already throttled. Marking after would let the first fetch/append
// bypass the quota. Unlike classic replication (where throttled replicas are set via topic
// config during reassignment), consolidation marks all topics unconditionally — every
// consolidating partition's bytes must count toward the dedicated bandwidth quota.
//
// We never removeThrottle on stop: this follows the classic ReplicaFetcher pattern, where
// the topic-keyed throttle map is only cleared via config changes (ConfigHandler), not on
// per-partition fetcher removal. Entries are tiny (topic -> List(-1)) and bounded by the
// set of topics that have ever consolidated on this broker, so the residue is benign.
// Mark topics throttled BEFORE starting fetchers: addFetcherForPartitions starts the threads
// immediately and bytes only count toward the quota while the partition is already throttled,
// so marking after would let the first fetch bypass the quota. All consolidating topics are
// marked unconditionally. We never removeThrottle on stop (matching the classic ReplicaFetcher
// pattern); the leftover topic -> List(-1) entries are tiny and bounded, so the residue is benign.
consolidatingPartitionAndOffsets.keys.map(_.topic).toSet.foreach((topic: String) => consolidationQuotaManager.markThrottled(topic))
consolidationFetcherManager.addFetcherForPartitions(consolidatingPartitionAndOffsets)
consolidatingPartitionAndOffsets.keys.foreach(tp => consolidationMetrics.registerPartition(tp))
Expand All @@ -87,8 +81,17 @@ class ConsolidationReconciler(replicaManager: ReplicaManager,
def startConsolidationFetchersForCaughtUpClassicPartitions(topicPartitions: Set[TopicPartition]): Unit = {
val consolidatingDisklessPartitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
topicPartitions.foreach { tp =>
if (inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)) {
replicaManager.onlinePartition(tp).foreach(partition => consolidatingDisklessPartitionsToStartFetching.put(tp, partition))
replicaManager.onlinePartition(tp).foreach { partition =>
// This hook is the only trigger for an untiered-diskless -> consolidated flip, since
// remote.storage.enable=true is a config-only change the leader-delta path never re-enters.
// The metadata cache behind isConsolidatingDisklessTopic can lag the config record that has
// already enabled remote on the local log, so also trust the partition's own log: a diskless
// topic whose local log has remote storage enabled is consolidating.
val isConsolidating = inklessMetadataView.isConsolidatingDisklessTopic(tp.topic) ||
(inklessMetadataView.isDisklessTopic(tp.topic) && partition.log.exists(_.remoteLogEnabled()))
Comment on lines +90 to +91

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a note for future work: this can be collapsed to isDisklessTopic once the switch enables remote.storage.enable atomically as suggested here

if (isConsolidating) {
consolidatingDisklessPartitionsToStartFetching.put(tp, partition)
}
}
}
startConsolidationFetchers(consolidatingDisklessPartitionsToStartFetching)
Expand Down Expand Up @@ -129,31 +132,35 @@ class ConsolidationReconciler(replicaManager: ReplicaManager,
case seal if seal >= 0 =>
val log = partition.localLogOrException
if (log.logEndOffset < seal) {
// The classic prefix hasn't been fully replicated locally yet; a classic catch-up
// fetcher must finish bringing the local log up to the seal before consolidation
// can take over.
ConsolidationStartState.Retry(
s"Skipping consolidation for $tp because local LEO ${log.logEndOffset} is below " +
s"classic-to-diskless start offset $seal")
if (partition.isLeader && log.remoteLogEnabled()) {
// A leader below the seal means its local log was lost (e.g. a wiped replica promoted
// by the controller, whose metadata survived). Unlike a follower it has no peer to
// replicate the classic prefix [0, seal) from -- that prefix lives only in remote.
// Arm consolidation at the current LEO so the first fetch lands below the diskless WAL
// start, DisklessLeaderEndPoint answers OFFSET_MOVED_TO_TIERED_STORAGE, and the
// tier-state machine rebuilds the whole log from remote. Otherwise the partition would
// wait forever for a classic catch-up that can never happen.
armConsolidationAtLeo(partition, log, seal)
Comment thread
jeqo marked this conversation as resolved.
} else {
// Follower (or remote not yet enabled) below the seal: a classic catch-up fetcher must
// still bring the local log up to the seal before consolidation can take over.
ConsolidationStartState.Retry(
s"Skipping consolidation for $tp because local LEO ${log.logEndOffset} is below " +
s"classic-to-diskless start offset $seal")
}
} else {
// LEO >= seal. This covers both the initial switch (LEO == seal, nothing consolidated
// yet) and resuming an already-progressed partition after a restart, leadership
// failover, or reassignment (the local log either kept its consolidated frontier or
// was rehydrated from tiered storage). In every case we resume from the current local
// LEO so we never re-consolidate or skip data the local log already holds.
//
// The prune floor is the higher of the seal and the current log start offset:
// - at first switch logStartOffset is still the classic prefix start, so the floor is
// the seal, which blocks pruning the diskless region until consolidation has tiered
// past the boundary;
// - on resume logStartOffset has advanced past the seal as consolidated segments were
// tiered and deleted, so it reflects real pruning progress.
val pruneFloor = math.max(seal, log.logStartOffset)
partition.ensureConsolidationPruneFloorAtLeast(pruneFloor)
ConsolidationStartState.Ready(log.logEndOffset)
// LEO >= seal: the initial switch (LEO == seal) or a resume after restart, failover, or
// reassignment, where the local log kept or rehydrated its consolidated frontier. Resume
// from the current LEO so we neither re-consolidate nor skip data already held locally.
armConsolidationAtLeo(partition, log, seal)
Comment thread
jeqo marked this conversation as resolved.
}
case unexpected =>
ConsolidationStartState.Failed(new ReconciliationException(s"Skipping consolidation for $tp due to unexpected classic-to-diskless start offset: $unexpected"))
}
}

private def armConsolidationAtLeo(partition: Partition, log: UnifiedLog, seal: Long): ConsolidationStartState = {
partition.ensureConsolidationPruneFloorAtLeast(math.max(seal, log.logStartOffset))
ConsolidationStartState.Ready(log.logEndOffset)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ class ConsolidationReconcilerTest {
metadataView: InklessMetadataView,
fetcherManager: ConsolidationFetcherManager = mock(classOf[ConsolidationFetcherManager]),
initialFetchOffset: UnifiedLog => Long = _.highWatermark,
quotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
quotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager]),
replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
): ConsolidationReconciler = {
new ConsolidationReconciler(
mock(classOf[ReplicaManager]),
replicaManager,
new StateChangeLogger(0),
mock(classOf[ConsolidationMetrics]),
metadataView,
Expand All @@ -68,21 +69,26 @@ class ConsolidationReconcilerTest {
logStartOffset: Long,
logEndOffset: Long,
highWatermark: Long = 0L,
highestRemoteOffset: Long = -1L
highestRemoteOffset: Long = -1L,
isLeader: Boolean = false,
remoteLogEnabled: Boolean = false
): (Partition, UnifiedLog) = {
val log = mock(classOf[UnifiedLog])
when(log.logStartOffset).thenReturn(logStartOffset)
when(log.logEndOffset).thenReturn(logEndOffset)
when(log.highWatermark).thenReturn(highWatermark)
when(log.highestOffsetInRemoteStorage).thenReturn(highestRemoteOffset)
when(log.remoteLogEnabled()).thenReturn(remoteLogEnabled)
when(log.topicId).thenReturn(Optional.of(topicId))

val partition = mock(classOf[Partition])
when(partition.topicPartition).thenReturn(topicPartition)
when(partition.topic).thenReturn(topicPartition.topic)
when(partition.topicId).thenReturn(Some(topicId))
when(partition.localLogOrException).thenReturn(log)
when(partition.log).thenReturn(Some(log))
when(partition.getLeaderEpoch).thenReturn(7)
when(partition.isLeader).thenReturn(isLeader)
(partition, log)
}

Expand Down Expand Up @@ -200,16 +206,60 @@ class ConsolidationReconcilerTest {
}

@Test
def testMigratedPartitionBelowClassicToDisklessStartOffsetIsRetriedWithoutMarkingFailed(): Unit = {
def testFollowerBelowClassicToDisklessStartOffsetIsRetriedWithoutMarkingFailed(): Unit = {
// A *follower* whose local log is still below the seal can replicate the classic prefix from
// the live leader, so it must keep retrying (not start consolidation, not be marked failed).
val view = mockMetadataView(classicToDisklessStartOffset = 100L)
val fetcherManager = mock(classOf[ConsolidationFetcherManager])
val (partition, _) = mockPartition(logStartOffset = 0L, logEndOffset = 90L)
val (partition, _) = mockPartition(logStartOffset = 0L, logEndOffset = 90L,
isLeader = false, remoteLogEnabled = true)
val reconciler = newReconciler(view, fetcherManager)

val fetchStates = initFetchState(reconciler, partition)

assertTrue(fetchStates.isEmpty)
verify(partition, never()).truncateTo(anyLong(), anyBoolean())
verify(partition, never()).ensureConsolidationPruneFloorAtLeast(anyLong())
verify(fetcherManager, never()).addFailedPartition(topicPartition)
}

@Test
def testLeaderBelowSealWithRemoteStartsConsolidationToTriggerRemoteRebuild(): Unit = {
// A *leader* below the seal can only happen after local-log loss (full wipe / DR restart):
// the classic prefix [0, seal) lives only in the remote tier and there is no peer to replicate
// it from. Start consolidation armed at the (empty) LEO so the fetcher's first request lands
// below the diskless WAL start, DisklessLeaderEndPoint answers OFFSET_MOVED_TO_TIERED_STORAGE,
// and the tier-state machine rebuilds the whole-log state from remote. Gate pruning at the seal.
val view = mockMetadataView(classicToDisklessStartOffset = 100L)
val fetcherManager = mock(classOf[ConsolidationFetcherManager])
val (partition, _) = mockPartition(logStartOffset = 0L, logEndOffset = 0L,
isLeader = true, remoteLogEnabled = true)
val reconciler = newReconciler(view, fetcherManager)

val fetchStates = initFetchState(reconciler, partition)

assertEquals(0L, fetchStates(topicPartition).initOffset)
assertEquals(7, fetchStates(topicPartition).currentLeaderEpoch)
verify(partition, never()).truncateTo(anyLong(), anyBoolean())
verify(partition).ensureConsolidationPruneFloorAtLeast(100L)
verify(fetcherManager, never()).addFailedPartition(topicPartition)
}

@Test
def testLeaderBelowSealWithoutRemoteIsRetried(): Unit = {
Comment thread
jeqo marked this conversation as resolved.
Outdated
// Defensive: a leader below the seal but without remote storage enabled has no remote tier to
// rebuild from, so there is nothing to do but retry (this should not occur for a consolidating
// topic, where remote storage is always enabled).
val view = mockMetadataView(classicToDisklessStartOffset = 100L)
val fetcherManager = mock(classOf[ConsolidationFetcherManager])
val (partition, _) = mockPartition(logStartOffset = 0L, logEndOffset = 90L,
isLeader = true, remoteLogEnabled = false)
val reconciler = newReconciler(view, fetcherManager)

val fetchStates = initFetchState(reconciler, partition)

assertTrue(fetchStates.isEmpty)
verify(partition, never()).ensureConsolidationPruneFloorAtLeast(anyLong())
verify(fetcherManager, never()).addFailedPartition(topicPartition)
}

Expand All @@ -231,4 +281,46 @@ class ConsolidationReconcilerTest {
inOrder.verify(fetcherManager).addFetcherForPartitions(any())
}

@Test
def testStartConsolidationFetchersForCaughtUpClassicPartitionsStartsWhenMetadataCacheLagsRemoteFlip(): Unit = {
// Regression for the untiered-diskless -> consolidated upgrade: remote.storage.enable=true is a
// config-only metadata change with no leader-delta re-entry, so this hook is the sole trigger.
// The metadata cache backing isConsolidatingDisklessTopic can still report false at hook time
// (it lags the config record that already flipped remote on the local log), so the reconciler
// must fall back to the partition's own log and still start consolidation rather than silently
// drop the partition forever.
val view = mockMetadataView(classicToDisklessStartOffset = 100L)
when(view.isConsolidatingDisklessTopic(topicPartition.topic)).thenReturn(false)
when(view.isDisklessTopic(topicPartition.topic)).thenReturn(true)
val fetcherManager = mock(classOf[ConsolidationFetcherManager])
val replicaManager = mock(classOf[ReplicaManager])
val (partition, _) = mockPartition(logStartOffset = 0L, logEndOffset = 100L,
isLeader = true, remoteLogEnabled = true)
when(replicaManager.onlinePartition(topicPartition)).thenReturn(Some(partition))
val reconciler = newReconciler(view, fetcherManager, replicaManager = replicaManager)

reconciler.startConsolidationFetchersForCaughtUpClassicPartitions(Set(topicPartition))

verify(fetcherManager).addFetcherForPartitions(any())
}

@Test
def testStartConsolidationFetchersForCaughtUpClassicPartitionsSkipsNonConsolidatingTopic(): Unit = {
// A non-consolidating topic must never be routed to the consolidation fetcher: neither the
// metadata cache nor the local log reports it as a consolidating diskless topic.
val view = mockMetadataView(classicToDisklessStartOffset = 100L)
when(view.isConsolidatingDisklessTopic(topicPartition.topic)).thenReturn(false)
when(view.isDisklessTopic(topicPartition.topic)).thenReturn(false)
val fetcherManager = mock(classOf[ConsolidationFetcherManager])
val replicaManager = mock(classOf[ReplicaManager])
val (partition, _) = mockPartition(logStartOffset = 0L, logEndOffset = 100L,
isLeader = true, remoteLogEnabled = false)
when(replicaManager.onlinePartition(topicPartition)).thenReturn(Some(partition))
val reconciler = newReconciler(view, fetcherManager, replicaManager = replicaManager)

reconciler.startConsolidationFetchersForCaughtUpClassicPartitions(Set(topicPartition))

verify(fetcherManager, never()).addFetcherForPartitions(any())
}

}
Loading