From d2f1671e2f86771a84347f93cb2e577011f3ce43 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 10 Jun 2026 10:49:30 +0300 Subject: [PATCH 1/6] test(inkless:consolidation): demonstrate maxWaitMs degradation at local/diskless boundary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deterministic regression tests for the consolidating-partition fetch path, using a MockTimer-backed delayed-fetch purgatory so timing is asserted without real waits: - testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable: local log < minBytes with diskless data available must respond SYNCHRONOUSLY (merged), never parking in the purgatory. This RED/GREEN test reproduces the degradation: it FAILS on a build without the supplement (request parks and waits maxWaitMs) and PASSES with it. - testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData: bounds the contract — when no diskless data is available the request correctly still parks and only completes after maxWaitMs elapses. Adds an optional delayedFetchPurgatory param to the test's createReplicaManager helper to inject a MockTimer-backed purgatory. Co-Authored-By: Claude Opus 4.8 --- .../server/ReplicaManagerInklessTest.scala | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index f40ffd3e15..23e026fea2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -1167,6 +1167,178 @@ class ReplicaManagerInklessTest { } } + // When local log has data but below minBytes and diskless data is available, the fetch must + // respond immediately (merged local+diskless) without parking in the delayed-fetch purgatory. 
+ @Test + def testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable(): Unit = { + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.NONE, + 500L, 0L, + supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + ) + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + val cp = mock(classOf[ControlPlane]) + val timer = new MockTimer(time) + val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + delayedFetchPurgatory = Some(fetchPurgatory), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + // Stub fetchOffsetSnapshot so DelayedFetch.tryComplete doesn't NPE if the request parks + // on a build without the supplement. 
+ val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(100L) + when(mockPartition.log).thenReturn(Some(mockLog)) + val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0) + when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + val localFileRecords = memoryRecordsToFileRecords(RECORDS) + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty() + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + // Large minBytes + large maxWaitMs: WITHOUT the supplement this request would park in the + // purgatory and wait the full 30s. WITH the supplement it must respond immediately. 
+ val maxWaitMs = 30000L + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + maxWaitMs, + RECORDS.sizeInBytes + 1, // minBytes > local-only size + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + // The response is produced synchronously: callback already fired, the clock was NOT advanced, + // and the delayed-fetch purgatory never held this operation. + waitForFetchResponse(responseData) + assertEquals(0, fetchPurgatory.watched(), + "Supplemented consolidating fetch must NOT be parked in the delayed-fetch purgatory") + assertEquals(1, responseData.size) + val result = responseData(disklessTopicPartition) + assertEquals(Errors.NONE, result.error) + assertEquals(500L, result.highWatermark) + assertTrue(result.records.sizeInBytes > RECORDS.sizeInBytes, + s"Expected merged local+diskless records, got ${result.records.sizeInBytes}") + verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any()) + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + } + } + + // When local log has data below minBytes but no diskless data is available past logEndOffset, + // the fetch must park in the purgatory and only complete after maxWaitMs elapses. + @Test + def testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData(): Unit = { + // Diskless supplement returns an empty response: no data available past the local boundary yet. 
+ val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.NONE, + 100L, 0L, + MemoryRecords.EMPTY, + Optional.empty(), OptionalLong.of(100L), Optional.empty(), OptionalInt.empty(), false) + ) + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + val cp = mock(classOf[ControlPlane]) + val timer = new MockTimer(time) + val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + delayedFetchPurgatory = Some(fetchPurgatory), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + // Stub a partition whose local log ends at 100 and exposes a matching offset snapshot, so the + // parked DelayedFetch.tryComplete can evaluate the partition's high watermark without NPE. 
+ val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(100L) + when(mockPartition.log).thenReturn(Some(mockLog)) + val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0) + when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + val localFileRecords = memoryRecordsToFileRecords(RECORDS) + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty() + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val maxWaitMs = 30000L + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + maxWaitMs, + RECORDS.sizeInBytes + 1, // minBytes > local-only size; supplement returns nothing + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + // No diskless data to supplement -> minBytes unmet -> parked in the purgatory, not yet responded. 
+ assertEquals(1, fetchPurgatory.watched(), + "Fetch must be parked in the delayed-fetch purgatory when the supplement yields no data") + assertNull(responseData, "Response must not be produced until maxWaitMs elapses") + + // Purgatory purges lazily — don't assert watched()==0 after completion. + timer.advanceClock(maxWaitMs + 1) + waitForFetchResponse(responseData) + assertEquals(1, responseData.size) + assertEquals(Errors.NONE, responseData(disklessTopicPartition).error) + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + } + } + @Test def testFetchConsolidatingDisklessPartitionOfflineReturnsKafkaStorageError(): Unit = { val disklessResponse = Map(disklessTopicPartition -> From 7299d776ccd7c336b33a118999f7dc6d9d944dc4 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 5 Jun 2026 16:27:44 +0300 Subject: [PATCH 2/6] feat(inkless:consolidation): supplement local log with diskless data on fetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a consolidating partition's local log doesn't satisfy minBytes, supplement with a synchronous diskless fetch starting at logEndOffset, then merge local + diskless records via ConcatenatedRecords.concat(). For consumer requests that only span consolidating partitions, the supplement is applied inline and the response is produced immediately without parking in the delayed-fetch purgatory. For mixed requests (consolidating + pure-diskless partitions), the supplement and the diskless fetch run concurrently in DelayedFetch.onComplete so latency equals the slower of the two, not their sum. An AtomicBoolean ensures the response callback fires exactly once even if an exception escapes the future composition. 
Guards on the inline supplement path:
- !isFromFollower: followers must not receive merged diskless data
- maxWaitMs > 0: non-blocking polls must not be held by a round-trip
- !hasPreferredReadReplica: redirect responses must not be delayed
- disklessFetchInfos.isEmpty: avoids a wasted blocking call when the
  request will park in DelayedFetch anyway
- inklessSharedState.isDefined: checked at collection time, so partitions
  are tracked for a supplement only when diskless storage is enabled

bytesReadable only counts error-free supplement bytes — a supplement
response with an error but non-empty records cannot trick the broker
into responding prematurely.

The merge preserves localData.abortedTransactions and
localData.preferredReadReplica (diskless has no transaction index and
no replica selector). It warns when aborted transactions are present,
since the diskless portion will have no abort markers, and falls back to
local-only data on unknown Records types or merge failures.

Exception handling on the inline supplement fetch:
- InterruptedException: re-interrupts the thread and falls back to local-only
- TimeoutException: logged specifically for operator visibility, then
  falls back to local-only
- Other failures: logged and falls back to local-only

On diskless future failure in DelayedFetch.onComplete, per-partition
error entries (Errors.forException) are returned so clients see the
partition in the response rather than it being silently dropped.

ConcatenatedRecords.concat(prefix, tail) static factory added to
encapsulate type dispatch over MemoryRecords/ConcatenatedRecords and
avoid materializing supplement backing records into a contiguous buffer.
It validates its inputs with Objects.requireNonNull.

Co-Authored-By: Claude Sonnet 4.6 Co-Authored-By: Claude Opus 4.6 --- .../scala/kafka/server/DelayedFetch.scala | 148 +++-- .../scala/kafka/server/ReplicaManager.scala | 147 ++++- .../kafka/server/DelayedFetchTest.scala | 309 +++++++++- .../server/ReplicaManagerInklessTest.scala | 579 +++++++++++++++++- .../inkless/consume/ConcatenatedRecords.java | 18 + 5 files changed, 1147 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 3926f6ec76..3b31d8a9ea 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -21,15 +21,17 @@ import com.yammer.metrics.core.Meter import io.aiven.inkless.control_plane.FindBatchRequest import kafka.utils.Logging -import java.util.concurrent.TimeUnit +import java.util.concurrent.{CompletableFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.DelayedOperation import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata} +import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata, LogReadResult} import java.util import scala.collection._ @@ -225,7 +227,6 @@ class DelayedFetch( * Upon completion, read whatever data is available and pass to the complete callback */ override def onComplete(): Unit = { - // Complete the classic fetches first val classicFetchInfos = classicFetchPartitionStatus.asScala.iterator.map { case (tp, 
status) => tp -> status.fetchInfo }.toBuffer @@ -235,50 +236,127 @@ class DelayedFetch( val totalRequestsSize = classicRequestsSize + disklessRequestsSize if (totalRequestsSize == 0) { - // No partitions to fetch, just return an empty response responseCallback(Seq.empty) return } - val fetchPartitionData = if (classicRequestsSize > 0) { - // adjust the max bytes for classic fetches based on the percentage of classic partitions + // Read classic partitions from local log. + val (logReadResults, logReadResultMap) = if (classicRequestsSize > 0) { val classicPercentage = classicRequestsSize / totalRequestsSize val classicParams = replicaManager.fetchParamsWithNewMaxBytes(params, classicPercentage) + val results = replicaManager.readFromLog(classicParams, classicFetchInfos, quota, readFromPurgatory = true) + val resultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + results.foreach { case (tp, r) => resultMap.put(tp, r) } + (results, resultMap) + } else (Seq.empty, new util.HashMap[TopicIdPartition, LogReadResult]()) - val logReadResults = replicaManager.readFromLog( - classicParams, - classicFetchInfos, - quota, - readFromPurgatory = true - ) - - logReadResults.map { case (tp, result) => - val isReassignmentFetch = params.isFromFollower && - replicaManager.isAddingReplica(tp.topicPartition, params.replicaId) - - tp -> result.toFetchPartitionData(isReassignmentFetch) + // Identify consolidating partitions that still have budget left after the local read and + // fire their supplement fetch concurrently with the pure-diskless fetch. Both futures are + // independent so running them in parallel keeps latency equal to the slower of the two. + // Guards like maxWaitMs>0 and !hasPreferredReadReplica are unnecessary here: by the time + // onComplete fires, those conditions were already true (maxWaitMs was positive to park, + // preferredReadReplica triggers immediate response and never parks). 
+ val consolidatingSupplements = { + val supplements = new mutable.HashMap[TopicIdPartition, Long]() + if (!params.isFromFollower) { + classicFetchPartitionStatus.asScala.foreach { case (tp, status) => + if (replicaManager.inklessMetadataView().isConsolidatingDisklessTopic(tp.topic)) { + val readResult = logReadResultMap.get(tp) + if (readResult != null && readResult.error == Errors.NONE) { + replicaManager.getPartitionOrError(tp.topicPartition).foreach { partition => + partition.log.foreach { log => + if (status.startOffsetMetadata.messageOffset < log.logEndOffset) + supplements.put(tp, log.logEndOffset) + } + } + } + } + } } - } else Seq.empty + supplements + } + val supplementFetchInfos = replicaManager.buildConsolidationSupplementFetchInfos( + consolidatingSupplements, classicFetchInfos.toSeq, logReadResultMap) - if (disklessRequestsSize > 0) { - // Classic fetches are complete, now handle diskless fetches - // adjust the max bytes for diskless fetches based on the percentage of diskless partitions - val disklessPercentage = disklessRequestsSize / totalRequestsSize - val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) - val disklessFetchInfos = disklessFetchPartitionStatus.asScala.map { case (tp, status) => - tp -> status.fetchInfo + val emptySupplementMap: Map[TopicIdPartition, FetchPartitionData] = Map.empty + val supplementFuture: CompletableFuture[Map[TopicIdPartition, FetchPartitionData]] = + if (supplementFetchInfos.nonEmpty) { + val supplementParams = replicaManager.fetchParamsWithNewMaxBytes( + params, supplementFetchInfos.size.toFloat / totalRequestsSize) + replicaManager.fetchDisklessMessages(supplementParams, supplementFetchInfos) + .thenApply[Map[TopicIdPartition, FetchPartitionData]](_.toMap) + .exceptionally((e: Throwable) => { + logger.warn("Failed to fetch diskless supplement for consolidating partitions in delayed fetch, returning local data only", e) + emptySupplementMap + }) + } else { + 
CompletableFuture.completedFuture(emptySupplementMap) } - val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos.toSeq) - // Combine the classic fetch results with the diskless fetch results - disklessFetchResponseFuture.whenComplete { case (disklessFetchPartitionData, _) => - // Do a single response callback with both classic and diskless fetch results - responseCallback(fetchPartitionData ++ disklessFetchPartitionData) + val emptyDisklessSeq: Seq[(TopicIdPartition, FetchPartitionData)] = Seq.empty + val disklessFuture: CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = + if (disklessRequestsSize > 0) { + val disklessPercentage = disklessRequestsSize / totalRequestsSize + val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) + val disklessFetchInfos = disklessFetchPartitionStatus.asScala.map { case (tp, status) => + tp -> status.fetchInfo + } + replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos.toSeq) + .exceptionally((e: Throwable) => { + logger.warn("Failed to fetch diskless data in delayed fetch, returning per-partition errors", e) + val error = Errors.forException(e) + disklessFetchInfos.map { case (tp, _) => + tp -> new FetchPartitionData( + error, + -1L, + -1L, + MemoryRecords.EMPTY, + util.Optional.empty(), + util.OptionalLong.empty(), + util.Optional.empty(), + util.OptionalInt.empty(), + false + ) + }.toSeq + }) + } else { + CompletableFuture.completedFuture(emptyDisklessSeq) } - } else { - // No diskless fetches, just return the classic fetch results - responseCallback(fetchPartitionData) - } + + // Join both futures and fire the single response callback once both are ready. + // The supplement and pure-diskless fetches run concurrently so latency equals the slower of the two. 
+ // The AtomicBoolean ensures responseCallback is called exactly once even if an unexpected exception + // escapes the thenCombine lambda after the happy-path callback has already fired. + val callbackFired = new AtomicBoolean(false) + supplementFuture.thenCombine[Seq[(TopicIdPartition, FetchPartitionData)], Void](disklessFuture, + (supplementData: Map[TopicIdPartition, FetchPartitionData], + disklessData: Seq[(TopicIdPartition, FetchPartitionData)]) => { + val classicData = logReadResults.map { case (tp, result) => + val isReassignmentFetch = params.isFromFollower && + replicaManager.isAddingReplica(tp.topicPartition, params.replicaId) + val localData = result.toFetchPartitionData(isReassignmentFetch) + val mergedData = supplementData.get(tp) match { + // don't merge if local result has errors + case Some(sd) if result.error == Errors.NONE && sd.error == Errors.NONE && sd.records.sizeInBytes > 0 => + try replicaManager.mergeConsolidationSupplement(tp, localData, sd) + catch { + case e: Exception => + logger.warn(s"Failed to merge diskless supplement for $tp in delayed fetch, returning local data only", e) + localData + } + case _ => localData + } + tp -> mergedData + } + if (callbackFired.compareAndSet(false, true)) + responseCallback(classicData ++ disklessData) + null + }).exceptionally((e: Throwable) => { + logger.error("Unexpected error in delayed fetch completion", e) + if (callbackFired.compareAndSet(false, true)) + responseCallback(Seq.empty) + null + }) } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 15e0424503..5d249b924a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -18,7 +18,7 @@ package kafka.server import com.yammer.metrics.core.Meter import io.aiven.inkless.common.SharedState -import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler, Reader} +import 
io.aiven.inkless.consume.{ConcatenatedRecords, FetchHandler, FetchOffsetHandler, Reader} import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, InitDisklessLogProducerState} import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer} import io.aiven.inkless.produce.AppendHandler @@ -79,6 +79,7 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.io.File import java.lang.{Long => JLong} +import java.nio.ByteBuffer import java.nio.file.{Files, Paths} import java.util import java.util.concurrent.atomic.AtomicBoolean @@ -2001,6 +2002,82 @@ class ReplicaManager(val config: KafkaConfig, originalParams.isolation, originalParams.clientMetadata, originalParams.shareFetchRequest) } + /** + * Build the diskless supplement fetch requests for consolidating partitions whose local log + * read did not satisfy minBytes. For each tracked partition, computes the remaining byte budget + * (original maxBytes minus what the local read already returned) and emits a PartitionData + * starting at logEndOffset. Partitions whose remaining budget is zero are dropped. 
+ */ + private[server] def buildConsolidationSupplementFetchInfos( + supplements: Map[TopicIdPartition, Long], + fetchInfos: Seq[(TopicIdPartition, PartitionData)], + logReadResultMap: util.Map[TopicIdPartition, LogReadResult] + ): Seq[(TopicIdPartition, PartitionData)] = { + val fetchInfoByTp = fetchInfos.toMap + supplements.flatMap { case (tp, logEndOffset) => + fetchInfoByTp.get(tp).flatMap { pd => + val alreadyRead = Option(logReadResultMap.get(tp)).map(_.info.records.sizeInBytes).getOrElse(0) + val remainingBytes = Math.max(pd.maxBytes - alreadyRead, 0) + if (remainingBytes > 0) + Some(tp -> new PartitionData(tp.topicId(), logEndOffset, pd.logStartOffset, remainingBytes, pd.currentLeaderEpoch, pd.lastFetchedEpoch)) + else + None + } + }.toSeq + } + + /** + * Merges a diskless supplement into the local-log fetch result for a consolidating partition. + * The supplement provides records beyond the local logEndOffset, and its HW/LSO supersede + * the local values. Local records are materialized from FileRecords to MemoryRecords if needed + * before being passed to ConcatenatedRecords. + */ + private[server] def mergeConsolidationSupplement( + tp: TopicIdPartition, + localData: FetchPartitionData, + supplementData: FetchPartitionData + ): FetchPartitionData = { + // Local-log reads return FileRecords (a FileChannel-backed segment slice), not MemoryRecords. + // ConcatenatedRecords backs onto MemoryRecords, so materialize the local slice into a + // heap buffer first. This is the standard idiom used by AbstractFetcherThread and the + // coordinator loaders for the same FileRecords->MemoryRecords conversion. + val localRecords = localData.records match { + case mr: MemoryRecords => mr + case fr: FileRecords => + val buffer = ByteBuffer.allocate(fr.sizeInBytes) + fr.readInto(buffer, 0) + MemoryRecords.readableRecords(buffer) + case other => + error(s"Unexpected Records type from local log read for $tp: ${other.getClass.getName}. 
Returning local data only.") + return localData + } + val mergedRecords = try { + ConcatenatedRecords.concat(localRecords, supplementData.records) + } catch { + case e: IllegalArgumentException => + error(s"${e.getMessage} for $tp. Returning local data only.") + return localData + } + // Pass through local abortedTransactions so READ_COMMITTED consumers keep abort markers for + // the local portion. Warn if present: transactions spanning the consolidation boundary into + // the diskless portion are not supported and those offsets will have no abort markers. + if (localData.abortedTransactions.isPresent && !localData.abortedTransactions.get.isEmpty) + warn(s"Consolidating diskless partition $tp has aborted transactions in the local log but diskless " + + s"storage does not support transactions — abort markers beyond logEndOffset will be missing") + // isReassignmentFetch is false: the supplement only fires for consumer fetches, never for follower/reassignment paths. + new FetchPartitionData( + localData.error, + supplementData.highWatermark, + Math.min(localData.logStartOffset, supplementData.logStartOffset), + mergedRecords, + supplementData.divergingEpoch, + supplementData.lastStableOffset, + localData.abortedTransactions, + localData.preferredReadReplica, + false + ) + } + /** * Fetch messages from a replica, and wait until enough data can be fetched and return; * the callback function will be triggered either when timeout or required fetch info is satisfied. @@ -2018,6 +2095,9 @@ class ReplicaManager(val config: KafkaConfig, val disklessFetchInfos = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]() val classicFetchInfos = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]() val immediateFetchResponses = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionData)]() + // Consolidating partitions served from local log that may need a diskless supplement. + // Maps tp -> logEndOffset (the offset where the diskless supplement should start). 
+ val consolidatingLocalFetchSupplements = new mutable.HashMap[TopicIdPartition, Long]() fetchInfos.foreach { fetchInfo => val (tp, fetchPartitionData) = fetchInfo @@ -2035,8 +2115,14 @@ class ReplicaManager(val config: KafkaConfig, if (isConsolidatingPartition) { getPartitionOrError(tp.topicPartition) match { case Right(partition) => - shouldReadFromUnifiedLog = shouldReadFromUnifiedLog || - partition.log.exists(fetchPartitionData.fetchOffset < _.logEndOffset) + val logEndOffset = partition.log.map(_.logEndOffset).getOrElse(0L) + if (fetchPartitionData.fetchOffset < logEndOffset) { + // Local log has data for this offset range — serve from local, track for diskless supplement + shouldReadFromUnifiedLog = true + if (inklessSharedState.isDefined) + consolidatingLocalFetchSupplements += (tp -> logEndOffset) + } + // else: consumer is at or beyond consolidation frontier, diskless-only case Left(error) => warn(s"Error while fetching partition ${tp.topicPartition()} for consolidating diskless topic: $error. " + s"Returning error for the fetch request since we cannot determine if the partition has switched to diskless or not.") @@ -2138,8 +2224,11 @@ class ReplicaManager(val config: KafkaConfig, inklessSharedState match { case None => - if (disklessFetchInfos.nonEmpty) { - error(s"Received diskless fetch request for topics ${disklessFetchInfos.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. " + + if (disklessFetchInfos.nonEmpty || consolidatingLocalFetchSupplements.nonEmpty) { + val disklessTopics = disklessFetchInfos.map(_._1.topic()).distinct + val consolidatingTopics = consolidatingLocalFetchSupplements.keys.map(_.topic()).toSeq.distinct + val allTopics = (disklessTopics ++ consolidatingTopics).distinct + error(s"Received diskless fetch request for topics ${allTopics.mkString(", ")} but diskless storage system is not enabled. 
" + s"Replying an empty response.") respond(Seq.empty) return @@ -2239,9 +2328,55 @@ class ReplicaManager(val config: KafkaConfig, logReadResultMap.put(topicIdPartition, logReadResult) } + // For consolidating partitions where local log was read, supplement with diskless data if minBytes not satisfied. + // Only runs when there are no pure-diskless partitions in the request: if disklessFetchInfos is non-empty the + // request will park in DelayedFetch regardless, where the supplement runs concurrently with the diskless fetch. + // Running it here as well would block for disklessFetchMaxWaitMs and then discard the result. + var consolidationSupplementData = Map.empty[TopicIdPartition, FetchPartitionData] + if (consolidatingLocalFetchSupplements.nonEmpty && + disklessFetchInfos.isEmpty && + !params.isFromFollower && // safeguard: followers must not receive diskless records merged into local-log data + params.maxWaitMs > 0 && // safeguard: non-blocking polls must not be held by a diskless round-trip + !hasPreferredReadReplica && + bytesReadable < params.minBytes && + !errorReadingData) { + val supplementFetchInfos = buildConsolidationSupplementFetchInfos(consolidatingLocalFetchSupplements, fetchInfos, logReadResultMap) + + if (supplementFetchInfos.nonEmpty) { + try { + val supplementParams = fetchParamsWithNewMaxBytes(params, supplementFetchInfos.size.toFloat / fetchInfos.size.toFloat) + // Future not cancelled on failure — diskless reads are idempotent and hold no resources. 
+ consolidationSupplementData = fetchDisklessMessages(supplementParams, supplementFetchInfos) + .get(Math.max(config.disklessFetchMaxWaitMs.toLong, params.maxWaitMs), TimeUnit.MILLISECONDS) + .toMap + bytesReadable += consolidationSupplementData.values + .filter(_.error == Errors.NONE).map(_.records.sizeInBytes).sum + } catch { + case e: InterruptedException => + Thread.currentThread().interrupt() + logger.warn("Interrupted while fetching diskless supplement for consolidating partitions, returning local data only", e) + case e: java.util.concurrent.TimeoutException => + logger.warn("Timed out fetching diskless supplement for consolidating partitions, returning local data only", e) + case e: Throwable => + logger.warn("Failed to fetch diskless supplement for consolidating partitions, returning local data only", e) + } + } + } + val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) - tp -> result.toFetchPartitionData(isReassignmentFetch) + val localData = result.toFetchPartitionData(isReassignmentFetch) + val mergedData = consolidationSupplementData.get(tp) match { + case Some(supplementData) if supplementData.error == Errors.NONE && supplementData.records.sizeInBytes > 0 => + try mergeConsolidationSupplement(tp, localData, supplementData) + catch { + case e: Exception => + logger.warn(s"Failed to merge diskless supplement for consolidating partition $tp, returning local data only", e) + localData + } + case _ => localData + } + tp -> mergedData } // Respond immediately if no remote fetches are required and any of the below conditions is true diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index fa6c71dff2..bb743fe7f5 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ 
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -17,9 +17,11 @@ package kafka.server import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchRequest, FindBatchResponse} +import kafka.server.metadata.InklessMetadataView +import kafka.utils.TestUtils -import java.util.{Collections, Optional, OptionalLong} -import scala.collection.Seq +import java.util.{Collections, Optional, OptionalInt, OptionalLong} +import scala.collection.{Seq, mutable} import kafka.cluster.Partition import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} @@ -28,13 +30,13 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{MemoryRecords, TimestampType} import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult} -import org.junit.jupiter.api.{Nested, Test} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult, UnifiedLog} +import org.junit.jupiter.api.{BeforeEach, Nested, Test} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import org.mockito.ArgumentMatchers.{any, anyFloat, anyInt, anyLong} -import org.mockito.Mockito.{mock, never, verify, when} +import org.mockito.ArgumentMatchers.{any, anyFloat, anyInt, anyLong, argThat} +import org.mockito.Mockito.{mock, never, times, verify, when} import java.util.concurrent.CompletableFuture @@ -44,6 +46,14 @@ class DelayedFetchTest { private val maxBytes = 1024 private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) private val replicaQuota: ReplicaQuota = 
mock(classOf[ReplicaQuota]) + private val inklessMetadataView: InklessMetadataView = mock(classOf[InklessMetadataView]) + + @BeforeEach + def setUp(): Unit = { + when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView) + when(inklessMetadataView.isConsolidatingDisklessTopic(any())).thenReturn(false) + when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any())).thenReturn(Seq.empty) + } @Test def testFetchWithFencedEpoch(): Unit = { @@ -642,6 +652,244 @@ class DelayedFetchTest { verify(mockResponse).estimatedByteSize(fetchOffset) } + @Test + def testCompletionWithConsolidationSupplement(): Unit = { + // Verifies that a consolidating partition's local data is supplemented with diskless data + // and that a pure-diskless partition's data also arrives in the response. + val consolidatingTp = new TopicIdPartition(Uuid.randomUuid(), 0, "consolidating-topic") + val disklessTp = new TopicIdPartition(Uuid.randomUuid(), 0, "diskless-topic") + val fetchOffset = 100L + val logEndOffset = 150L + val logStartOffset = 0L + val currentLeaderEpoch = Optional.of[Integer](1) + val minBytes = 1024 // high enough that local data alone won't satisfy it + + // Classic (consolidating) partition status + val classicFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(consolidatingTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch) + ) + val classicStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + classicStatusMap.put(consolidatingTp, classicFetchStatus) + + // Diskless partition status + val disklessFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(disklessTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch) + ) + val disklessStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + disklessStatusMap.put(disklessTp, disklessFetchStatus) + + val 
fetchParams = new FetchParams( + -1, // consumer + 1, + 500L, + minBytes, + maxBytes, + FetchIsolation.HIGH_WATERMARK, + Optional.empty() + ) + + @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None + def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + callbackResult = Some(responses) + } + + val delayedFetch = new DelayedFetch( + params = fetchParams, + classicFetchPartitionStatus = classicStatusMap, + disklessFetchPartitionStatus = disklessStatusMap, + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback + ) + + // Stub inklessMetadataView for the consolidating partition + when(inklessMetadataView.isConsolidatingDisklessTopic(consolidatingTp.topic)).thenReturn(true) + + // Stub getPartitionOrError to return a partition with a log + val partition = mock(classOf[Partition]) + val unifiedLog = mock(classOf[UnifiedLog]) + when(unifiedLog.logEndOffset).thenReturn(logEndOffset) + when(partition.log).thenReturn(Some(unifiedLog)) + when(replicaManager.getPartitionOrError(consolidatingTp.topicPartition)) + .thenReturn(Right(partition)) + + // Stub readFromLog to return an empty local read (below minBytes) + val localReadResult = new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), + Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), Errors.NONE) + when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean])) + .thenReturn(Seq((consolidatingTp, localReadResult))) + + // Stub buildConsolidationSupplementFetchInfos to return non-empty supplement info + val supplementFetchInfo = (consolidatingTp, new FetchRequest.PartitionData( + consolidatingTp.topicId(), logEndOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any())) + .thenReturn(Seq(supplementFetchInfo)) + + // 
Prepare supplement and diskless FetchPartitionData + val supplementRecords = mock(classOf[MemoryRecords]) + when(supplementRecords.sizeInBytes).thenReturn(512) + val supplementData = new FetchPartitionData(Errors.NONE, logEndOffset + 100, logEndOffset, + supplementRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + + val disklessRecords = mock(classOf[MemoryRecords]) + when(disklessRecords.sizeInBytes).thenReturn(256) + val disklessData = new FetchPartitionData(Errors.NONE, 200L, 0L, + disklessRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + + // Stub fetchDisklessMessages: first call returns supplement, second call returns diskless + when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + .thenReturn(CompletableFuture.completedFuture(Seq((consolidatingTp, supplementData)))) + .thenReturn(CompletableFuture.completedFuture(Seq((disklessTp, disklessData)))) + + // Stub fetchParamsWithNewMaxBytes + when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) + + // Stub mergeConsolidationSupplement to return a merged result + val mergedRecords = mock(classOf[MemoryRecords]) + when(mergedRecords.sizeInBytes).thenReturn(768) + val mergedData = new FetchPartitionData(Errors.NONE, logEndOffset + 100, logEndOffset, + mergedRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + when(replicaManager.mergeConsolidationSupplement(any[TopicIdPartition], any[FetchPartitionData], any[FetchPartitionData])) + .thenReturn(mergedData) + + when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) + + // Trigger onComplete + delayedFetch.forceComplete() + + TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called") + val results = callbackResult.get + assertEquals(2, results.size) + + val 
resultMap = results.toMap + assertEquals(mergedData, resultMap(consolidatingTp)) + assertEquals(disklessData, resultMap(disklessTp)) + + // Verify mergeConsolidationSupplement was called exactly once + verify(replicaManager, times(1)).mergeConsolidationSupplement( + any[TopicIdPartition], any[FetchPartitionData], any[FetchPartitionData]) + } + + // When the local read for a consolidating partition returns an error, the supplement fetch + // must not be issued — buildConsolidationSupplementFetchInfos must not receive that partition. + @Test + def testConsolidationSupplementNotIssuedWhenLocalReadHasError(): Unit = { + val consolidatingTp = new TopicIdPartition(Uuid.randomUuid(), 0, "consolidating") + val fetchOffset = 50L + val logStartOffset = 0L + val logEndOffset = 100L + val currentLeaderEpoch = Optional.of[Integer](1) + + val classicFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(consolidatingTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val classicStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + classicStatusMap.put(consolidatingTp, classicFetchStatus) + + val fetchParams = new FetchParams(-1, 1, 500L, 1, maxBytes, FetchIsolation.HIGH_WATERMARK, Optional.empty()) + + @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None + val delayedFetch = new DelayedFetch( + params = fetchParams, + classicFetchPartitionStatus = classicStatusMap, + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = responses => callbackResult = Some(responses) + ) + + when(inklessMetadataView.isConsolidatingDisklessTopic(consolidatingTp.topic)).thenReturn(true) + val partition = mock(classOf[Partition]) + val unifiedLog = mock(classOf[UnifiedLog]) + when(unifiedLog.logEndOffset).thenReturn(logEndOffset) + when(partition.log).thenReturn(Some(unifiedLog)) + 
when(replicaManager.getPartitionOrError(consolidatingTp.topicPartition)).thenReturn(Right(partition)) + + // Local read returns an error — supplement must not be issued for this partition + val errorReadResult = new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), + Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), Errors.NOT_LEADER_OR_FOLLOWER) + when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean])) + .thenReturn(Seq((consolidatingTp, errorReadResult))) + when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) + when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) + + delayedFetch.forceComplete() + + TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called") + // buildConsolidationSupplementFetchInfos must be called with an empty supplements map — + // the erroring partition must have been filtered out before building the supplement request + verify(replicaManager, times(1)).buildConsolidationSupplementFetchInfos( + argThat((m: mutable.HashMap[TopicIdPartition, Long]) => m.isEmpty), any(), any()) + } + + // When the local read for a consolidating partition returns an error, the supplement must not + // be applied — mergeConsolidationSupplement must never be called for an erroring local result. 
+ @Test + def testConsolidationSupplementSkippedWhenLocalReadHasError(): Unit = { + val consolidatingTp = new TopicIdPartition(Uuid.randomUuid(), 0, "consolidating") + val fetchOffset = 50L + val logStartOffset = 0L + val logEndOffset = 100L + val currentLeaderEpoch = Optional.of[Integer](1) + + val classicFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(consolidatingTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val classicStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + classicStatusMap.put(consolidatingTp, classicFetchStatus) + + val fetchParams = new FetchParams(-1, 1, 500L, 1, maxBytes, FetchIsolation.HIGH_WATERMARK, Optional.empty()) + + @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None + val delayedFetch = new DelayedFetch( + params = fetchParams, + classicFetchPartitionStatus = classicStatusMap, + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = responses => callbackResult = Some(responses) + ) + + when(inklessMetadataView.isConsolidatingDisklessTopic(consolidatingTp.topic)).thenReturn(true) + val partition = mock(classOf[Partition]) + val unifiedLog = mock(classOf[UnifiedLog]) + when(unifiedLog.logEndOffset).thenReturn(logEndOffset) + when(partition.log).thenReturn(Some(unifiedLog)) + when(replicaManager.getPartitionOrError(consolidatingTp.topicPartition)).thenReturn(Right(partition)) + + // Local read returns an error + val errorReadResult = new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), + Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), Errors.NOT_LEADER_OR_FOLLOWER) + when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean])) + .thenReturn(Seq((consolidatingTp, errorReadResult))) + + val supplementFetchInfo = 
(consolidatingTp, new FetchRequest.PartitionData( + consolidatingTp.topicId(), logEndOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any())) + .thenReturn(Seq(supplementFetchInfo)) + + val supplementRecords = mock(classOf[MemoryRecords]) + when(supplementRecords.sizeInBytes).thenReturn(512) + val supplementData = new FetchPartitionData(Errors.NONE, logEndOffset + 100, logEndOffset, + supplementRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + .thenReturn(CompletableFuture.completedFuture(Seq((consolidatingTp, supplementData)))) + when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) + when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) + + delayedFetch.forceComplete() + + TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called") + val results = callbackResult.get.toMap + assertEquals(1, results.size) + // Error from the local read must be preserved — supplement must not overwrite it + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, results(consolidatingTp).error) + verify(replicaManager, never()).mergeConsolidationSupplement(any(), any(), any()) + } + @Test def testCompletionWhenErrorOccursDuringDisklessBatchFinding(): Unit = { // Case C: When an error occurs while trying to find diskless batches, fetch should complete immediately @@ -729,5 +977,54 @@ class DelayedFetchTest { verify(mockResponse, never()).estimatedByteSize(anyLong()) verify(mockResponse, never()).highWatermark() } + + // When the diskless fetch future fails, the response must include per-partition error entries + // rather than silently dropping those partitions from the response. 
+ @Test + def testDisklessFetchFailureReturnsPerPartitionErrors(): Unit = { + val disklessTp = new TopicIdPartition(Uuid.randomUuid(), 0, "diskless-topic") + val fetchOffset = 200L + val logStartOffset = 0L + val currentLeaderEpoch = Optional.of[Integer](1) + val minBytes = 100 + + val disklessFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(disklessTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val disklessStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + disklessStatusMap.put(disklessTp, disklessFetchStatus) + + val fetchParams = new FetchParams(-1, 1, 500L, minBytes, maxBytes, FetchIsolation.HIGH_WATERMARK, Optional.empty()) + + @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None + val delayedFetch = new DelayedFetch( + params = fetchParams, + classicFetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus](), + disklessFetchPartitionStatus = disklessStatusMap, + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = responses => callbackResult = Some(responses) + ) + + when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean])) + .thenReturn(Seq.empty) + when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) + when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) + + // Diskless fetch fails with an exception + val failedFuture = new CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]]() + failedFuture.completeExceptionally(new RuntimeException("Object storage unavailable")) + when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + .thenReturn(failedFuture) + + delayedFetch.forceComplete() + + TestUtils.waitUntilTrue(() => 
callbackResult.isDefined, "responseCallback should have been called") + val results = callbackResult.get.toMap + assertEquals(1, results.size, "Response must include the diskless partition even on failure") + val partitionData = results(disklessTp) + assertNotEquals(Errors.NONE, partitionData.error, "Error must be set for the failed partition") + assertEquals(MemoryRecords.EMPTY, partitionData.records) + } } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index 23e026fea2..15b41fd5ff 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -20,7 +20,7 @@ package kafka.server import io.aiven.inkless.common.SharedState import io.aiven.inkless.config.InklessConfig import io.aiven.inkless.consolidation.{ConsolidatedDisklessLogPruner, ConsolidationFetcherManager} -import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler} +import io.aiven.inkless.consume.{ConcatenatedRecords, FetchHandler, FetchOffsetHandler} import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, ControlPlane, ControlPlaneException, FindBatchResponse, DeleteRecordsResponse => CpDeleteRecordsResponse} import io.aiven.inkless.produce.AppendHandler import kafka.cluster.Partition @@ -73,7 +73,7 @@ import java.util import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties} import java.util.concurrent.{CompletableFuture, CountDownLatch, TimeUnit} import java.util.function.Consumer -import scala.collection.{Map, Seq} +import scala.collection.{Map, Seq, mutable} import scala.jdk.CollectionConverters._ class ReplicaManagerInklessTest { @@ -100,6 +100,21 @@ class ReplicaManagerInklessTest { val RECORDS: MemoryRecords = MemoryRecords.withRecords( 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, 
"hello".getBytes()) ) + + // Real local-log reads return FileRecords, not MemoryRecords. Use this to match production types. + private def memoryRecordsToFileRecords(records: MemoryRecords): FileRecords = { + val file = TestUtils.tempFile() + val fileRecords = FileRecords.open(file) + try { + fileRecords.append(records) + fileRecords.flush() + } catch { + case e: Throwable => + fileRecords.close() + throw e + } + fileRecords + } val disklessTopicPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "diskless") val classicTopicPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "classic") @@ -1167,6 +1182,8 @@ class ReplicaManagerInklessTest { } } + + // When local log has data but below minBytes and diskless data is available, the fetch must // respond immediately (merged local+diskless) without parking in the delayed-fetch purgatory. @Test @@ -1193,6 +1210,7 @@ class ReplicaManagerInklessTest { consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), delayedFetchPurgatory = Some(fetchPurgatory), )) + var localFileRecords: FileRecords = null try { when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) .thenReturn(100L) @@ -1210,11 +1228,11 @@ class ReplicaManagerInklessTest { doReturn(mockPartition).when(replicaManager) .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) - val localFileRecords = memoryRecordsToFileRecords(RECORDS) + localFileRecords = memoryRecordsToFileRecords(RECORDS) doReturn(Seq(disklessTopicPartition -> new LogReadResult( new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), - Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty() + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE )) ).when(replicaManager).readFromLog(any(), any(), any(), any()) @@ -1253,6 +1271,7 @@ class ReplicaManagerInklessTest { } finally { replicaManager.shutdown(checkpointHW = false) fetchHandlerCtor.close() + 
if (localFileRecords != null) localFileRecords.close() } } @@ -1280,6 +1299,7 @@ class ReplicaManagerInklessTest { consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), delayedFetchPurgatory = Some(fetchPurgatory), )) + var localFileRecords: FileRecords = null try { when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) .thenReturn(100L) @@ -1297,11 +1317,11 @@ class ReplicaManagerInklessTest { doReturn(mockPartition).when(replicaManager) .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) - val localFileRecords = memoryRecordsToFileRecords(RECORDS) + localFileRecords = memoryRecordsToFileRecords(RECORDS) doReturn(Seq(disklessTopicPartition -> new LogReadResult( new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), - Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty() + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE )) ).when(replicaManager).readFromLog(any(), any(), any(), any()) @@ -1336,6 +1356,549 @@ class ReplicaManagerInklessTest { } finally { replicaManager.shutdown(checkpointHW = false) fetchHandlerCtor.close() + if (localFileRecords != null) localFileRecords.close() + } + } + + @Test + def testFetchConsolidatingDisklessSupplementsFromDisklessWhenLocalLogBelowMinBytes(): Unit = { + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + var localFileRecords: FileRecords = null + val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.NONE, + 500L, 0L, + supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + ) + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + val cp = mock(classOf[ControlPlane]) + val replicaManager = spy(createReplicaManager( + 
List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + stubConsolidatingPartitionWithLocalLeo(replicaManager, localLeo = 100L) + + // Return small local records (below minBytes threshold) AS FileRecords, matching what a + // real local-log read produces. Using FileRecords here (not MemoryRecords) exercises the + // production FileRecords->MemoryRecords conversion in the supplement merge path; a + // MemoryRecords stub would vacuously pass while the real path throws ClassCastException. + localFileRecords = memoryRecordsToFileRecords(RECORDS) + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val fetchParams = new FetchParams( + -1, -1L, + 5000L, + RECORDS.sizeInBytes + 1, // minBytes > local records size to trigger supplement + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + waitForFetchResponse(responseData) + assertEquals(1, responseData.size) + val result = responseData(disklessTopicPartition) + assertEquals(Errors.NONE, result.error) + // HW/LSO 
should come from the diskless supplement response + assertEquals(500L, result.highWatermark) + // Records should be merged: local + diskless + assertTrue(result.records.sizeInBytes > RECORDS.sizeInBytes, + s"Expected merged records to be larger than local-only, got ${result.records.sizeInBytes}") + // Both local read and diskless fetch handler should have been called + verify(replicaManager, times(1)).readFromLog(any(), any(), any(), any()) + verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any()) + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + if (localFileRecords != null) localFileRecords.close() + } + } + + // When the supplement returns data with an error, those bytes must NOT count toward minBytes + // satisfaction — the response must park in the purgatory, not respond prematurely. + @Test + def testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable(): Unit = { + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "error-data".getBytes()) + ) + // Supplement returns records but with an error — bytes must not be counted + val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.KAFKA_STORAGE_ERROR, + 500L, 0L, + supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + ) + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + val cp = mock(classOf[ControlPlane]) + val timer = new MockTimer(time) + val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + delayedFetchPurgatory = 
Some(fetchPurgatory), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(100L) + when(mockPartition.log).thenReturn(Some(mockLog)) + val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0) + when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), MemoryRecords.EMPTY), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 30000L, + supplementRecords.sizeInBytes + 1, // minBytes would be satisfied if error bytes were counted + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, + (response: Seq[(TopicIdPartition, FetchPartitionData)]) => responseData = response.toMap) + + // Error-bearing supplement bytes must NOT satisfy minBytes — request should park + assertEquals(1, fetchPurgatory.watched(), + "Fetch must park in purgatory when supplement has error — error bytes 
must not inflate bytesReadable") + assertNull(responseData) + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + } + } + + // When the supplement fetch is interrupted, the thread's interrupted status must be preserved + // so broker shutdown/request cancellation can proceed correctly. + @Test + def testFetchConsolidatingSupplementPreservesInterruptedStatus(): Unit = { + val fetchHandlerCtorMockInitializer: MockedConstruction.MockInitializer[FetchHandler] = { + case (mock, _) => + // Return a future that never completes — the test thread will be interrupted while waiting on .get() + val neverCompleteFuture = new java.util.concurrent.CompletableFuture[java.util.Map[TopicIdPartition, FetchPartitionData]]() + when(mock.handle(any(), any())).thenAnswer { _ => + // Interrupt the current thread so .get() throws InterruptedException + Thread.currentThread().interrupt() + neverCompleteFuture + } + } + val fetchHandlerCtor = mockConstruction(classOf[FetchHandler], fetchHandlerCtorMockInitializer) + val cp = mock(classOf[ControlPlane]) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(100L) + when(mockPartition.log).thenReturn(Some(mockLog)) + val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0) + when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + 
.getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), RECORDS), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 5000L, + RECORDS.sizeInBytes + 1, // minBytes > local to trigger supplement + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, + (_: Seq[(TopicIdPartition, FetchPartitionData)]) => {}) + + // The interrupted status must be restored on the current thread. + // fetchMessages returns synchronously (fires callback or parks) — either way the + // interrupt flag must be set after the supplement catch re-interrupts the thread. + assertTrue(Thread.interrupted(), "Thread interrupted status must be preserved after InterruptedException") + } finally { + // Clear any lingering interrupt so shutdown doesn't fail + Thread.interrupted() + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + } + } + + // Mixed fetch: consolidating (local below minBytes) + pure-diskless + classic in one request. + // A pure-diskless partition is present, so fetchMessages skips its synchronous supplement and the + // consolidating partition is supplemented in DelayedFetch; classic is served from the local log. + // fetchDisklessMessages is called twice — once for the supplement, once for the delayed diskless fetch.
+ @Test + def testFetchMixedConsolidatingPureDisklessAndClassicPartitions(): Unit = { + val pureDisklessTp = new TopicIdPartition(Uuid.randomUuid(), 0, "pure-diskless") + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val disklessRecords = MemoryRecords.withRecords( + 2.toByte, 200L, Compression.NONE, TimestampType.CREATE_TIME, 789L, 0.toShort, 0, 0, false, new SimpleRecord(0, "diskless".getBytes()) + ) + // First call: supplement for consolidating partition. Second call: delayed fetch for pure-diskless. + val fetchHandlerCtorMockInitializer: MockedConstruction.MockInitializer[FetchHandler] = { + case (mock, _) => + when(mock.handle(any(), any())) + .thenReturn(CompletableFuture.completedFuture( + Map(disklessTopicPartition -> new FetchPartitionData( + Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + ).asJava)) + .thenReturn(CompletableFuture.completedFuture( + Map(pureDisklessTp -> new FetchPartitionData( + Errors.NONE, 300L, 0L, disklessRecords, + Optional.empty(), OptionalLong.of(300L), Optional.empty(), OptionalInt.empty(), false) + ).asJava)) + } + val fetchHandlerCtor = mockConstruction(classOf[FetchHandler], fetchHandlerCtorMockInitializer) + + // findBatches is needed for DelayedFetch.tryComplete to fire for the pure-diskless partition. 
+ val batchMetadata = mock(classOf[BatchMetadata]) + when(batchMetadata.topicIdPartition()).thenReturn(pureDisklessTp) + val batch = mock(classOf[BatchInfo]) + when(batch.metadata()).thenReturn(batchMetadata) + val findBatchResponse = mock(classOf[FindBatchResponse]) + when(findBatchResponse.batches()).thenReturn(util.List.of(batch)) + when(findBatchResponse.highWatermark()).thenReturn(300L) + when(findBatchResponse.estimatedByteSize(200L)).thenReturn(disklessRecords.sizeInBytes()) + when(findBatchResponse.errors()).thenReturn(Errors.NONE) + val cp = mock(classOf[ControlPlane]) + when(cp.findBatches(any(), any(), any())).thenReturn(util.List.of(findBatchResponse)) + + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic(), pureDisklessTp.topic()), + controlPlane = Some(cp), + topicIdMapping = Map(pureDisklessTp.topic() -> pureDisklessTp.topicId()), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + )) + var localFileRecords: FileRecords = null + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + val consolidatingPartition = mock(classOf[Partition]) + val consolidatingLog = mock(classOf[UnifiedLog]) + when(consolidatingLog.logEndOffset).thenReturn(100L) + when(consolidatingPartition.log).thenReturn(Some(consolidatingLog)) + val consolidatingEndOffset = new LogOffsetMetadata(100L, 0L, 0) + when(consolidatingPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, consolidatingEndOffset, consolidatingEndOffset, consolidatingEndOffset)) + doReturn(Right(consolidatingPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(consolidatingPartition).when(replicaManager) + 
.getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + localFileRecords = memoryRecordsToFileRecords(RECORDS) + doReturn(Seq( + disklessTopicPartition -> new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE + ), + classicTopicPartition -> new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS), + Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE + ) + )).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val classicPartition = mock(classOf[Partition]) + val classicEndOffset = new LogOffsetMetadata(10L, 0L, RECORDS.sizeInBytes()) + when(classicPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, classicEndOffset, classicEndOffset, classicEndOffset)) + doReturn(classicPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(classicTopicPartition.topicPartition())) + + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 5000L, + RECORDS.sizeInBytes + 1, + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()), + pureDisklessTp -> new PartitionData(pureDisklessTp.topicId(), 200L, 0L, 1024, Optional.empty()), + classicTopicPartition -> new PartitionData(classicTopicPartition.topicId(), 1L, 0L, 1024, Optional.empty()), + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + waitForFetchResponse(responseData) + assertEquals(3, responseData.size) + + // Consolidating partition: 
merged local + supplement, HW from diskless + val consolidatingResult = responseData(disklessTopicPartition) + assertEquals(Errors.NONE, consolidatingResult.error) + assertEquals(500L, consolidatingResult.highWatermark) + assertTrue(consolidatingResult.records.sizeInBytes > RECORDS.sizeInBytes, + "Consolidating partition must have merged local+diskless records") + + // Pure-diskless partition: served from diskless handler via delayed path + val disklessResult = responseData(pureDisklessTp) + assertEquals(Errors.NONE, disklessResult.error) + assertEquals(300L, disklessResult.highWatermark) + + // Classic partition: served from local log + val classicResult = responseData(classicTopicPartition) + assertEquals(Errors.NONE, classicResult.error) + assertEquals(10L, classicResult.highWatermark) + assertEquals(RECORDS, classicResult.records) + + // Two diskless handler calls: one for the supplement, one for the delayed diskless fetch + verify(fetchHandlerCtor.constructed().get(0), times(2)).handle(any(), any()) + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + if (localFileRecords != null) localFileRecords.close() + } + } + + // --- buildConsolidationSupplementFetchInfos unit tests --- + + @Test + def testBuildConsolidationSupplementFetchInfosReturnsRequestStartingAtLogEndOffset(): Unit = { + val tp = disklessTopicPartition + val logEndOffset = 100L + val supplements = mutable.HashMap(tp -> logEndOffset) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), MemoryRecords.EMPTY), + Optional.empty(), 0L, 0L, 0L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, 
fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (resultTp, partitionData) = result.head + assertEquals(tp, resultTp) + assertEquals(logEndOffset, partitionData.fetchOffset) + assertEquals(1024, partitionData.maxBytes) // no local bytes read, so full budget remains + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBuildConsolidationSupplementFetchInfosDeductsLocalBytesFromBudget(): Unit = { + val tp = disklessTopicPartition + val supplements = mutable.HashMap(tp -> 100L) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) + val localRecords = MemoryRecords.withRecords( + 2.toByte, 50L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, 0L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (_, partitionData) = result.head + assertEquals(1024 - localRecords.sizeInBytes, partitionData.maxBytes) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBuildConsolidationSupplementFetchInfosDropsPartitionWhenBudgetExhausted(): Unit = { + val tp = disklessTopicPartition + val supplements = mutable.HashMap(tp -> 100L) + // Local read already consumed the full budget + val localRecords = MemoryRecords.withRecords( + 2.toByte, 50L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "x".getBytes()) + ) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, localRecords.sizeInBytes, 
Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, 0L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertTrue(result.isEmpty, s"Expected no supplement when local bytes >= maxBytes, got $result") + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBuildConsolidationSupplementFetchInfosIgnoresPartitionsNotInFetchInfos(): Unit = { + val tp = disklessTopicPartition + val otherTp = new TopicIdPartition(Uuid.randomUuid(), 0, "other") + val supplements = mutable.HashMap(tp -> 100L, otherTp -> 200L) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + assertEquals(tp, result.head._1) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + // --- mergeConsolidationSupplement unit tests --- + + @Test + def testMergeConsolidationSupplementCombinesMemoryRecords(): Unit = { + val localRecords = MemoryRecords.withRecords( + 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, localRecords, + 
Optional.empty(), OptionalLong.of(80L), Optional.empty(), OptionalInt.empty(), false) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertEquals(Errors.NONE, result.error) + assertEquals(500L, result.highWatermark) + assertEquals(OptionalLong.of(500L), result.lastStableOffset) + assertTrue(result.records.sizeInBytes > localRecords.sizeInBytes, + "Merged records must be larger than local-only") + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testMergeConsolidationSupplementMaterializesFileRecords(): Unit = { + val localRecords = MemoryRecords.withRecords( + 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val localFileRecords = memoryRecordsToFileRecords(localRecords) + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, localFileRecords, + Optional.empty(), OptionalLong.of(80L), Optional.empty(), OptionalInt.empty(), false) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertEquals(500L, result.highWatermark) + assertTrue(result.records.sizeInBytes > 0) + // Must not throw 
ClassCastException: FileRecords must have been converted before ConcatenatedRecords + assertInstanceOf(classOf[ConcatenatedRecords], result.records) + } finally { + replicaManager.shutdown(checkpointHW = false) + if (localFileRecords != null) localFileRecords.close() + } + } + + @Test + def testMergeConsolidationSupplementReturnsLocalDataOnUnknownLocalRecordsType(): Unit = { + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, mock(classOf[org.apache.kafka.common.record.Records]), + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "s".getBytes()) + ) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertSame(localData, result) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testMergeConsolidationSupplementReturnsLocalDataOnUnknownSupplementRecordsType(): Unit = { + val localRecords = MemoryRecords.withRecords( + 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, localRecords, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, mock(classOf[org.apache.kafka.common.record.Records]), + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = 
createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertSame(localData, result) + } finally { + replicaManager.shutdown(checkpointHW = false) } } @@ -5146,7 +5709,8 @@ class ReplicaManagerInklessTest { consolidatingDisklessTopics: Set[String] = Set.empty, mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None, inklessSharedStateEnabled: Boolean = true, - initDisklessLogManager: Option[InitDisklessLogManager] = None + initDisklessLogManager: Option[InitDisklessLogManager] = None, + delayedFetchPurgatory: Option[DelayedOperationPurgatory[DelayedFetch]] = None ): ReplicaManager = { val props = TestUtils.createBrokerConfig(1, logDirCount = 2) if (disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled) { @@ -5197,6 +5761,7 @@ class ReplicaManagerInklessTest { inklessSharedState = if (inklessSharedStateEnabled) Some(sharedState) else None, inklessMetadataView = Some(inklessMetadata), initDisklessLogManager = initDisklessLogManager, + delayedFetchPurgatoryParam = delayedFetchPurgatory, ) { override protected def createReplicaFetcherManager( metrics: Metrics, diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java index 2b4e48b613..218f6622fe 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -44,6 +45,23 @@ public ConcatenatedRecords(List backingRecords) { this.sizeInBytes = totalSize; } + public static ConcatenatedRecords concat(MemoryRecords prefix, Records tail) { + Objects.requireNonNull(prefix, "prefix must not be 
null"); + Objects.requireNonNull(tail, "tail must not be null"); + final List components; + if (tail instanceof ConcatenatedRecords cr) { + components = new ArrayList<>(1 + cr.backingRecords.size()); + components.add(prefix); + components.addAll(cr.backingRecords); + } else if (tail instanceof MemoryRecords mr) { + components = List.of(prefix, mr); + } else { + throw new IllegalArgumentException( + "Unsupported Records type for concatenation: " + tail.getClass().getName()); + } + return new ConcatenatedRecords(components); + } + @Override public Iterable batches() { return this::batchIterator; From aa6be120edf0d5dd729dc9b8688a70c0d25c80aa Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 11 Jun 2026 18:31:42 +0300 Subject: [PATCH 3/6] fixup! feat(inkless:consolidation): supplement local log with diskless data on fetch --- .../scala/kafka/server/ReplicaManager.scala | 17 +++- .../server/ReplicaManagerInklessTest.scala | 77 +++++++++++++++++++ 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5d249b924a..7250f8ded0 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2016,11 +2016,20 @@ class ReplicaManager(val config: KafkaConfig, val fetchInfoByTp = fetchInfos.toMap supplements.flatMap { case (tp, logEndOffset) => fetchInfoByTp.get(tp).flatMap { pd => - val alreadyRead = Option(logReadResultMap.get(tp)).map(_.info.records.sizeInBytes).getOrElse(0) + val readResult = Option(logReadResultMap.get(tp)) + val alreadyRead = readResult.map(_.info.records.sizeInBytes).getOrElse(0) val remainingBytes = Math.max(pd.maxBytes - alreadyRead, 0) - if (remainingBytes > 0) - Some(tp -> new PartitionData(tp.topicId(), logEndOffset, pd.logStartOffset, remainingBytes, pd.currentLeaderEpoch, pd.lastFetchedEpoch)) - else + if (remainingBytes > 0) { + // Start 
the supplement where the local read left off, not at logEndOffset. + // Diskless has the full range so it can serve from any offset. This avoids a gap + // when the local read stopped at a segment boundary before reaching logEndOffset. + val supplementStartOffset = readResult + .map(_.info.records.lastBatch()) + .filter(_.isPresent) + .map(_.get().nextOffset()) + .getOrElse(logEndOffset) + Some(tp -> new PartitionData(tp.topicId(), supplementStartOffset, pd.logStartOffset, remainingBytes, pd.currentLeaderEpoch, pd.lastFetchedEpoch)) + } else None } }.toSeq diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index 15b41fd5ff..4b45984aaf 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -1809,6 +1809,83 @@ class ReplicaManagerInklessTest { } } + // When the local read stops at a segment boundary (returning records that end before logEndOffset), + // the supplement must start at the next offset after the last local batch, not at logEndOffset. + // This ensures contiguous records without a gap. 
+ @Test + def testBuildConsolidationSupplementFetchInfosStartsAtLastBatchNextOffset(): Unit = { + val tp = disklessTopicPartition + val logEndOffset = 5000L + val supplements = mutable.HashMap(tp -> logEndOffset) + // Local records end at offset 1999 (batch with baseOffset=1995, 5 records → lastOffset=1999, nextOffset=2000) + val localRecords = MemoryRecords.withRecords( + 2.toByte, 1995L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, + new SimpleRecord(0, "a".getBytes()), new SimpleRecord(0, "b".getBytes()), + new SimpleRecord(0, "c".getBytes()), new SimpleRecord(0, "d".getBytes()), + new SimpleRecord(0, "e".getBytes()) + ) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 1995L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1995L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, logEndOffset, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (_, partitionData) = result.head + // Supplement must start at 2000 (nextOffset after last batch), NOT at logEndOffset (5000) + assertEquals(2000L, partitionData.fetchOffset, + "Supplement must start where local read left off, not at logEndOffset") + assertEquals(1024 - localRecords.sizeInBytes, partitionData.maxBytes) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + // Simulates a multi-segment local log: the read returns FileRecords from an older segment + // (offsets 1995-1999), while logEndOffset is at 5000 (active segment is far ahead). + // The supplement must start at 2000 (contiguous with local read), not at 5000 (which would skip segments). 
+ @Test + def testBuildConsolidationSupplementFetchInfosWithFileRecordsFromOlderSegment(): Unit = { + val tp = disklessTopicPartition + val logEndOffset = 5000L + val supplements = mutable.HashMap(tp -> logEndOffset) + // Simulate a read from segment 0 that ends at offset 1999 — as FileRecords, matching production + val localMemRecords = MemoryRecords.withRecords( + 2.toByte, 1995L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, + new SimpleRecord(0, "seg0-a".getBytes()), new SimpleRecord(0, "seg0-b".getBytes()), + new SimpleRecord(0, "seg0-c".getBytes()), new SimpleRecord(0, "seg0-d".getBytes()), + new SimpleRecord(0, "seg0-e".getBytes()) + ) + var localFileRecords: FileRecords = null + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 1995L, 0L, 1024 * 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + localFileRecords = memoryRecordsToFileRecords(localMemRecords) + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1995L, 0L, 0), localFileRecords), + Optional.empty(), 0L, 0L, logEndOffset, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (_, partitionData) = result.head + // Must start at 2000 (next offset after the last batch in the FileRecords), not 5000 + assertEquals(2000L, partitionData.fetchOffset, + "Supplement must start where the FileRecords segment read left off") + // Budget: maxBytes (1MB) minus the FileRecords size + assertEquals(1024 * 1024 - localFileRecords.sizeInBytes, partitionData.maxBytes) + } finally { + replicaManager.shutdown(checkpointHW = false) + if (localFileRecords != null) localFileRecords.close() + } + } + // --- mergeConsolidationSupplement unit tests --- @Test From 
7cf20440f4b66a5b2a2784b66bc4e5d82c1f0470 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Jun 2026 09:34:53 +0300 Subject: [PATCH 4/6] fixup! feat(inkless:consolidation): supplement local log with diskless data on fetch --- .../scala/kafka/server/DelayedFetch.scala | 35 ++++----------- .../scala/kafka/server/ReplicaManager.scala | 4 +- .../kafka/server/DelayedFetchTest.scala | 44 ++++--------------- 3 files changed, 19 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 3b31d8a9ea..7e02ddb917 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -47,6 +47,7 @@ class DelayedFetch( params: FetchParams, classicFetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus], disklessFetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus](), + consolidatingSupplements: Map[TopicIdPartition, Long] = Map.empty, replicaManager: ReplicaManager, quota: ReplicaQuota, maxWaitMs: Option[Long] = None, @@ -250,33 +251,13 @@ class DelayedFetch( (results, resultMap) } else (Seq.empty, new util.HashMap[TopicIdPartition, LogReadResult]()) - // Identify consolidating partitions that still have budget left after the local read and - // fire their supplement fetch concurrently with the pure-diskless fetch. Both futures are - // independent so running them in parallel keeps latency equal to the slower of the two. - // Guards like maxWaitMs>0 and !hasPreferredReadReplica are unnecessary here: by the time - // onComplete fires, those conditions were already true (maxWaitMs was positive to park, - // preferredReadReplica triggers immediate response and never parks). 
- val consolidatingSupplements = { - val supplements = new mutable.HashMap[TopicIdPartition, Long]() - if (!params.isFromFollower) { - classicFetchPartitionStatus.asScala.foreach { case (tp, status) => - if (replicaManager.inklessMetadataView().isConsolidatingDisklessTopic(tp.topic)) { - val readResult = logReadResultMap.get(tp) - if (readResult != null && readResult.error == Errors.NONE) { - replicaManager.getPartitionOrError(tp.topicPartition).foreach { partition => - partition.log.foreach { log => - if (status.startOffsetMetadata.messageOffset < log.logEndOffset) - supplements.put(tp, log.logEndOffset) - } - } - } - } - } - } - supplements - } - val supplementFetchInfos = replicaManager.buildConsolidationSupplementFetchInfos( - consolidatingSupplements, classicFetchInfos.toSeq, logReadResultMap) + // Supplement consolidating partitions whose local read has remaining byte budget. + // consolidatingSupplements is passed from fetchMessages (identified at routing time), + // so no per-partition metadata lookups are needed here. 
+ val supplementFetchInfos = + if (consolidatingSupplements.nonEmpty) + replicaManager.buildConsolidationSupplementFetchInfos(consolidatingSupplements, classicFetchInfos.toSeq, logReadResultMap) + else Seq.empty val emptySupplementMap: Map[TopicIdPartition, FetchPartitionData] = Map.empty val supplementFuture: CompletableFuture[Map[TopicIdPartition, FetchPartitionData]] = diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7250f8ded0..ac24e2a9a2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2017,9 +2017,10 @@ class ReplicaManager(val config: KafkaConfig, supplements.flatMap { case (tp, logEndOffset) => fetchInfoByTp.get(tp).flatMap { pd => val readResult = Option(logReadResultMap.get(tp)) + val hasError = readResult.exists(_.error != Errors.NONE) val alreadyRead = readResult.map(_.info.records.sizeInBytes).getOrElse(0) val remainingBytes = Math.max(pd.maxBytes - alreadyRead, 0) - if (remainingBytes > 0) { + if (!hasError && remainingBytes > 0) { // Start the supplement where the local read left off, not at logEndOffset. // Diskless has the full range so it can serve from any offset. This avoids a gap // when the local read stopped at a segment boundary before reaching logEndOffset. 
@@ -2286,6 +2287,7 @@ class ReplicaManager(val config: KafkaConfig, quota = quota, maxWaitMs = Some(maxWaitMs), minBytes = Some(minBytes), + consolidatingSupplements = if (params.isFromFollower) Map.empty else consolidatingLocalFetchSupplements.toMap, responseCallback = respond, ) diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index bb743fe7f5..6e5247738b 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -17,11 +17,10 @@ package kafka.server import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchRequest, FindBatchResponse} -import kafka.server.metadata.InklessMetadataView import kafka.utils.TestUtils import java.util.{Collections, Optional, OptionalInt, OptionalLong} -import scala.collection.{Seq, mutable} +import scala.collection.Seq import kafka.cluster.Partition import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} @@ -30,12 +29,12 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{MemoryRecords, TimestampType} import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult, UnifiedLog} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult} import org.junit.jupiter.api.{BeforeEach, Nested, Test} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import org.mockito.ArgumentMatchers.{any, anyFloat, anyInt, 
anyLong, argThat} +import org.mockito.ArgumentMatchers.{any, anyFloat, anyInt, anyLong} import org.mockito.Mockito.{mock, never, times, verify, when} import java.util.concurrent.CompletableFuture @@ -46,12 +45,9 @@ class DelayedFetchTest { private val maxBytes = 1024 private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) - private val inklessMetadataView: InklessMetadataView = mock(classOf[InklessMetadataView]) @BeforeEach def setUp(): Unit = { - when(replicaManager.inklessMetadataView()).thenReturn(inklessMetadataView) - when(inklessMetadataView.isConsolidatingDisklessTopic(any())).thenReturn(false) when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any())).thenReturn(Seq.empty) } @@ -701,20 +697,10 @@ class DelayedFetchTest { disklessFetchPartitionStatus = disklessStatusMap, replicaManager = replicaManager, quota = replicaQuota, + consolidatingSupplements = Map(consolidatingTp -> logEndOffset), responseCallback = callback ) - // Stub inklessMetadataView for the consolidating partition - when(inklessMetadataView.isConsolidatingDisklessTopic(consolidatingTp.topic)).thenReturn(true) - - // Stub getPartitionOrError to return a partition with a log - val partition = mock(classOf[Partition]) - val unifiedLog = mock(classOf[UnifiedLog]) - when(unifiedLog.logEndOffset).thenReturn(logEndOffset) - when(partition.log).thenReturn(Some(unifiedLog)) - when(replicaManager.getPartitionOrError(consolidatingTp.topicPartition)) - .thenReturn(Right(partition)) - // Stub readFromLog to return a small local read (below minBytes) val localReadResult = new LogReadResult( new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), @@ -797,16 +783,10 @@ class DelayedFetchTest { classicFetchPartitionStatus = classicStatusMap, replicaManager = replicaManager, quota = replicaQuota, + consolidatingSupplements = Map(consolidatingTp -> logEndOffset), responseCallback = 
responses => callbackResult = Some(responses) ) - when(inklessMetadataView.isConsolidatingDisklessTopic(consolidatingTp.topic)).thenReturn(true) - val partition = mock(classOf[Partition]) - val unifiedLog = mock(classOf[UnifiedLog]) - when(unifiedLog.logEndOffset).thenReturn(logEndOffset) - when(partition.log).thenReturn(Some(unifiedLog)) - when(replicaManager.getPartitionOrError(consolidatingTp.topicPartition)).thenReturn(Right(partition)) - // Local read returns an error — supplement must not be issued for this partition val errorReadResult = new LogReadResult( new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), @@ -819,10 +799,8 @@ class DelayedFetchTest { delayedFetch.forceComplete() TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called") - // buildConsolidationSupplementFetchInfos must be called with an empty supplements map — - // the erroring partition must have been filtered out before building the supplement request - verify(replicaManager, times(1)).buildConsolidationSupplementFetchInfos( - argThat((m: mutable.HashMap[TopicIdPartition, Long]) => m.isEmpty), any(), any()) + // The erroring partition must be filtered out — supplement fetch and merge must not fire. 
+ verify(replicaManager, never()).mergeConsolidationSupplement(any(), any(), any()) } // When the local read for a consolidating partition returns an error, the supplement must not @@ -849,16 +827,10 @@ class DelayedFetchTest { classicFetchPartitionStatus = classicStatusMap, replicaManager = replicaManager, quota = replicaQuota, + consolidatingSupplements = Map(consolidatingTp -> logEndOffset), responseCallback = responses => callbackResult = Some(responses) ) - when(inklessMetadataView.isConsolidatingDisklessTopic(consolidatingTp.topic)).thenReturn(true) - val partition = mock(classOf[Partition]) - val unifiedLog = mock(classOf[UnifiedLog]) - when(unifiedLog.logEndOffset).thenReturn(logEndOffset) - when(partition.log).thenReturn(Some(unifiedLog)) - when(replicaManager.getPartitionOrError(consolidatingTp.topicPartition)).thenReturn(Right(partition)) - // Local read returns an error val errorReadResult = new LogReadResult( new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), From 3be218ecbb8d46538aad5af2e66714cb5efbabb2 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 12 Jun 2026 23:58:29 +0300 Subject: [PATCH 5/6] fixup! 
feat(inkless:consolidation): supplement local log with diskless data on fetch --- .../scala/kafka/server/ReplicaManager.scala | 6 +- .../server/ReplicaManagerInklessTest.scala | 70 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ac24e2a9a2..46896edba7 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2129,7 +2129,11 @@ class ReplicaManager(val config: KafkaConfig, if (fetchPartitionData.fetchOffset < logEndOffset) { // Local log has data for this offset range — serve from local, track for diskless supplement shouldReadFromUnifiedLog = true - if (inklessSharedState.isDefined) + // Skip supplement tracking when the fetch offset falls in the tiered-storage range + // (below localLogStartOffset). The read will route to RemoteLogManager and the + // supplement data would be discarded by processRemoteFetches anyway. 
+ val localLogStartOffset = partition.log.map(_.localLogStartOffset).getOrElse(0L) + if (inklessSharedState.isDefined && fetchPartitionData.fetchOffset >= localLogStartOffset) consolidatingLocalFetchSupplements += (tp -> logEndOffset) } // else: consumer is at or beyond consolidation frontier, diskless-only diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index 4b45984aaf..ca81d28bb1 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -1580,6 +1580,76 @@ class ReplicaManagerInklessTest { } } + // When the consumer's fetch offset is below localLogStartOffset (data moved to tiered storage), + // the supplement must NOT be triggered — the read will route to RemoteLogManager and the + // supplement data would be discarded by processRemoteFetches anyway. + @Test + def testFetchConsolidatingSkipsSupplementWhenOffsetInTieredStorageRange(): Unit = { + val fetchHandlerCtor = mockFetchHandler(Map.empty) + val cp = mock(classOf[ControlPlane]) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + + // Local log: logEndOffset=500, localLogStartOffset=200 (offsets 0-199 are in tiered storage) + val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(500L) + when(mockLog.localLogStartOffset).thenReturn(200L) + when(mockPartition.log).thenReturn(Some(mockLog)) + val endOffsetMetadata = new LogOffsetMetadata(500L, 0L, 0) + 
when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + // readFromLog returns empty records (simulating the OffsetOutOfRange -> tiered path) + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), MemoryRecords.EMPTY), + Optional.empty(), 500L, 0L, 500L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + // Fetch at offset 50, which is below localLogStartOffset (200) + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 100L, // short maxWaitMs so the purgatory expires quickly + 1024, // minBytes > 0 to trigger supplement logic + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + waitForFetchResponse(responseData) + assertEquals(1, responseData.size) + // The diskless fetch handler must NOT have been called — supplement was skipped + verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any()) + // readFromLog is called at least once (inline), possibly again from DelayedFetch.onComplete + verify(replicaManager, atLeastOnce()).readFromLog(any(), any(), any(), any()) + } 
finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + } + } + // Mixed fetch: consolidating (local below minBytes) + pure-diskless + classic in one request. // The consolidating partition gets a synchronous supplement; the pure-diskless partition goes // through the normal delayed path; the classic partition is served from the local log. From 51b136e514fc4e4fbfe5284f4cd34a0a69cfbab3 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 17 Jun 2026 16:27:16 +0300 Subject: [PATCH 6/6] fixup! feat(inkless:consolidation): supplement local log with diskless data on fetch --- .../scala/kafka/server/ReplicaManager.scala | 31 ++- .../server/ReplicaManagerInklessTest.scala | 239 +++++++++++++++--- 2 files changed, 228 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 46896edba7..dd05f6e23f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2006,7 +2006,9 @@ class ReplicaManager(val config: KafkaConfig, * Build the diskless supplement fetch requests for consolidating partitions whose local log * read did not satisfy minBytes. For each tracked partition, computes the remaining byte budget * (original maxBytes minus what the local read already returned) and emits a PartitionData - * starting at logEndOffset. Partitions whose remaining budget is zero are dropped. + * starting where the local read left off. Partitions whose remaining budget is zero, or whose + * local read has not yet reached the local log end offset (the classic->diskless seal), are + * dropped — see the exhaustion guard below. 
*/ private[server] def buildConsolidationSupplementFetchInfos( supplements: Map[TopicIdPartition, Long], @@ -2020,17 +2022,24 @@ class ReplicaManager(val config: KafkaConfig, val hasError = readResult.exists(_.error != Errors.NONE) val alreadyRead = readResult.map(_.info.records.sizeInBytes).getOrElse(0) val remainingBytes = Math.max(pd.maxBytes - alreadyRead, 0) - if (!hasError && remainingBytes > 0) { - // Start the supplement where the local read left off, not at logEndOffset. - // Diskless has the full range so it can serve from any offset. This avoids a gap - // when the local read stopped at a segment boundary before reaching logEndOffset. - val supplementStartOffset = readResult - .map(_.info.records.lastBatch()) - .filter(_.isPresent) - .map(_.get().nextOffset()) - .getOrElse(logEndOffset) + // Offset just past the last batch of the local read, or logEndOffset when there is no + // local batch. The supplement starts here, and the exhaustion guard below compares it + // with logEndOffset to decide whether a supplement may be issued at all. + val supplementStartOffset = readResult + .map(_.info.records.lastBatch()) + .filter(_.isPresent) + .map(_.get().nextOffset()) + .getOrElse(logEndOffset) + // Only supplement once the local log is exhausted: the local read must have reached the + // local log end offset, which for a frozen consolidating log equals the classic->diskless + // seal. If it stopped at an earlier segment boundary (supplementStartOffset < logEndOffset), + // skip the supplement — the consumer re-fetches and walks the remaining local segments, as + // it would for any classic multi-segment lag. Supplementing from below the seal would stitch + // the local prefix directly onto the diskless range (which starts at the seal) and silently + // drop the committed range [supplementStartOffset, seal). 
+ if (!hasError && remainingBytes > 0 && supplementStartOffset >= logEndOffset) Some(tp -> new PartitionData(tp.topicId(), supplementStartOffset, pd.logStartOffset, remainingBytes, pd.currentLeaderEpoch, pd.lastFetchedEpoch)) - } else + else None } }.toSeq diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala index ca81d28bb1..f5a10ff822 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala @@ -101,6 +101,15 @@ class ReplicaManagerInklessTest { 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "hello".getBytes()) ) + // A single-record local batch whose nextOffset == 100, i.e. it ends exactly at the seal + // (classicToDisklessStartOffset = 100) used by the consolidating-fetch tests. The supplement is + // only emitted once the local read reaches the seal, so end-to-end tests that exercise the + // supplement merge must model a local read that ends there (not below it). Same byte size as + // RECORDS, so RECORDS.sizeInBytes can still be used for minBytes thresholds. + val RECORDS_AT_SEAL: MemoryRecords = MemoryRecords.withRecords( + 2.toByte, 99L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "hello".getBytes()) + ) + // Real local-log reads return FileRecords, not MemoryRecords. Use this to match production types. 
private def memoryRecordsToFileRecords(records: MemoryRecords): FileRecords = { val file = TestUtils.tempFile() @@ -1228,10 +1237,11 @@ class ReplicaManagerInklessTest { doReturn(mockPartition).when(replicaManager) .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) - localFileRecords = memoryRecordsToFileRecords(RECORDS) + // Local read reaches the seal (batch ends at offset 100 == seal), so the supplement fires. + localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL) doReturn(Seq(disklessTopicPartition -> new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords), Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE )) ).when(replicaManager).readFromLog(any(), any(), any(), any()) @@ -1317,10 +1327,12 @@ class ReplicaManagerInklessTest { doReturn(mockPartition).when(replicaManager) .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) - localFileRecords = memoryRecordsToFileRecords(RECORDS) + // Local read reaches the seal (batch ends at offset 100 == seal), so the supplement fires — + // but the supplement returns no data, so minBytes stays unmet and the fetch parks. + localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL) doReturn(Seq(disklessTopicPartition -> new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords), Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE )) ).when(replicaManager).readFromLog(any(), any(), any(), any()) @@ -1391,10 +1403,12 @@ class ReplicaManagerInklessTest { // real local-log read produces. 
Using FileRecords here (not MemoryRecords) exercises the // production FileRecords->MemoryRecords conversion in the supplement merge path; a // MemoryRecords stub would vacuously pass while the real path throws ClassCastException. - localFileRecords = memoryRecordsToFileRecords(RECORDS) + // The local batch ends at offset 100 == seal, so the local read reaches the LEO and the + // supplement fires (see the exhaustion guard in buildConsolidationSupplementFetchInfos). + localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL) doReturn(Seq(disklessTopicPartition -> new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords), Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE )) ).when(replicaManager).readFromLog(any(), any(), any(), any()) @@ -1547,9 +1561,11 @@ class ReplicaManagerInklessTest { doReturn(mockPartition).when(replicaManager) .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + // Local read reaches the seal (batch ends at offset 100 == seal), so the supplement fires + // and the interrupt is raised inside the supplement fetch. doReturn(Seq(disklessTopicPartition -> new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), RECORDS), + new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), RECORDS_AT_SEAL), Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE )) ).when(replicaManager).readFromLog(any(), any(), any(), any()) @@ -1650,6 +1666,135 @@ class ReplicaManagerInklessTest { } } + // Guards against the "sub-seal gap" silent-data-loss bug. + // + // Setup of a consolidating partition: + // - classic-to-diskless seal = 1000 (offsets [1000, ...) 
live in object storage / diskless) + // - local classic prefix = [0, 1000) and spans multiple local log segments + // - local logEndOffset (LEO) = 1000 + // - localLogStartOffset = 0 (nothing tiered, so the tiered-range guard is a no-op) + // + // A consumer fetches at offset 480, which lands in an *earlier* local segment. LocalLog.read + // serves from a single segment, so the local read returns only [480, 500) (a segment boundary + // well below the seal). Because that is smaller than minBytes, the consolidating path would + // previously trigger a diskless supplement starting at lastBatch().nextOffset() == 500. + // + // But 500 is BELOW the seal (1000): the diskless control plane holds no batches for offsets + // below the seal, so a FindBatch at 500 returns the first available batch — at the seal, 1000. + // (The FetchHandler stub returns records starting at 1000 regardless of the requested offset, + // faithfully modelling that "first batch at-or-after" control-plane behavior.) + // + // Without the exhaustion guard the merge would stitch local [480, 500) directly onto diskless + // [1000, ...), silently dropping the committed range [500, 1000). With the guard the supplement + // is skipped (local read is below the seal), so the consumer gets the contiguous local prefix + // and re-fetches to walk the remaining local segments. + @Test + def testFetchConsolidatingSupplementBelowSealSilentlySkipsClassicPrefix(): Unit = { + val seal = 1000L + val localLeo = 1000L + + // Local single-segment read: offsets [480, 500). 20 records in one batch -> nextOffset == 500. + val localRecordList = (0 until 20).map(i => new SimpleRecord(i.toLong, s"local-$i".getBytes())).toArray + val localMemRecords = MemoryRecords.withRecords( + 2.toByte, 480L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, localRecordList: _* + ) + var localFileRecords: FileRecords = null + + // Diskless serves from the seal: offsets [1000, 1005). 
Models the control plane returning the + // first batch at-or-after the requested (sub-seal) offset of 500. + val supplementRecordList = (0 until 5).map(i => new SimpleRecord(i.toLong, s"diskless-$i".getBytes())).toArray + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, seal, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, supplementRecordList: _* + ) + val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.NONE, + localLeo, 0L, + supplementRecords, + Optional.empty(), OptionalLong.of(localLeo), Optional.empty(), OptionalInt.empty(), false) + ) + + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + val cp = mock(classOf[ControlPlane]) + val timer = new MockTimer(time) + val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + delayedFetchPurgatory = Some(fetchPurgatory), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(seal) + // The fetch offset (480) is on an older segment than the local LEO, so the local read returns + // a single sub-seal segment and the supplement is skipped. minBytes stays unmet, so the fetch + // parks; on tryComplete, DelayedFetch Case F (older segment) force-completes with the local + // data. Stub the partition + offset snapshot so the parked tryComplete can evaluate it. + val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(localLeo) + when(mockPartition.log).thenReturn(Some(mockLog)) + // endOffset on a NEWER segment than the fetch offset (480, baseOffset 0) so Case F fires. 
+ val endOffsetMetadata = new LogOffsetMetadata(localLeo, 500L, 0) + when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + localFileRecords = memoryRecordsToFileRecords(localMemRecords) + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(480L, 0L, 0), localFileRecords), + Optional.empty(), localLeo, 0L, localLeo, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 1000L, + localMemRecords.sizeInBytes + 1, // minBytes > local-only size, so the local read alone cannot satisfy the fetch + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 480L, 0L, 1024 * 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + waitForFetchResponse(responseData) + val result = responseData(disklessTopicPartition) + assertEquals(Errors.NONE, result.error) + + // The supplement must NOT have been issued: a sub-seal supplement is the bug. + verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any()) + + // The consumer sees only the contiguous local prefix [480, 500); no diskless records spliced + // in from the seal, so no gap. 
The consumer re-fetches from 500 to walk the rest of the log. + val offsets = result.records.records().asScala.map(_.offset()).toList + val gaps = offsets.sliding(2).collect { case Seq(a, b) if b != a + 1 => (a, b) }.toList + assertTrue(gaps.isEmpty, + s"Consumer-visible records must be contiguous, but found offset gap(s): $gaps. " + + s"A supplement started below the seal $seal would make diskless serve from the seal, " + + s"silently skipping the committed range. Offsets=$offsets") + assertTrue(offsets.nonEmpty && offsets.last < seal, + s"Expected only sub-seal local records, got offsets=$offsets") + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + if (localFileRecords != null) localFileRecords.close() + } + } + // Mixed fetch: consolidating (local below minBytes) + pure-diskless + classic in one request. // The consolidating partition gets a synchronous supplement; the pure-diskless partition goes // through the normal delayed path; the classic partition is served from the local log. @@ -1717,10 +1862,12 @@ class ReplicaManagerInklessTest { doReturn(consolidatingPartition).when(replicaManager) .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) - localFileRecords = memoryRecordsToFileRecords(RECORDS) + // Consolidating local read reaches the seal (batch ends at offset 100 == seal), so the + // supplement fires. 
+ localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL) doReturn(Seq( disklessTopicPartition -> new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localFileRecords), + new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords), Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE ), classicTopicPartition -> new LogReadResult( @@ -1815,7 +1962,9 @@ class ReplicaManagerInklessTest { @Test def testBuildConsolidationSupplementFetchInfosDeductsLocalBytesFromBudget(): Unit = { val tp = disklessTopicPartition - val supplements = mutable.HashMap(tp -> 100L) + // Single record at offset 50 → nextOffset 51. Seal at 51 so the local read reaches the LEO + // and the supplement is emitted (see the exhaustion guard). + val supplements = mutable.HashMap(tp -> 51L) val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) val localRecords = MemoryRecords.withRecords( 2.toByte, 50L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) @@ -1879,15 +2028,18 @@ class ReplicaManagerInklessTest { } } - // When the local read stops at a segment boundary (returning records that end before logEndOffset), - // the supplement must start at the next offset after the last local batch, not at logEndOffset. - // This ensures contiguous records without a gap. + // When the local read stops at a segment boundary BELOW the local log end offset (the seal), + // no supplement must be emitted. The diskless range starts at the seal, so supplementing from + // an offset below it would stitch the local prefix directly onto the diskless range and silently + // drop the committed range [supplementStartOffset, seal). The consumer instead re-fetches and + // walks the remaining local segments, as it would for any classic multi-segment lag. 
@Test - def testBuildConsolidationSupplementFetchInfosStartsAtLastBatchNextOffset(): Unit = { + def testBuildConsolidationSupplementFetchInfosSkipsWhenLocalReadBelowSeal(): Unit = { val tp = disklessTopicPartition val logEndOffset = 5000L val supplements = mutable.HashMap(tp -> logEndOffset) - // Local records end at offset 1999 (batch with baseOffset=1995, 5 records → lastOffset=1999, nextOffset=2000) + // Local records end at offset 1999 (batch with baseOffset=1995, 5 records → lastOffset=1999, + // nextOffset=2000), well below the seal at 5000. val localRecords = MemoryRecords.withRecords( 2.toByte, 1995L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "a".getBytes()), new SimpleRecord(0, "b".getBytes()), @@ -1904,22 +2056,17 @@ class ReplicaManagerInklessTest { val replicaManager = createReplicaManager(List(tp.topic())) try { val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) - assertEquals(1, result.size) - val (_, partitionData) = result.head - // Supplement must start at 2000 (nextOffset after last batch), NOT at logEndOffset (5000) - assertEquals(2000L, partitionData.fetchOffset, - "Supplement must start where local read left off, not at logEndOffset") - assertEquals(1024 - localRecords.sizeInBytes, partitionData.maxBytes) + assertTrue(result.isEmpty, + s"Expected no supplement when the local read (nextOffset 2000) is below the seal ($logEndOffset), got $result") } finally { replicaManager.shutdown(checkpointHW = false) } } - // Simulates a multi-segment local log: the read returns FileRecords from an older segment - // (offsets 1995-1999), while logEndOffset is at 5000 (active segment is far ahead). - // The supplement must start at 2000 (contiguous with local read), not at 5000 (which would skip segments). + // Same sub-seal scenario but with FileRecords from an older segment (matching production reads). 
+ // No supplement must be emitted while the local read is below the seal. @Test - def testBuildConsolidationSupplementFetchInfosWithFileRecordsFromOlderSegment(): Unit = { + def testBuildConsolidationSupplementFetchInfosSkipsWhenFileRecordsBelowSeal(): Unit = { val tp = disklessTopicPartition val logEndOffset = 5000L val supplements = mutable.HashMap(tp -> logEndOffset) @@ -1942,17 +2089,47 @@ class ReplicaManagerInklessTest { Optional.empty(), 0L, 0L, logEndOffset, 0L, 0L, OptionalLong.empty(), Errors.NONE )) + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertTrue(result.isEmpty, + s"Expected no supplement when the FileRecords read (nextOffset 2000) is below the seal ($logEndOffset), got $result") + } finally { + replicaManager.shutdown(checkpointHW = false) + if (localFileRecords != null) localFileRecords.close() + } + } + + // When the local read reaches the local log end offset (the seal), the supplement is emitted + // starting at the seal — the boundary-spanning case the supplement exists for. + @Test + def testBuildConsolidationSupplementFetchInfosEmittedWhenLocalReadReachesSeal(): Unit = { + val tp = disklessTopicPartition + val seal = 2000L + val supplements = mutable.HashMap(tp -> seal) + // Local records end exactly at the seal (baseOffset=1995, 5 records → nextOffset=2000). 
+ val localRecords = MemoryRecords.withRecords( + 2.toByte, 1995L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, + new SimpleRecord(0, "a".getBytes()), new SimpleRecord(0, "b".getBytes()), + new SimpleRecord(0, "c".getBytes()), new SimpleRecord(0, "d".getBytes()), + new SimpleRecord(0, "e".getBytes()) + ) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 1995L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1995L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, seal, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) assertEquals(1, result.size) val (_, partitionData) = result.head - // Must start at 2000 (next offset after the last batch in the FileRecords), not 5000 - assertEquals(2000L, partitionData.fetchOffset, - "Supplement must start where the FileRecords segment read left off") - // Budget: maxBytes (1MB) minus the FileRecords size - assertEquals(1024 * 1024 - localFileRecords.sizeInBytes, partitionData.maxBytes) + // Supplement starts at the seal (== local nextOffset), contiguous with the local read. + assertEquals(seal, partitionData.fetchOffset, + "Supplement must start at the seal once the local read reaches the local log end offset") + assertEquals(1024 - localRecords.sizeInBytes, partitionData.maxBytes) } finally { replicaManager.shutdown(checkpointHW = false) - if (localFileRecords != null) localFileRecords.close() } }