diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 3926f6ec76a..7e02ddb9173 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -21,15 +21,17 @@ import com.yammer.metrics.core.Meter import io.aiven.inkless.control_plane.FindBatchRequest import kafka.utils.Logging -import java.util.concurrent.TimeUnit +import java.util.concurrent.{CompletableFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.DelayedOperation import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata} +import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata, LogReadResult} import java.util import scala.collection._ @@ -45,6 +47,7 @@ class DelayedFetch( params: FetchParams, classicFetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus], disklessFetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus](), + consolidatingSupplements: Map[TopicIdPartition, Long] = Map.empty, replicaManager: ReplicaManager, quota: ReplicaQuota, maxWaitMs: Option[Long] = None, @@ -225,7 +228,6 @@ class DelayedFetch( * Upon completion, read whatever data is available and pass to the complete callback */ override def onComplete(): Unit = { - // Complete the classic fetches first val classicFetchInfos = 
classicFetchPartitionStatus.asScala.iterator.map { case (tp, status) => tp -> status.fetchInfo }.toBuffer @@ -235,50 +237,107 @@ class DelayedFetch( val totalRequestsSize = classicRequestsSize + disklessRequestsSize if (totalRequestsSize == 0) { - // No partitions to fetch, just return an empty response responseCallback(Seq.empty) return } - val fetchPartitionData = if (classicRequestsSize > 0) { - // adjust the max bytes for classic fetches based on the percentage of classic partitions + // Read classic partitions from local log. + val (logReadResults, logReadResultMap) = if (classicRequestsSize > 0) { val classicPercentage = classicRequestsSize / totalRequestsSize val classicParams = replicaManager.fetchParamsWithNewMaxBytes(params, classicPercentage) + val results = replicaManager.readFromLog(classicParams, classicFetchInfos, quota, readFromPurgatory = true) + val resultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + results.foreach { case (tp, r) => resultMap.put(tp, r) } + (results, resultMap) + } else (Seq.empty, new util.HashMap[TopicIdPartition, LogReadResult]()) - val logReadResults = replicaManager.readFromLog( - classicParams, - classicFetchInfos, - quota, - readFromPurgatory = true - ) + // Supplement consolidating partitions whose local read has remaining byte budget. + // consolidatingSupplements is passed from fetchMessages (identified at routing time), + // so no per-partition metadata lookups are needed here. 
+ val supplementFetchInfos = + if (consolidatingSupplements.nonEmpty) + replicaManager.buildConsolidationSupplementFetchInfos(consolidatingSupplements, classicFetchInfos.toSeq, logReadResultMap) + else Seq.empty - logReadResults.map { case (tp, result) => - val isReassignmentFetch = params.isFromFollower && - replicaManager.isAddingReplica(tp.topicPartition, params.replicaId) - - tp -> result.toFetchPartitionData(isReassignmentFetch) + val emptySupplementMap: Map[TopicIdPartition, FetchPartitionData] = Map.empty + val supplementFuture: CompletableFuture[Map[TopicIdPartition, FetchPartitionData]] = + if (supplementFetchInfos.nonEmpty) { + val supplementParams = replicaManager.fetchParamsWithNewMaxBytes( + params, supplementFetchInfos.size.toFloat / totalRequestsSize) + replicaManager.fetchDisklessMessages(supplementParams, supplementFetchInfos) + .thenApply[Map[TopicIdPartition, FetchPartitionData]](_.toMap) + .exceptionally((e: Throwable) => { + logger.warn("Failed to fetch diskless supplement for consolidating partitions in delayed fetch, returning local data only", e) + emptySupplementMap + }) + } else { + CompletableFuture.completedFuture(emptySupplementMap) } - } else Seq.empty - if (disklessRequestsSize > 0) { - // Classic fetches are complete, now handle diskless fetches - // adjust the max bytes for diskless fetches based on the percentage of diskless partitions - val disklessPercentage = disklessRequestsSize / totalRequestsSize - val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) - val disklessFetchInfos = disklessFetchPartitionStatus.asScala.map { case (tp, status) => - tp -> status.fetchInfo + val emptyDisklessSeq: Seq[(TopicIdPartition, FetchPartitionData)] = Seq.empty + val disklessFuture: CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = + if (disklessRequestsSize > 0) { + val disklessPercentage = disklessRequestsSize / totalRequestsSize + val disklessParams = 
replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) + val disklessFetchInfos = disklessFetchPartitionStatus.asScala.map { case (tp, status) => + tp -> status.fetchInfo + } + replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos.toSeq) + .exceptionally((e: Throwable) => { + logger.warn("Failed to fetch diskless data in delayed fetch, returning per-partition errors", e) + val error = Errors.forException(e) + disklessFetchInfos.map { case (tp, _) => + tp -> new FetchPartitionData( + error, + -1L, + -1L, + MemoryRecords.EMPTY, + util.Optional.empty(), + util.OptionalLong.empty(), + util.Optional.empty(), + util.OptionalInt.empty(), + false + ) + }.toSeq + }) + } else { + CompletableFuture.completedFuture(emptyDisklessSeq) } - val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos.toSeq) - // Combine the classic fetch results with the diskless fetch results - disklessFetchResponseFuture.whenComplete { case (disklessFetchPartitionData, _) => - // Do a single response callback with both classic and diskless fetch results - responseCallback(fetchPartitionData ++ disklessFetchPartitionData) - } - } else { - // No diskless fetches, just return the classic fetch results - responseCallback(fetchPartitionData) - } + // Join both futures and fire the single response callback once both are ready. + // The supplement and pure-diskless fetches run concurrently so latency equals the slower of the two. + // The AtomicBoolean ensures responseCallback is called exactly once even if an unexpected exception + // escapes the thenCombine lambda after the happy-path callback has already fired. 
+ val callbackFired = new AtomicBoolean(false) + supplementFuture.thenCombine[Seq[(TopicIdPartition, FetchPartitionData)], Void](disklessFuture, + (supplementData: Map[TopicIdPartition, FetchPartitionData], + disklessData: Seq[(TopicIdPartition, FetchPartitionData)]) => { + val classicData = logReadResults.map { case (tp, result) => + val isReassignmentFetch = params.isFromFollower && + replicaManager.isAddingReplica(tp.topicPartition, params.replicaId) + val localData = result.toFetchPartitionData(isReassignmentFetch) + val mergedData = supplementData.get(tp) match { + // don't merge if local result has errors + case Some(sd) if result.error == Errors.NONE && sd.error == Errors.NONE && sd.records.sizeInBytes > 0 => + try replicaManager.mergeConsolidationSupplement(tp, localData, sd) + catch { + case e: Exception => + logger.warn(s"Failed to merge diskless supplement for $tp in delayed fetch, returning local data only", e) + localData + } + case _ => localData + } + tp -> mergedData + } + if (callbackFired.compareAndSet(false, true)) + responseCallback(classicData ++ disklessData) + null + }).exceptionally((e: Throwable) => { + logger.error("Unexpected error in delayed fetch completion", e) + if (callbackFired.compareAndSet(false, true)) + responseCallback(Seq.empty) + null + }) } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 15e04245035..dd05f6e23fb 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -18,7 +18,7 @@ package kafka.server import com.yammer.metrics.core.Meter import io.aiven.inkless.common.SharedState -import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler, Reader} +import io.aiven.inkless.consume.{ConcatenatedRecords, FetchHandler, FetchOffsetHandler, Reader} import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, InitDisklessLogProducerState} 
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
 import io.aiven.inkless.produce.AppendHandler
@@ -79,6 +79,7 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
 import java.lang.{Long => JLong}
+import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
@@ -2001,6 +2002,101 @@ class ReplicaManager(val config: KafkaConfig,
       originalParams.isolation, originalParams.clientMetadata, originalParams.shareFetchRequest)
   }
 
+  /**
+   * Build the diskless supplement fetch requests for consolidating partitions whose local log
+   * read did not satisfy minBytes. For each tracked partition, computes the remaining byte budget
+   * (original maxBytes minus what the local read already returned) and emits a PartitionData
+   * starting where the local read left off. Partitions whose remaining budget is zero, or whose
+   * local read has not yet reached the local log end offset (the classic->diskless seal), are
+   * dropped — see the exhaustion guard below.
+   */
+  private[server] def buildConsolidationSupplementFetchInfos(
+    supplements: Map[TopicIdPartition, Long],
+    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
+    logReadResultMap: util.Map[TopicIdPartition, LogReadResult]
+  ): Seq[(TopicIdPartition, PartitionData)] = {
+    val fetchInfoByTp = fetchInfos.toMap
+    supplements.flatMap { case (tp, logEndOffset) =>
+      fetchInfoByTp.get(tp).flatMap { pd =>
+        val readResult = Option(logReadResultMap.get(tp))
+        val hasError = readResult.exists(_.error != Errors.NONE)
+        val alreadyRead = readResult.map(_.info.records.sizeInBytes).getOrElse(0)
+        val remainingBytes = Math.max(pd.maxBytes - alreadyRead, 0)
+        // Start the supplement where the local read left off. When the local read returned no
+        // batches (or no read result exists), fall back to the requested fetch offset rather than
+        // logEndOffset, so an empty local read below the seal cannot skip [fetchOffset, logEndOffset).
+        val supplementStartOffset = readResult
+          .map(_.info.records.lastBatch())
+          .filter(_.isPresent)
+          .map(_.get().nextOffset())
+          .getOrElse(pd.fetchOffset)
+        // Only supplement once the local log is exhausted: the local read must have reached the
+        // local log end offset, which for a frozen consolidating log equals the classic->diskless
+        // seal. If it stopped at an earlier segment boundary (supplementStartOffset < logEndOffset),
+        // skip the supplement — the consumer re-fetches and walks the remaining local segments, as
+        // it would for any classic multi-segment lag. Supplementing from below the seal would stitch
+        // the local prefix directly onto the diskless range (which starts at the seal) and silently
+        // drop the committed range [supplementStartOffset, seal).
+        if (!hasError && remainingBytes > 0 && supplementStartOffset >= logEndOffset)
+          Some(tp -> new PartitionData(tp.topicId(), supplementStartOffset, pd.logStartOffset, remainingBytes, pd.currentLeaderEpoch, pd.lastFetchedEpoch))
+        else
+          None
+      }
+    }.toSeq
+  }
+
+  /**
+   * Merges a diskless supplement into the local-log fetch result for a consolidating partition.
+   * The supplement provides records beyond the local logEndOffset, and its HW/LSO supersede
+   * the local values. Local records are materialized from FileRecords to MemoryRecords if needed
+   * before being passed to ConcatenatedRecords.
+   */
+  private[server] def mergeConsolidationSupplement(
+    tp: TopicIdPartition,
+    localData: FetchPartitionData,
+    supplementData: FetchPartitionData
+  ): FetchPartitionData = {
+    // Local-log reads return FileRecords (a file-channel-backed segment slice), not MemoryRecords.
+    // ConcatenatedRecords backs onto MemoryRecords, so materialize the local slice into a
+    // heap buffer first. This is the standard idiom used by AbstractFetcherThread and the
+    // coordinator loaders for the same FileRecords->MemoryRecords conversion.
+    val localRecords = localData.records match {
+      case mr: MemoryRecords => mr
+      case fr: FileRecords =>
+        val buffer = ByteBuffer.allocate(fr.sizeInBytes)
+        fr.readInto(buffer, 0)
+        MemoryRecords.readableRecords(buffer)
+      case other =>
+        error(s"Unexpected Records type from local log read for $tp: ${other.getClass.getName}. Returning local data only.")
+        return localData
+    }
+    val mergedRecords = try {
+      ConcatenatedRecords.concat(localRecords, supplementData.records)
+    } catch {
+      case e: IllegalArgumentException =>
+        error(s"${e.getMessage} for $tp. Returning local data only.")
+        return localData
+    }
+    // Pass through local abortedTransactions so READ_COMMITTED consumers keep abort markers for
+    // the local portion. Warn if present: transactions spanning the consolidation boundary into
+    // the diskless portion are not supported and those offsets will have no abort markers.
+    if (localData.abortedTransactions.isPresent && !localData.abortedTransactions.get.isEmpty)
+      warn(s"Consolidating diskless partition $tp has aborted transactions in the local log but diskless " +
+        s"storage does not support transactions — abort markers beyond logEndOffset will be missing")
+    // isReassignmentFetch is false: the supplement only fires for consumer fetches, never for follower/reassignment paths.
+    new FetchPartitionData(
+      localData.error,
+      supplementData.highWatermark,
+      Math.min(localData.logStartOffset, supplementData.logStartOffset),
+      mergedRecords,
+      supplementData.divergingEpoch,
+      supplementData.lastStableOffset,
+      localData.abortedTransactions,
+      localData.preferredReadReplica,
+      false
+    )
+  }
+
   /**
    * Fetch messages from a replica, and wait until enough data can be fetched and return;
    * the callback function will be triggered either when timeout or required fetch info is satisfied.
@@ -2018,6 +2114,9 @@ class ReplicaManager(val config: KafkaConfig, val disklessFetchInfos = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]() val classicFetchInfos = new mutable.ArrayBuffer[(TopicIdPartition, PartitionData)]() val immediateFetchResponses = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionData)]() + // Consolidating partitions served from local log that may need a diskless supplement. + // Maps tp -> logEndOffset (the offset where the diskless supplement should start). + val consolidatingLocalFetchSupplements = new mutable.HashMap[TopicIdPartition, Long]() fetchInfos.foreach { fetchInfo => val (tp, fetchPartitionData) = fetchInfo @@ -2035,8 +2134,18 @@ class ReplicaManager(val config: KafkaConfig, if (isConsolidatingPartition) { getPartitionOrError(tp.topicPartition) match { case Right(partition) => - shouldReadFromUnifiedLog = shouldReadFromUnifiedLog || - partition.log.exists(fetchPartitionData.fetchOffset < _.logEndOffset) + val logEndOffset = partition.log.map(_.logEndOffset).getOrElse(0L) + if (fetchPartitionData.fetchOffset < logEndOffset) { + // Local log has data for this offset range — serve from local, track for diskless supplement + shouldReadFromUnifiedLog = true + // Skip supplement tracking when the fetch offset falls in the tiered-storage range + // (below localLogStartOffset). The read will route to RemoteLogManager and the + // supplement data would be discarded by processRemoteFetches anyway. + val localLogStartOffset = partition.log.map(_.localLogStartOffset).getOrElse(0L) + if (inklessSharedState.isDefined && fetchPartitionData.fetchOffset >= localLogStartOffset) + consolidatingLocalFetchSupplements += (tp -> logEndOffset) + } + // else: consumer is at or beyond consolidation frontier, diskless-only case Left(error) => warn(s"Error while fetching partition ${tp.topicPartition()} for consolidating diskless topic: $error. 
" + s"Returning error for the fetch request since we cannot determine if the partition has switched to diskless or not.") @@ -2138,8 +2247,11 @@ class ReplicaManager(val config: KafkaConfig, inklessSharedState match { case None => - if (disklessFetchInfos.nonEmpty) { - error(s"Received diskless fetch request for topics ${disklessFetchInfos.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. " + + if (disklessFetchInfos.nonEmpty || consolidatingLocalFetchSupplements.nonEmpty) { + val disklessTopics = disklessFetchInfos.map(_._1.topic()).distinct + val consolidatingTopics = consolidatingLocalFetchSupplements.keys.map(_.topic()).toSeq.distinct + val allTopics = (disklessTopics ++ consolidatingTopics).distinct + error(s"Received diskless fetch request for topics ${allTopics.mkString(", ")} but diskless storage system is not enabled. " + s"Replying an empty response.") respond(Seq.empty) return @@ -2188,6 +2300,7 @@ class ReplicaManager(val config: KafkaConfig, quota = quota, maxWaitMs = Some(maxWaitMs), minBytes = Some(minBytes), + consolidatingSupplements = if (params.isFromFollower) Map.empty else consolidatingLocalFetchSupplements.toMap, responseCallback = respond, ) @@ -2239,9 +2352,55 @@ class ReplicaManager(val config: KafkaConfig, logReadResultMap.put(topicIdPartition, logReadResult) } + // For consolidating partitions where local log was read, supplement with diskless data if minBytes not satisfied. + // Only runs when there are no pure-diskless partitions in the request: if disklessFetchInfos is non-empty the + // request will park in DelayedFetch regardless, where the supplement runs concurrently with the diskless fetch. + // Running it here as well would block for disklessFetchMaxWaitMs and then discard the result. 
+ var consolidationSupplementData = Map.empty[TopicIdPartition, FetchPartitionData] + if (consolidatingLocalFetchSupplements.nonEmpty && + disklessFetchInfos.isEmpty && + !params.isFromFollower && // safeguard: followers must not receive diskless records merged into local-log data + params.maxWaitMs > 0 && // safeguard: non-blocking polls must not be held by a diskless round-trip + !hasPreferredReadReplica && + bytesReadable < params.minBytes && + !errorReadingData) { + val supplementFetchInfos = buildConsolidationSupplementFetchInfos(consolidatingLocalFetchSupplements, fetchInfos, logReadResultMap) + + if (supplementFetchInfos.nonEmpty) { + try { + val supplementParams = fetchParamsWithNewMaxBytes(params, supplementFetchInfos.size.toFloat / fetchInfos.size.toFloat) + // Future not cancelled on failure — diskless reads are idempotent and hold no resources. + consolidationSupplementData = fetchDisklessMessages(supplementParams, supplementFetchInfos) + .get(Math.max(config.disklessFetchMaxWaitMs.toLong, params.maxWaitMs), TimeUnit.MILLISECONDS) + .toMap + bytesReadable += consolidationSupplementData.values + .filter(_.error == Errors.NONE).map(_.records.sizeInBytes).sum + } catch { + case e: InterruptedException => + Thread.currentThread().interrupt() + logger.warn("Interrupted while fetching diskless supplement for consolidating partitions, returning local data only", e) + case e: java.util.concurrent.TimeoutException => + logger.warn("Timed out fetching diskless supplement for consolidating partitions, returning local data only", e) + case e: Throwable => + logger.warn("Failed to fetch diskless supplement for consolidating partitions, returning local data only", e) + } + } + } + val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) - tp -> result.toFetchPartitionData(isReassignmentFetch) + val localData = 
result.toFetchPartitionData(isReassignmentFetch) + val mergedData = consolidationSupplementData.get(tp) match { + case Some(supplementData) if supplementData.error == Errors.NONE && supplementData.records.sizeInBytes > 0 => + try mergeConsolidationSupplement(tp, localData, supplementData) + catch { + case e: Exception => + logger.warn(s"Failed to merge diskless supplement for consolidating partition $tp, returning local data only", e) + localData + } + case _ => localData + } + tp -> mergedData } // Respond immediately if no remote fetches are required and any of the below conditions is true diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index fa6c71dff29..6e5247738bb 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -17,8 +17,9 @@ package kafka.server import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchRequest, FindBatchResponse} +import kafka.utils.TestUtils -import java.util.{Collections, Optional, OptionalLong} +import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.collection.Seq import kafka.cluster.Partition import org.apache.kafka.common.{TopicIdPartition, Uuid} @@ -29,12 +30,12 @@ import org.apache.kafka.common.record.{MemoryRecords, TimestampType} import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult} -import org.junit.jupiter.api.{Nested, Test} +import org.junit.jupiter.api.{BeforeEach, Nested, Test} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import 
org.mockito.ArgumentMatchers.{any, anyFloat, anyInt, anyLong} -import org.mockito.Mockito.{mock, never, verify, when} +import org.mockito.Mockito.{mock, never, times, verify, when} import java.util.concurrent.CompletableFuture @@ -45,6 +46,11 @@ class DelayedFetchTest { private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) + @BeforeEach + def setUp(): Unit = { + when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any())).thenReturn(Seq.empty) + } + @Test def testFetchWithFencedEpoch(): Unit = { val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") @@ -642,6 +648,220 @@ class DelayedFetchTest { verify(mockResponse).estimatedByteSize(fetchOffset) } + @Test + def testCompletionWithConsolidationSupplement(): Unit = { + // Verifies that a consolidating partition's local data is supplemented with diskless data + // and that a pure-diskless partition's data also arrives in the response. 
+ val consolidatingTp = new TopicIdPartition(Uuid.randomUuid(), 0, "consolidating-topic") + val disklessTp = new TopicIdPartition(Uuid.randomUuid(), 0, "diskless-topic") + val fetchOffset = 100L + val logEndOffset = 150L + val logStartOffset = 0L + val currentLeaderEpoch = Optional.of[Integer](1) + val minBytes = 1024 // high enough that local data alone won't satisfy it + + // Classic (consolidating) partition status + val classicFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(consolidatingTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch) + ) + val classicStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + classicStatusMap.put(consolidatingTp, classicFetchStatus) + + // Diskless partition status + val disklessFetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(disklessTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch) + ) + val disklessStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]() + disklessStatusMap.put(disklessTp, disklessFetchStatus) + + val fetchParams = new FetchParams( + -1, // consumer + 1, + 500L, + minBytes, + maxBytes, + FetchIsolation.HIGH_WATERMARK, + Optional.empty() + ) + + @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None + def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + callbackResult = Some(responses) + } + + val delayedFetch = new DelayedFetch( + params = fetchParams, + classicFetchPartitionStatus = classicStatusMap, + disklessFetchPartitionStatus = disklessStatusMap, + replicaManager = replicaManager, + quota = replicaQuota, + consolidatingSupplements = Map(consolidatingTp -> logEndOffset), + responseCallback = callback + ) + + // Stub readFromLog to return a small local read (below minBytes) + val localReadResult = new LogReadResult( + new 
FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY), + Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), Errors.NONE) + when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean])) + .thenReturn(Seq((consolidatingTp, localReadResult))) + + // Stub buildConsolidationSupplementFetchInfos to return non-empty supplement info + val supplementFetchInfo = (consolidatingTp, new FetchRequest.PartitionData( + consolidatingTp.topicId(), logEndOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any())) + .thenReturn(Seq(supplementFetchInfo)) + + // Prepare supplement and diskless FetchPartitionData + val supplementRecords = mock(classOf[MemoryRecords]) + when(supplementRecords.sizeInBytes).thenReturn(512) + val supplementData = new FetchPartitionData(Errors.NONE, logEndOffset + 100, logEndOffset, + supplementRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + + val disklessRecords = mock(classOf[MemoryRecords]) + when(disklessRecords.sizeInBytes).thenReturn(256) + val disklessData = new FetchPartitionData(Errors.NONE, 200L, 0L, + disklessRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + + // Stub fetchDisklessMessages: first call returns supplement, second call returns diskless + when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + .thenReturn(CompletableFuture.completedFuture(Seq((consolidatingTp, supplementData)))) + .thenReturn(CompletableFuture.completedFuture(Seq((disklessTp, disklessData)))) + + // Stub fetchParamsWithNewMaxBytes + when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) + + // Stub mergeConsolidationSupplement to return a merged result + val mergedRecords = 
mock(classOf[MemoryRecords]) + when(mergedRecords.sizeInBytes).thenReturn(768) + val mergedData = new FetchPartitionData(Errors.NONE, logEndOffset + 100, logEndOffset, + mergedRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + when(replicaManager.mergeConsolidationSupplement(any[TopicIdPartition], any[FetchPartitionData], any[FetchPartitionData])) + .thenReturn(mergedData) + + when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) + + // Trigger onComplete + delayedFetch.forceComplete() + + TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called") + val results = callbackResult.get + assertEquals(2, results.size) + + val resultMap = results.toMap + assertEquals(mergedData, resultMap(consolidatingTp)) + assertEquals(disklessData, resultMap(disklessTp)) + + // Verify mergeConsolidationSupplement was called exactly once + verify(replicaManager, times(1)).mergeConsolidationSupplement( + any[TopicIdPartition], any[FetchPartitionData], any[FetchPartitionData]) + } + + // When the local read for a consolidating partition returns an error, the supplement fetch + // must not be issued — buildConsolidationSupplementFetchInfos must not receive that partition. 
+  @Test
+  def testConsolidationSupplementNotIssuedWhenLocalReadHasError(): Unit = {
+    val consolidatingTp = new TopicIdPartition(Uuid.randomUuid(), 0, "consolidating")
+
+    @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None
+    val delayedFetch = newDelayedFetchWithErroringLocalRead(
+      tp = consolidatingTp,
+      fetchOffset = 50L,
+      logStartOffset = 0L,
+      supplementOffset = 100L,
+      leaderEpoch = Optional.of[Integer](1),
+      onResponse = responses => callbackResult = Some(responses)
+    )
+
+    delayedFetch.forceComplete()
+
+    TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called")
+    // The erroring local result must never reach the merge step. Only the merge is asserted here;
+    // this test installs no stubs for the supplement fetch itself.
+    verify(replicaManager, never()).mergeConsolidationSupplement(any(), any(), any())
+  }
+
+  // When the local read for a consolidating partition returns an error, the supplement must not
+  // be applied — mergeConsolidationSupplement must never be called for an erroring local result.
+  // Unlike the test above, the supplement path is stubbed to return data, so the assertions also
+  // prove that available supplement data does not overwrite the local error.
+  @Test
+  def testConsolidationSupplementSkippedWhenLocalReadHasError(): Unit = {
+    val consolidatingTp = new TopicIdPartition(Uuid.randomUuid(), 0, "consolidating")
+    val logStartOffset = 0L
+    val logEndOffset = 100L
+    val currentLeaderEpoch = Optional.of[Integer](1)
+
+    @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None
+    val delayedFetch = newDelayedFetchWithErroringLocalRead(
+      tp = consolidatingTp,
+      fetchOffset = 50L,
+      logStartOffset = logStartOffset,
+      supplementOffset = logEndOffset,
+      leaderEpoch = currentLeaderEpoch,
+      onResponse = responses => callbackResult = Some(responses)
+    )
+
+    // Supplement path: a fetch info starting at logEndOffset and a successful 512-byte response.
+    val supplementFetchInfo = (consolidatingTp, new FetchRequest.PartitionData(
+      consolidatingTp.topicId(), logEndOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    when(replicaManager.buildConsolidationSupplementFetchInfos(any(), any(), any()))
+      .thenReturn(Seq(supplementFetchInfo))
+
+    val supplementRecords = mock(classOf[MemoryRecords])
+    when(supplementRecords.sizeInBytes).thenReturn(512)
+    val supplementData = new FetchPartitionData(Errors.NONE, logEndOffset + 100, logEndOffset,
+      supplementRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)
+    when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]]))
+      .thenReturn(CompletableFuture.completedFuture(Seq((consolidatingTp, supplementData))))
+
+    delayedFetch.forceComplete()
+
+    TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called")
+    val results = callbackResult.get.toMap
+    assertEquals(1, results.size)
+    // Error from the local read must be preserved — supplement must not overwrite it
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, results(consolidatingTp).error)
+    verify(replicaManager, never()).mergeConsolidationSupplement(any(), any(), any())
+  }
+
+  /**
+   * Builds a DelayedFetch over a single consolidating partition and stubs the shared
+   * `replicaManager` mock so that the local log read for that partition fails with
+   * NOT_LEADER_OR_FOLLOWER and empty records.
+   *
+   * Also stubs `fetchParamsWithNewMaxBytes` to return the params it was given and
+   * `isAddingReplica` to return false. Supplement-path stubs are left to the caller.
+   *
+   * @param tp               the partition, registered as the only classic fetch partition
+   * @param fetchOffset      offset used for the fetch status and the erroring read result
+   * @param logStartOffset   log start offset carried in the partition's fetch data
+   * @param supplementOffset value registered for `tp` in `consolidatingSupplements`
+   * @param leaderEpoch      current leader epoch carried in the partition's fetch data
+   * @param onResponse       response callback handed to the DelayedFetch
+   * @return the DelayedFetch; the caller decides when to complete it
+   */
+  private def newDelayedFetchWithErroringLocalRead(
+    tp: TopicIdPartition,
+    fetchOffset: Long,
+    logStartOffset: Long,
+    supplementOffset: Long,
+    leaderEpoch: Optional[Integer],
+    onResponse: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
+  ): DelayedFetch = {
+    val classicStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]()
+    classicStatusMap.put(tp, new FetchPartitionStatus(
+      new LogOffsetMetadata(fetchOffset),
+      new FetchRequest.PartitionData(tp.topicId(), fetchOffset, logStartOffset, maxBytes, leaderEpoch)))
+
+    val delayedFetch = new DelayedFetch(
+      params = new FetchParams(-1, 1, 500L, 1, maxBytes, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
+      classicFetchPartitionStatus = classicStatusMap,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      consolidatingSupplements = Map(tp -> supplementOffset),
+      responseCallback = onResponse
+    )
+
+    // Local read returns an error for the partition.
+    val errorReadResult = new LogReadResult(
+      new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY),
+      Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), Errors.NOT_LEADER_OR_FOLLOWER)
+    when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean]))
+      .thenReturn(Seq((tp, errorReadResult)))
+    when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+
+    delayedFetch
+  }
+
   @Test
   def testCompletionWhenErrorOccursDuringDisklessBatchFinding(): Unit = {
     // Case C: When an error occurs while trying to find diskless batches, fetch should complete immediately
@@ -729,5 +949,54 @@ class DelayedFetchTest {
       verify(mockResponse, never()).estimatedByteSize(anyLong())
       verify(mockResponse, never()).highWatermark()
     }
+
+    // When the diskless fetch future fails, the response must include per-partition error entries
+    // rather than silently dropping those partitions from the response.
+    @Test
+    def testDisklessFetchFailureReturnsPerPartitionErrors(): Unit = {
+      val disklessTp = new TopicIdPartition(Uuid.randomUuid(), 0, "diskless-topic")
+      val fetchOffset = 200L
+      val logStartOffset = 0L
+      val currentLeaderEpoch = Optional.of[Integer](1)
+      val minBytes = 100
+
+      val disklessFetchStatus = new FetchPartitionStatus(
+        new LogOffsetMetadata(fetchOffset),
+        new FetchRequest.PartitionData(disklessTp.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+      val disklessStatusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]()
+      disklessStatusMap.put(disklessTp, disklessFetchStatus)
+
+      val fetchParams = new FetchParams(-1, 1, 500L, minBytes, maxBytes, FetchIsolation.HIGH_WATERMARK, Optional.empty())
+
+      @volatile var callbackResult: Option[Seq[(TopicIdPartition, FetchPartitionData)]] = None
+      val delayedFetch = new DelayedFetch(
+        params = fetchParams,
+        classicFetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus](),
+        disklessFetchPartitionStatus = disklessStatusMap,
+        replicaManager = replicaManager,
+        quota = replicaQuota,
+        responseCallback = responses => callbackResult = Some(responses)
+      )
+
+      when(replicaManager.readFromLog(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], any[Boolean]))
+        .thenReturn(Seq.empty)
+      when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0))
+      when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+
+      // Diskless fetch fails with an exception
+      val failedFuture = new CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]]()
+      failedFuture.completeExceptionally(new RuntimeException("Object storage unavailable"))
+      when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]]))
+        .thenReturn(failedFuture)
+
+      delayedFetch.forceComplete()
+
+      TestUtils.waitUntilTrue(() => callbackResult.isDefined, "responseCallback should have been called")
+      val results = callbackResult.get.toMap
+      assertEquals(1, results.size, "Response must include the diskless partition even on failure")
+      val partitionData = results(disklessTp)
+      assertNotEquals(Errors.NONE, partitionData.error, "Error must be set for the failed partition")
+      assertEquals(MemoryRecords.EMPTY, partitionData.records)
+    }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala
index f40ffd3e152..f5a10ff8221 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerInklessTest.scala
@@ -20,7 +20,7 @@ package kafka.server
 import io.aiven.inkless.common.SharedState
 import io.aiven.inkless.config.InklessConfig
 import io.aiven.inkless.consolidation.{ConsolidatedDisklessLogPruner, ConsolidationFetcherManager}
-import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
+import io.aiven.inkless.consume.{ConcatenatedRecords, FetchHandler, FetchOffsetHandler}
 import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, ControlPlane, ControlPlaneException, FindBatchResponse, DeleteRecordsResponse => CpDeleteRecordsResponse}
 import io.aiven.inkless.produce.AppendHandler
 import kafka.cluster.Partition
@@ -73,7 +73,7 @@ import java.util
 import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties}
 import java.util.concurrent.{CompletableFuture, CountDownLatch, TimeUnit}
 import java.util.function.Consumer
-import scala.collection.{Map, Seq}
+import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 
 class ReplicaManagerInklessTest {
@@ -100,6 +100,30 @@ class ReplicaManagerInklessTest {
   val RECORDS: MemoryRecords = MemoryRecords.withRecords(
     2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "hello".getBytes())
   )
+
+  // A single-record local batch whose nextOffset == 100, i.e. it ends exactly at the seal
+  // (classicToDisklessStartOffset = 100) used by the consolidating-fetch tests. The supplement is
+  // only emitted once the local read reaches the seal, so end-to-end tests that exercise the
+  // supplement merge must model a local read that ends there (not below it). Same byte size as
+  // RECORDS, so RECORDS.sizeInBytes can still be used for minBytes thresholds.
+  val RECORDS_AT_SEAL: MemoryRecords = MemoryRecords.withRecords(
+    2.toByte, 99L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "hello".getBytes())
+  )
+
+  // Real local-log reads return FileRecords, not MemoryRecords. Use this to match production types.
+  // Copies `records` into a FileRecords opened on a TestUtils temp file and flushes it. The
+  // FileRecords is returned open; it is closed here only when append/flush fails.
+  private def memoryRecordsToFileRecords(records: MemoryRecords): FileRecords = {
+    val file = TestUtils.tempFile()
+    val fileRecords = FileRecords.open(file)
+    try {
+      fileRecords.append(records)
+      fileRecords.flush()
+    } catch {
+      case e: Throwable =>
+        // Cleanup must not hide the real failure: a close() error is attached as suppressed
+        // instead of replacing the append/flush exception that is rethrown below.
+        try fileRecords.close()
+        catch { case closeFailure: Throwable => e.addSuppressed(closeFailure) }
+        throw e
+    }
+    fileRecords
+  }
 
   val disklessTopicPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "diskless")
   val classicTopicPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "classic")
@@ -1167,6 +1191,1041 @@ class ReplicaManagerInklessTest {
     }
   }
+
+
+  // When local log has data but below minBytes and diskless data is available, the fetch must
+  // respond immediately (merged local+diskless) without parking in the delayed-fetch purgatory.
+  @Test
+  def testFetchConsolidatingSupplementRespondsWithoutDelayWhenDisklessDataAvailable(): Unit = {
+    val supplementRecords = MemoryRecords.withRecords(
+      2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes())
+    )
+    val disklessResponse = Map(disklessTopicPartition ->
+      new FetchPartitionData(
+        Errors.NONE,
+        500L, 0L,
+        supplementRecords,
+        Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false)
+    )
+    val fetchHandlerCtor = mockFetchHandler(disklessResponse)
+    val cp = mock(classOf[ControlPlane])
+    val timer = new MockTimer(time)
+    val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false)
+    val replicaManager = spy(createReplicaManager(
+      List(disklessTopicPartition.topic()),
+      controlPlane = Some(cp),
+      disklessManagedReplicasEnabled = true,
+      disklessRemoteStorageConsolidationEnabled = true,
+      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
+      delayedFetchPurgatory = Some(fetchPurgatory),
+    ))
+    var localFileRecords: FileRecords = null
+    try {
+      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
+        .thenReturn(100L)
+      // Stub fetchOffsetSnapshot so DelayedFetch.tryComplete doesn't NPE if the request parks
+      // on a build without the supplement.
+      val mockPartition = mock(classOf[Partition])
+      val mockLog = mock(classOf[UnifiedLog])
+      when(mockLog.logEndOffset).thenReturn(100L)
+      when(mockPartition.log).thenReturn(Some(mockLog))
+      val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0)
+      when(mockPartition.fetchOffsetSnapshot(any(), any()))
+        .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+      doReturn(Right(mockPartition)).when(replicaManager)
+        .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition()))
+      doReturn(mockPartition).when(replicaManager)
+        .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition()))
+
+      // Local read reaches the seal (batch ends at offset 100 == seal), so the supplement fires.
+      localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL)
+      doReturn(Seq(disklessTopicPartition ->
+        new LogReadResult(
+          new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords),
+          Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+        ))
+      ).when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      // Large minBytes + large maxWaitMs: WITHOUT the supplement this request would park in the
+      // purgatory and wait the full 30s. WITH the supplement it must respond immediately.
+      val maxWaitMs = 30000L
+      val fetchParams = new FetchParams(
+        FetchRequest.ORDINARY_CONSUMER_ID, -1L,
+        maxWaitMs,
+        RECORDS.sizeInBytes + 1, // minBytes > local-only size
+        1024 * 1024,
+        FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      // The mock clock is never advanced in this test, so a response can only come from the
+      // immediate path; the purgatory must not be watching the operation.
+      waitForFetchResponse(responseData)
+      assertEquals(0, fetchPurgatory.watched(),
+        "Supplemented consolidating fetch must NOT be parked in the delayed-fetch purgatory")
+      assertEquals(1, responseData.size)
+      val result = responseData(disklessTopicPartition)
+      assertEquals(Errors.NONE, result.error)
+      assertEquals(500L, result.highWatermark)
+      assertTrue(result.records.sizeInBytes > RECORDS.sizeInBytes,
+        s"Expected merged local+diskless records, got ${result.records.sizeInBytes}")
+      verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+      fetchHandlerCtor.close()
+      if (localFileRecords != null) localFileRecords.close()
+    }
+  }
+
+  // When local log has data below minBytes but no diskless data is available past logEndOffset,
+  // the fetch must park in the purgatory and only complete after maxWaitMs elapses.
+  @Test
+  def testFetchConsolidatingParksUntilMaxWaitWhenNoDisklessSupplementData(): Unit = {
+    // Diskless supplement returns an empty response: no data available past the local boundary yet.
+    val disklessResponse = Map(disklessTopicPartition ->
+      new FetchPartitionData(
+        Errors.NONE,
+        100L, 0L,
+        MemoryRecords.EMPTY,
+        Optional.empty(), OptionalLong.of(100L), Optional.empty(), OptionalInt.empty(), false)
+    )
+    val fetchHandlerCtor = mockFetchHandler(disklessResponse)
+    val cp = mock(classOf[ControlPlane])
+    val timer = new MockTimer(time)
+    val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false)
+    val replicaManager = spy(createReplicaManager(
+      List(disklessTopicPartition.topic()),
+      controlPlane = Some(cp),
+      disklessManagedReplicasEnabled = true,
+      disklessRemoteStorageConsolidationEnabled = true,
+      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
+      delayedFetchPurgatory = Some(fetchPurgatory),
+    ))
+    var localFileRecords: FileRecords = null
+    try {
+      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
+        .thenReturn(100L)
+      // Stub a partition whose local log ends at 100 and exposes a matching offset snapshot, so the
+      // parked DelayedFetch.tryComplete can evaluate the partition's high watermark without NPE.
+      val mockPartition = mock(classOf[Partition])
+      val mockLog = mock(classOf[UnifiedLog])
+      when(mockLog.logEndOffset).thenReturn(100L)
+      when(mockPartition.log).thenReturn(Some(mockLog))
+      val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0)
+      when(mockPartition.fetchOffsetSnapshot(any(), any()))
+        .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+      doReturn(Right(mockPartition)).when(replicaManager)
+        .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition()))
+      doReturn(mockPartition).when(replicaManager)
+        .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition()))
+
+      // Local read reaches the seal (batch ends at offset 100 == seal), so the supplement fires —
+      // but the supplement returns no data, so minBytes stays unmet and the fetch parks.
+      localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL)
+      doReturn(Seq(disklessTopicPartition ->
+        new LogReadResult(
+          new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords),
+          Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+        ))
+      ).when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      val maxWaitMs = 30000L
+      val fetchParams = new FetchParams(
+        FetchRequest.ORDINARY_CONSUMER_ID, -1L,
+        maxWaitMs,
+        RECORDS.sizeInBytes + 1, // minBytes > local-only size; supplement returns nothing
+        1024 * 1024,
+        FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      // No diskless data to supplement -> minBytes unmet -> parked in the purgatory, not yet responded.
+      assertEquals(1, fetchPurgatory.watched(),
+        "Fetch must be parked in the delayed-fetch purgatory when the supplement yields no data")
+      assertNull(responseData, "Response must not be produced until maxWaitMs elapses")
+
+      // Purgatory purges lazily — don't assert watched()==0 after completion.
+      timer.advanceClock(maxWaitMs + 1)
+      waitForFetchResponse(responseData)
+      assertEquals(1, responseData.size)
+      assertEquals(Errors.NONE, responseData(disklessTopicPartition).error)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+      fetchHandlerCtor.close()
+      if (localFileRecords != null) localFileRecords.close()
+    }
+  }
+
+  // When the local read ends at the seal but is smaller than minBytes, the response must carry
+  // the local records merged with the diskless supplement, and the supplement's high watermark.
+  @Test
+  def testFetchConsolidatingDisklessSupplementsFromDisklessWhenLocalLogBelowMinBytes(): Unit = {
+    val supplementRecords = MemoryRecords.withRecords(
+      2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes())
+    )
+    var localFileRecords: FileRecords = null
+    val disklessResponse = Map(disklessTopicPartition ->
+      new FetchPartitionData(
+        Errors.NONE,
+        500L, 0L,
+        supplementRecords,
+        Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false)
+    )
+    val fetchHandlerCtor = mockFetchHandler(disklessResponse)
+    val cp = mock(classOf[ControlPlane])
+    val replicaManager = spy(createReplicaManager(
+      List(disklessTopicPartition.topic()),
+      controlPlane = Some(cp),
+      disklessManagedReplicasEnabled = true,
+      disklessRemoteStorageConsolidationEnabled = true,
+      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
+    ))
+    try {
+      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
+        .thenReturn(100L)
+      stubConsolidatingPartitionWithLocalLeo(replicaManager, localLeo = 100L)
+
+      // Return small local records (below minBytes threshold) AS FileRecords, matching what a
+      // real local-log read produces. Using FileRecords here (not MemoryRecords) exercises the
+      // production FileRecords->MemoryRecords conversion in the supplement merge path; a
+      // MemoryRecords stub would vacuously pass while the real path throws ClassCastException.
+      // The local batch ends at offset 100 == seal, so the local read reaches the LEO and the
+      // supplement fires (see the exhaustion guard in buildConsolidationSupplementFetchInfos).
+      localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL)
+      doReturn(Seq(disklessTopicPartition ->
+        new LogReadResult(
+          new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords),
+          Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+        ))
+      ).when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      val fetchParams = new FetchParams(
+        -1, -1L,
+        5000L,
+        RECORDS.sizeInBytes + 1, // minBytes > local records size to trigger supplement
+        1024 * 1024,
+        FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback)
+
+      waitForFetchResponse(responseData)
+      assertEquals(1, responseData.size)
+      val result = responseData(disklessTopicPartition)
+      assertEquals(Errors.NONE, result.error)
+      // HW/LSO should come from the diskless supplement response
+      assertEquals(500L, result.highWatermark)
+      // Records should be merged: local + diskless
+      assertTrue(result.records.sizeInBytes > RECORDS.sizeInBytes,
+        s"Expected merged records to be larger than local-only, got ${result.records.sizeInBytes}")
+      // Both local read and diskless fetch handler should have been called
+      verify(replicaManager, times(1)).readFromLog(any(), any(), any(), any())
+      verify(fetchHandlerCtor.constructed().get(0), times(1)).handle(any(), any())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+      fetchHandlerCtor.close()
+      if (localFileRecords != null) localFileRecords.close()
+    }
+  }
+
+  // When the supplement returns data with an error, those bytes must NOT count toward minBytes
+  // satisfaction — the response must park in the purgatory, not respond prematurely.
+  @Test
+  def testFetchConsolidatingSupplementWithErrorDoesNotInflateBytesReadable(): Unit = {
+    val supplementRecords = MemoryRecords.withRecords(
+      2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "error-data".getBytes())
+    )
+    // Supplement returns records but with an error — bytes must not be counted
+    val disklessResponse = Map(disklessTopicPartition ->
+      new FetchPartitionData(
+        Errors.KAFKA_STORAGE_ERROR,
+        500L, 0L,
+        supplementRecords,
+        Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false)
+    )
+    val fetchHandlerCtor = mockFetchHandler(disklessResponse)
+    val cp = mock(classOf[ControlPlane])
+    val timer = new MockTimer(time)
+    val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false)
+    val replicaManager = spy(createReplicaManager(
+      List(disklessTopicPartition.topic()),
+      controlPlane = Some(cp),
+      disklessManagedReplicasEnabled = true,
+      disklessRemoteStorageConsolidationEnabled = true,
+      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
+      delayedFetchPurgatory = Some(fetchPurgatory),
+    ))
+    try {
+      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
+        .thenReturn(100L)
+      val mockPartition = mock(classOf[Partition])
+      val mockLog = mock(classOf[UnifiedLog])
+      when(mockLog.logEndOffset).thenReturn(100L)
+      when(mockPartition.log).thenReturn(Some(mockLog))
+      val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 0)
+      when(mockPartition.fetchOffsetSnapshot(any(), any()))
+        .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+      doReturn(Right(mockPartition)).when(replicaManager)
+        .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition()))
+      doReturn(mockPartition).when(replicaManager)
+        .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition()))
+
+      // NOTE(review): the local read here is empty at offset 50, below the seal (100). The other
+      // consolidating tests use a local read ending at the seal so that the supplement fires —
+      // confirm the supplement is actually requested in this setup.
+      doReturn(Seq(disklessTopicPartition ->
+        new LogReadResult(
+          new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), MemoryRecords.EMPTY),
+          Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+        ))
+      ).when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      val fetchParams = new FetchParams(
+        FetchRequest.ORDINARY_CONSUMER_ID, -1L,
+        30000L,
+        // minBytes equals the supplement size and the local read contributes 0 bytes, so the
+        // threshold is reachable only if the error-bearing bytes are counted. With
+        // sizeInBytes + 1 it was unreachable either way and this test could never fail.
+        supplementRecords.sizeInBytes,
+        1024 * 1024,
+        FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val fetchInfos = Seq(
+        disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      )
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA,
+        (response: Seq[(TopicIdPartition, FetchPartitionData)]) => responseData = response.toMap)
+
+      // Error-bearing supplement bytes must NOT satisfy minBytes — request should park
+      assertEquals(1, fetchPurgatory.watched(),
+        "Fetch must park in purgatory when supplement has error — error bytes must not inflate bytesReadable")
+      assertNull(responseData)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+      fetchHandlerCtor.close()
+    }
+  }
+
+  // When the supplement fetch is interrupted, the thread's interrupted status must be preserved
+  // so broker shutdown/request cancellation can proceed correctly.
+  @Test
+  def testFetchConsolidatingSupplementPreservesInterruptedStatus(): Unit = {
+    // Every FetchHandler constructed while this mock is active answers handle() by flagging the
+    // calling thread as interrupted and returning a future that is never completed, so a
+    // blocking .get() on it throws InterruptedException.
+    val interruptOnHandle: MockedConstruction.MockInitializer[FetchHandler] = (handler, _) => {
+      val pending = new java.util.concurrent.CompletableFuture[java.util.Map[TopicIdPartition, FetchPartitionData]]()
+      when(handler.handle(any(), any())).thenAnswer { _ =>
+        Thread.currentThread().interrupt()
+        pending
+      }
+    }
+    val fetchHandlerCtor = mockConstruction(classOf[FetchHandler], interruptOnHandle)
+    val controlPlaneMock = mock(classOf[ControlPlane])
+    val replicaManager = spy(createReplicaManager(
+      List(disklessTopicPartition.topic()),
+      controlPlane = Some(controlPlaneMock),
+      disklessManagedReplicasEnabled = true,
+      disklessRemoteStorageConsolidationEnabled = true,
+      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
+    ))
+    try {
+      val tp = disklessTopicPartition.topicPartition()
+      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(100L)
+
+      // Partition whose local log ends at 100, with every snapshot offset pinned to that end.
+      val log = mock(classOf[UnifiedLog])
+      when(log.logEndOffset).thenReturn(100L)
+      val partition = mock(classOf[Partition])
+      when(partition.log).thenReturn(Some(log))
+      val endOffset = new LogOffsetMetadata(100L, 0L, 0)
+      val snapshot = new LogOffsetSnapshot(0L, endOffset, endOffset, endOffset)
+      when(partition.fetchOffsetSnapshot(any(), any())).thenReturn(snapshot)
+      doReturn(Right(partition)).when(replicaManager).getPartitionOrError(ArgumentMatchers.eq(tp))
+      doReturn(partition).when(replicaManager).getPartitionOrException(ArgumentMatchers.eq(tp))
+
+      // Local read reaches the seal (batch ends at offset 100 == seal), so the supplement fires
+      // and the interrupt is raised inside the supplement fetch.
+      val localRead = new LogReadResult(
+        new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), RECORDS_AT_SEAL),
+        Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+      )
+      doReturn(Seq(disklessTopicPartition -> localRead))
+        .when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      // minBytes is one byte above the local-only size, so the supplement is needed.
+      val minBytes = RECORDS.sizeInBytes + 1
+      val fetchParams = new FetchParams(
+        FetchRequest.ORDINARY_CONSUMER_ID, -1L, 5000L, minBytes, 1024 * 1024,
+        FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val partitionData =
+        new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      val fetchInfos = Seq(disklessTopicPartition -> partitionData)
+      val ignoreResponse = (_: Seq[(TopicIdPartition, FetchPartitionData)]) => {}
+
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, ignoreResponse)
+
+      // Whether fetchMessages fired the callback or parked the request, the calling thread's
+      // interrupt flag must be set once it returns. Thread.interrupted() also clears the flag.
+      assertTrue(Thread.interrupted(), "Thread interrupted status must be preserved after InterruptedException")
+    } finally {
+      // Clear any lingering interrupt so shutdown doesn't fail
+      Thread.interrupted()
+      replicaManager.shutdown(checkpointHW = false)
+      fetchHandlerCtor.close()
+    }
+  }
+
+  // When the consumer's fetch offset is below localLogStartOffset (data moved to tiered storage),
+  // the supplement must NOT be triggered — the read will route to RemoteLogManager and the
+  // supplement data would be discarded by processRemoteFetches anyway.
+  @Test
+  def testFetchConsolidatingSkipsSupplementWhenOffsetInTieredStorageRange(): Unit = {
+    val fetchHandlerCtor = mockFetchHandler(Map.empty)
+    val controlPlaneMock = mock(classOf[ControlPlane])
+    val replicaManager = spy(createReplicaManager(
+      List(disklessTopicPartition.topic()),
+      controlPlane = Some(controlPlaneMock),
+      disklessManagedReplicasEnabled = true,
+      disklessRemoteStorageConsolidationEnabled = true,
+      consolidatingDisklessTopics = Set(disklessTopicPartition.topic()),
+    ))
+    try {
+      val tp = disklessTopicPartition.topicPartition()
+      when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(tp)).thenReturn(100L)
+
+      // Local log: logEndOffset=500, localLogStartOffset=200 (offsets 0-199 are in tiered storage)
+      val log = mock(classOf[UnifiedLog])
+      when(log.logEndOffset).thenReturn(500L)
+      when(log.localLogStartOffset).thenReturn(200L)
+      val partition = mock(classOf[Partition])
+      when(partition.log).thenReturn(Some(log))
+      val endOffset = new LogOffsetMetadata(500L, 0L, 0)
+      val snapshot = new LogOffsetSnapshot(0L, endOffset, endOffset, endOffset)
+      when(partition.fetchOffsetSnapshot(any(), any())).thenReturn(snapshot)
+      doReturn(Right(partition)).when(replicaManager).getPartitionOrError(ArgumentMatchers.eq(tp))
+      doReturn(partition).when(replicaManager).getPartitionOrException(ArgumentMatchers.eq(tp))
+
+      // readFromLog returns empty records (simulating the OffsetOutOfRange -> tiered path)
+      val emptyLocalRead = new LogReadResult(
+        new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), MemoryRecords.EMPTY),
+        Optional.empty(), 500L, 0L, 500L, 0L, 0L, OptionalLong.empty(), Errors.NONE
+      )
+      doReturn(Seq(disklessTopicPartition -> emptyLocalRead))
+        .when(replicaManager).readFromLog(any(), any(), any(), any())
+
+      // Fetch at offset 50, which is below localLogStartOffset (200)
+      val maxWaitMs = 100L // short maxWaitMs so the purgatory expires quickly
+      val minBytes = 1024 // minBytes > 0 to trigger supplement logic
+      val fetchParams = new FetchParams(
+        FetchRequest.ORDINARY_CONSUMER_ID, -1L, maxWaitMs, minBytes, 1024 * 1024,
+        FetchIsolation.HIGH_WATERMARK, Optional.empty()
+      )
+      val partitionData =
+        new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty())
+      val fetchInfos = Seq(disklessTopicPartition -> partitionData)
+
+      @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null
+      val captureResponse = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => {
+        responseData = response.toMap
+      }
+      replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, captureResponse)
+
+      waitForFetchResponse(responseData)
+      assertEquals(1, responseData.size)
+      // The diskless fetch handler must NOT have been called — supplement was skipped
+      val constructedHandler = fetchHandlerCtor.constructed().get(0)
+      verify(constructedHandler, never()).handle(any(), any())
+      // readFromLog is called at least once (inline), possibly again from DelayedFetch.onComplete
+      verify(replicaManager, atLeastOnce()).readFromLog(any(), any(), any(), any())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+      fetchHandlerCtor.close()
+    }
+  }
+
+  // Guards against the "sub-seal gap" silent-data-loss bug.
+  //
+  // Setup of a consolidating partition:
+  // - classic-to-diskless seal = 1000 (offsets [1000, ...) live in object storage / diskless)
+  // - local classic prefix = [0, 1000) and spans multiple local log segments
+  // - local logEndOffset (LEO) = 1000
+  // - localLogStartOffset = 0 (nothing tiered, so the tiered-range guard is a no-op)
+  //
+  // A consumer fetches at offset 480, which lands in an *earlier* local segment. LocalLog.read
+  // serves from a single segment, so the local read returns only [480, 500) (a segment boundary
+  // well below the seal). Because that is smaller than minBytes, the consolidating path would
+  // previously trigger a diskless supplement starting at lastBatch().nextOffset() == 500.
+ // + // But 500 is BELOW the seal (1000): the diskless control plane holds no batches for offsets + // below the seal, so a FindBatch at 500 returns the first available batch — at the seal, 1000. + // (The FetchHandler stub returns records starting at 1000 regardless of the requested offset, + // faithfully modelling that "first batch at-or-after" control-plane behavior.) + // + // Without the exhaustion guard the merge would stitch local [480, 500) directly onto diskless + // [1000, ...), silently dropping the committed range [500, 1000). With the guard the supplement + // is skipped (local read is below the seal), so the consumer gets the contiguous local prefix + // and re-fetches to walk the remaining local segments. + @Test + def testFetchConsolidatingSupplementBelowSealSilentlySkipsClassicPrefix(): Unit = { + val seal = 1000L + val localLeo = 1000L + + // Local single-segment read: offsets [480, 500). 20 records in one batch -> nextOffset == 500. + val localRecordList = (0 until 20).map(i => new SimpleRecord(i.toLong, s"local-$i".getBytes())).toArray + val localMemRecords = MemoryRecords.withRecords( + 2.toByte, 480L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, localRecordList: _* + ) + var localFileRecords: FileRecords = null + + // Diskless serves from the seal: offsets [1000, 1005). Models the control plane returning the + // first batch at-or-after the requested (sub-seal) offset of 500. 
+ val supplementRecordList = (0 until 5).map(i => new SimpleRecord(i.toLong, s"diskless-$i".getBytes())).toArray + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, seal, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, supplementRecordList: _* + ) + val disklessResponse = Map(disklessTopicPartition -> + new FetchPartitionData( + Errors.NONE, + localLeo, 0L, + supplementRecords, + Optional.empty(), OptionalLong.of(localLeo), Optional.empty(), OptionalInt.empty(), false) + ) + + val fetchHandlerCtor = mockFetchHandler(disklessResponse) + val cp = mock(classOf[ControlPlane]) + val timer = new MockTimer(time) + val fetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]("Fetch", timer, 0, false) + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic()), + controlPlane = Some(cp), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + delayedFetchPurgatory = Some(fetchPurgatory), + )) + try { + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(seal) + // The fetch offset (480) is on an older segment than the local LEO, so the local read returns + // a single sub-seal segment and the supplement is skipped. minBytes stays unmet, so the fetch + // parks; on tryComplete, DelayedFetch Case F (older segment) force-completes with the local + // data. Stub the partition + offset snapshot so the parked tryComplete can evaluate it. + val mockPartition = mock(classOf[Partition]) + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.logEndOffset).thenReturn(localLeo) + when(mockPartition.log).thenReturn(Some(mockLog)) + // endOffset on a NEWER segment than the fetch offset (480, baseOffset 0) so Case F fires. 
+ val endOffsetMetadata = new LogOffsetMetadata(localLeo, 500L, 0) + when(mockPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + doReturn(Right(mockPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(mockPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + localFileRecords = memoryRecordsToFileRecords(localMemRecords) + doReturn(Seq(disklessTopicPartition -> + new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(480L, 0L, 0), localFileRecords), + Optional.empty(), localLeo, 0L, localLeo, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + ).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 1000L, + localMemRecords.sizeInBytes + 1, // minBytes > local-only size to trigger the supplement + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 480L, 0L, 1024 * 1024, Optional.empty()) + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + waitForFetchResponse(responseData) + val result = responseData(disklessTopicPartition) + assertEquals(Errors.NONE, result.error) + + // The supplement must NOT have been issued: a sub-seal supplement is the bug. + verify(fetchHandlerCtor.constructed().get(0), never()).handle(any(), any()) + + // The consumer sees only the contiguous local prefix [480, 500); no diskless records spliced + // in from the seal, so no gap. 
The consumer re-fetches from 500 to walk the rest of the log. + val offsets = result.records.records().asScala.map(_.offset()).toList + val gaps = offsets.sliding(2).collect { case Seq(a, b) if b != a + 1 => (a, b) }.toList + assertTrue(gaps.isEmpty, + s"Consumer-visible records must be contiguous, but found offset gap(s): $gaps. " + + s"A supplement started below the seal $seal would make diskless serve from the seal, " + + s"silently skipping the committed range. Offsets=$offsets") + assertTrue(offsets.nonEmpty && offsets.last < seal, + s"Expected only sub-seal local records, got offsets=$offsets") + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + if (localFileRecords != null) localFileRecords.close() + } + } + + // Mixed fetch: consolidating (local below minBytes) + pure-diskless + classic in one request. + // The consolidating partition gets a synchronous supplement; the pure-diskless partition goes + // through the normal delayed path; the classic partition is served from the local log. + // fetchDisklessMessages is called twice — once for the supplement, once for the delayed diskless fetch. + @Test + def testFetchMixedConsolidatingPureDisklessAndClassicPartitions(): Unit = { + val pureDisklessTp = new TopicIdPartition(Uuid.randomUuid(), 0, "pure-diskless") + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val disklessRecords = MemoryRecords.withRecords( + 2.toByte, 200L, Compression.NONE, TimestampType.CREATE_TIME, 789L, 0.toShort, 0, 0, false, new SimpleRecord(0, "diskless".getBytes()) + ) + // First call: supplement for consolidating partition. Second call: delayed fetch for pure-diskless. 
+ val fetchHandlerCtorMockInitializer: MockedConstruction.MockInitializer[FetchHandler] = { + case (mock, _) => + when(mock.handle(any(), any())) + .thenReturn(CompletableFuture.completedFuture( + Map(disklessTopicPartition -> new FetchPartitionData( + Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + ).asJava)) + .thenReturn(CompletableFuture.completedFuture( + Map(pureDisklessTp -> new FetchPartitionData( + Errors.NONE, 300L, 0L, disklessRecords, + Optional.empty(), OptionalLong.of(300L), Optional.empty(), OptionalInt.empty(), false) + ).asJava)) + } + val fetchHandlerCtor = mockConstruction(classOf[FetchHandler], fetchHandlerCtorMockInitializer) + + // findBatches is needed for DelayedFetch.tryComplete to fire for the pure-diskless partition. + val batchMetadata = mock(classOf[BatchMetadata]) + when(batchMetadata.topicIdPartition()).thenReturn(pureDisklessTp) + val batch = mock(classOf[BatchInfo]) + when(batch.metadata()).thenReturn(batchMetadata) + val findBatchResponse = mock(classOf[FindBatchResponse]) + when(findBatchResponse.batches()).thenReturn(util.List.of(batch)) + when(findBatchResponse.highWatermark()).thenReturn(300L) + when(findBatchResponse.estimatedByteSize(200L)).thenReturn(disklessRecords.sizeInBytes()) + when(findBatchResponse.errors()).thenReturn(Errors.NONE) + val cp = mock(classOf[ControlPlane]) + when(cp.findBatches(any(), any(), any())).thenReturn(util.List.of(findBatchResponse)) + + val replicaManager = spy(createReplicaManager( + List(disklessTopicPartition.topic(), pureDisklessTp.topic()), + controlPlane = Some(cp), + topicIdMapping = Map(pureDisklessTp.topic() -> pureDisklessTp.topicId()), + disklessManagedReplicasEnabled = true, + disklessRemoteStorageConsolidationEnabled = true, + consolidatingDisklessTopics = Set(disklessTopicPartition.topic()), + )) + var localFileRecords: FileRecords = null + try { + 
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(100L) + val consolidatingPartition = mock(classOf[Partition]) + val consolidatingLog = mock(classOf[UnifiedLog]) + when(consolidatingLog.logEndOffset).thenReturn(100L) + when(consolidatingPartition.log).thenReturn(Some(consolidatingLog)) + val consolidatingEndOffset = new LogOffsetMetadata(100L, 0L, 0) + when(consolidatingPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, consolidatingEndOffset, consolidatingEndOffset, consolidatingEndOffset)) + doReturn(Right(consolidatingPartition)).when(replicaManager) + .getPartitionOrError(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + doReturn(consolidatingPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(disklessTopicPartition.topicPartition())) + + // Consolidating local read reaches the seal (batch ends at offset 100 == seal), so the + // supplement fires. 
+ localFileRecords = memoryRecordsToFileRecords(RECORDS_AT_SEAL) + doReturn(Seq( + disklessTopicPartition -> new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(99L, 0L, 0), localFileRecords), + Optional.empty(), 100L, 0L, 100L, 0L, 0L, OptionalLong.empty(), Errors.NONE + ), + classicTopicPartition -> new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS), + Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE + ) + )).when(replicaManager).readFromLog(any(), any(), any(), any()) + + val classicPartition = mock(classOf[Partition]) + val classicEndOffset = new LogOffsetMetadata(10L, 0L, RECORDS.sizeInBytes()) + when(classicPartition.fetchOffsetSnapshot(any(), any())) + .thenReturn(new LogOffsetSnapshot(0L, classicEndOffset, classicEndOffset, classicEndOffset)) + doReturn(classicPartition).when(replicaManager) + .getPartitionOrException(ArgumentMatchers.eq(classicTopicPartition.topicPartition())) + + val fetchParams = new FetchParams( + FetchRequest.ORDINARY_CONSUMER_ID, -1L, + 5000L, + RECORDS.sizeInBytes + 1, + 1024 * 1024, + FetchIsolation.HIGH_WATERMARK, Optional.empty() + ) + val fetchInfos = Seq( + disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()), + pureDisklessTp -> new PartitionData(pureDisklessTp.topicId(), 200L, 0L, 1024, Optional.empty()), + classicTopicPartition -> new PartitionData(classicTopicPartition.topicId(), 1L, 0L, 1024, Optional.empty()), + ) + + @volatile var responseData: Map[TopicIdPartition, FetchPartitionData] = null + val responseCallback = (response: Seq[(TopicIdPartition, FetchPartitionData)]) => { + responseData = response.toMap + } + replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) + + waitForFetchResponse(responseData) + assertEquals(3, responseData.size) + + // Consolidating partition: merged local + supplement, HW from diskless + val consolidatingResult = 
responseData(disklessTopicPartition) + assertEquals(Errors.NONE, consolidatingResult.error) + assertEquals(500L, consolidatingResult.highWatermark) + assertTrue(consolidatingResult.records.sizeInBytes > RECORDS.sizeInBytes, + "Consolidating partition must have merged local+diskless records") + + // Pure-diskless partition: served from diskless handler via delayed path + val disklessResult = responseData(pureDisklessTp) + assertEquals(Errors.NONE, disklessResult.error) + assertEquals(300L, disklessResult.highWatermark) + + // Classic partition: served from local log + val classicResult = responseData(classicTopicPartition) + assertEquals(Errors.NONE, classicResult.error) + assertEquals(10L, classicResult.highWatermark) + assertEquals(RECORDS, classicResult.records) + + // Two diskless handler calls: one for the supplement, one for the delayed diskless fetch + verify(fetchHandlerCtor.constructed().get(0), times(2)).handle(any(), any()) + } finally { + replicaManager.shutdown(checkpointHW = false) + fetchHandlerCtor.close() + if (localFileRecords != null) localFileRecords.close() + } + } + + // --- buildConsolidationSupplementFetchInfos unit tests --- + + @Test + def testBuildConsolidationSupplementFetchInfosReturnsRequestStartingAtLogEndOffset(): Unit = { + val tp = disklessTopicPartition + val logEndOffset = 100L + val supplements = mutable.HashMap(tp -> logEndOffset) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), MemoryRecords.EMPTY), + Optional.empty(), 0L, 0L, 0L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (resultTp, 
partitionData) = result.head + assertEquals(tp, resultTp) + assertEquals(logEndOffset, partitionData.fetchOffset) + assertEquals(1024, partitionData.maxBytes) // no local bytes read, so full budget remains + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBuildConsolidationSupplementFetchInfosDeductsLocalBytesFromBudget(): Unit = { + val tp = disklessTopicPartition + // Single record at offset 50 → nextOffset 51. Seal at 51 so the local read reaches the LEO + // and the supplement is emitted (see the exhaustion guard). + val supplements = mutable.HashMap(tp -> 51L) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) + val localRecords = MemoryRecords.withRecords( + 2.toByte, 50L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, 0L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (_, partitionData) = result.head + assertEquals(1024 - localRecords.sizeInBytes, partitionData.maxBytes) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBuildConsolidationSupplementFetchInfosDropsPartitionWhenBudgetExhausted(): Unit = { + val tp = disklessTopicPartition + val supplements = mutable.HashMap(tp -> 100L) + // Local read already consumed the full budget + val localRecords = MemoryRecords.withRecords( + 2.toByte, 50L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "x".getBytes()) + ) + val fetchInfos = Seq(tp 
-> new PartitionData(tp.topicId(), 50L, 0L, localRecords.sizeInBytes, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(50L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, 0L, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertTrue(result.isEmpty, s"Expected no supplement when local bytes >= maxBytes, got $result") + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBuildConsolidationSupplementFetchInfosIgnoresPartitionsNotInFetchInfos(): Unit = { + val tp = disklessTopicPartition + val otherTp = new TopicIdPartition(Uuid.randomUuid(), 0, "other") + val supplements = mutable.HashMap(tp -> 100L, otherTp -> 200L) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 50L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + assertEquals(tp, result.head._1) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + // When the local read stops at a segment boundary BELOW the local log end offset (the seal), + // no supplement must be emitted. The diskless range starts at the seal, so supplementing from + // an offset below it would stitch the local prefix directly onto the diskless range and silently + // drop the committed range [supplementStartOffset, seal). The consumer instead re-fetches and + // walks the remaining local segments, as it would for any classic multi-segment lag. 
+  @Test
+  def testBuildConsolidationSupplementFetchInfosSkipsWhenLocalReadBelowSeal(): Unit = {
+    val logEndOffset = 5000L
+    val partition = disklessTopicPartition
+    val fetchStart = 1995L
+
+    // One batch of five records based at 1995: last offset 1999, next offset 2000, which is
+    // well below the seal at 5000.
+    val payloads = Array("a", "b", "c", "d", "e").map(v => new SimpleRecord(0, v.getBytes()))
+    val segmentRead = MemoryRecords.withRecords(
+      2.toByte, fetchStart, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, payloads: _*
+    )
+
+    // Local read result for the partition, carrying the seal as its log end offset.
+    val readInfo = new FetchDataInfo(new LogOffsetMetadata(fetchStart, 0L, 0), segmentRead)
+    val localRead = new LogReadResult(
+      readInfo, Optional.empty(), 0L, 0L, logEndOffset, 0L, 0L, OptionalLong.empty(), Errors.NONE
+    )
+    val readsByPartition = new util.HashMap[TopicIdPartition, LogReadResult]()
+    readsByPartition.put(partition, localRead)
+
+    val requested = Seq(partition -> new PartitionData(partition.topicId(), fetchStart, 0L, 1024, Optional.empty()))
+    val sealByPartition = mutable.HashMap(partition -> logEndOffset)
+
+    val manager = createReplicaManager(List(partition.topic()))
+    try {
+      val result = manager.buildConsolidationSupplementFetchInfos(sealByPartition, requested, readsByPartition)
+      assertTrue(result.isEmpty,
+        s"Expected no supplement when the local read (nextOffset 2000) is below the seal ($logEndOffset), got $result")
+    } finally {
+      manager.shutdown(checkpointHW = false)
+    }
+  }
+
+  // Same sub-seal scenario but with FileRecords from an older segment (matching production reads).
+  // No supplement must be emitted while the local read is below the seal.
+  @Test
+  def testBuildConsolidationSupplementFetchInfosSkipsWhenFileRecordsBelowSeal(): Unit = {
+    val logEndOffset = 5000L
+    val partition = disklessTopicPartition
+    val fetchStart = 1995L
+    val sealByPartition = mutable.HashMap(partition -> logEndOffset)
+
+    // Five records based at 1995 stand in for a read from segment 0 that ends at offset 1999.
+    // They are converted to FileRecords below, matching production reads.
+    val payloads = Array("a", "b", "c", "d", "e").map(v => new SimpleRecord(0, s"seg0-$v".getBytes()))
+    val inMemoryBatch = MemoryRecords.withRecords(
+      2.toByte, fetchStart, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, payloads: _*
+    )
+    // Stays null until the conversion inside the try block succeeds, so the finally clause
+    // only closes what was actually opened.
+    var onDiskBatch: FileRecords = null
+    val requested = Seq(
+      partition -> new PartitionData(partition.topicId(), fetchStart, 0L, 1024 * 1024, Optional.empty())
+    )
+    val readsByPartition = new util.HashMap[TopicIdPartition, LogReadResult]()
+
+    val manager = createReplicaManager(List(partition.topic()))
+    try {
+      onDiskBatch = memoryRecordsToFileRecords(inMemoryBatch)
+      val readInfo = new FetchDataInfo(new LogOffsetMetadata(fetchStart, 0L, 0), onDiskBatch)
+      readsByPartition.put(partition, new LogReadResult(
+        readInfo, Optional.empty(), 0L, 0L, logEndOffset, 0L, 0L, OptionalLong.empty(), Errors.NONE
+      ))
+
+      val result = manager.buildConsolidationSupplementFetchInfos(sealByPartition, requested, readsByPartition)
+      assertTrue(result.isEmpty,
+        s"Expected no supplement when the FileRecords read (nextOffset 2000) is below the seal ($logEndOffset), got $result")
+    } finally {
+      manager.shutdown(checkpointHW = false)
+      if (onDiskBatch != null) onDiskBatch.close()
+    }
+  }
+
+  // When the local read reaches the local log end offset (the seal), the supplement is emitted
+  // starting at the seal — the boundary-spanning case the supplement exists for.
+ @Test + def testBuildConsolidationSupplementFetchInfosEmittedWhenLocalReadReachesSeal(): Unit = { + val tp = disklessTopicPartition + val seal = 2000L + val supplements = mutable.HashMap(tp -> seal) + // Local records end exactly at the seal (baseOffset=1995, 5 records → nextOffset=2000). + val localRecords = MemoryRecords.withRecords( + 2.toByte, 1995L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, + new SimpleRecord(0, "a".getBytes()), new SimpleRecord(0, "b".getBytes()), + new SimpleRecord(0, "c".getBytes()), new SimpleRecord(0, "d".getBytes()), + new SimpleRecord(0, "e".getBytes()) + ) + val fetchInfos = Seq(tp -> new PartitionData(tp.topicId(), 1995L, 0L, 1024, Optional.empty())) + val logReadResultMap = new util.HashMap[TopicIdPartition, LogReadResult]() + logReadResultMap.put(tp, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(1995L, 0L, 0), localRecords), + Optional.empty(), 0L, 0L, seal, 0L, 0L, OptionalLong.empty(), Errors.NONE + )) + + val replicaManager = createReplicaManager(List(tp.topic())) + try { + val result = replicaManager.buildConsolidationSupplementFetchInfos(supplements, fetchInfos, logReadResultMap) + assertEquals(1, result.size) + val (_, partitionData) = result.head + // Supplement starts at the seal (== local nextOffset), contiguous with the local read. 
+ assertEquals(seal, partitionData.fetchOffset, + "Supplement must start at the seal once the local read reaches the local log end offset") + assertEquals(1024 - localRecords.sizeInBytes, partitionData.maxBytes) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + // --- mergeConsolidationSupplement unit tests --- + + @Test + def testMergeConsolidationSupplementCombinesMemoryRecords(): Unit = { + val localRecords = MemoryRecords.withRecords( + 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, localRecords, + Optional.empty(), OptionalLong.of(80L), Optional.empty(), OptionalInt.empty(), false) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertEquals(Errors.NONE, result.error) + assertEquals(500L, result.highWatermark) + assertEquals(OptionalLong.of(500L), result.lastStableOffset) + assertTrue(result.records.sizeInBytes > localRecords.sizeInBytes, + "Merged records must be larger than local-only") + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testMergeConsolidationSupplementMaterializesFileRecords(): Unit = { + val localRecords = MemoryRecords.withRecords( + 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val localFileRecords = 
memoryRecordsToFileRecords(localRecords) + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "supplement".getBytes()) + ) + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, localFileRecords, + Optional.empty(), OptionalLong.of(80L), Optional.empty(), OptionalInt.empty(), false) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertEquals(500L, result.highWatermark) + assertTrue(result.records.sizeInBytes > 0) + // Must not throw ClassCastException: FileRecords must have been converted before ConcatenatedRecords + assertInstanceOf(classOf[ConcatenatedRecords], result.records) + } finally { + replicaManager.shutdown(checkpointHW = false) + if (localFileRecords != null) localFileRecords.close() + } + } + + @Test + def testMergeConsolidationSupplementReturnsLocalDataOnUnknownLocalRecordsType(): Unit = { + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, mock(classOf[org.apache.kafka.common.record.Records]), + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + val supplementRecords = MemoryRecords.withRecords( + 2.toByte, 100L, Compression.NONE, TimestampType.CREATE_TIME, 456L, 0.toShort, 0, 0, false, new SimpleRecord(0, "s".getBytes()) + ) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, supplementRecords, + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = 
replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertSame(localData, result) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testMergeConsolidationSupplementReturnsLocalDataOnUnknownSupplementRecordsType(): Unit = { + val localRecords = MemoryRecords.withRecords( + 2.toByte, 0L, Compression.NONE, TimestampType.CREATE_TIME, 123L, 0.toShort, 0, 0, false, new SimpleRecord(0, "local".getBytes()) + ) + val localData = new FetchPartitionData(Errors.NONE, 100L, 0L, localRecords, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + val supplementData = new FetchPartitionData(Errors.NONE, 500L, 0L, mock(classOf[org.apache.kafka.common.record.Records]), + Optional.empty(), OptionalLong.of(500L), Optional.empty(), OptionalInt.empty(), false) + + val replicaManager = createReplicaManager(List(disklessTopicPartition.topic())) + try { + val result = replicaManager.mergeConsolidationSupplement(disklessTopicPartition, localData, supplementData) + assertSame(localData, result) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + @Test def testFetchConsolidatingDisklessPartitionOfflineReturnsKafkaStorageError(): Unit = { val disklessResponse = Map(disklessTopicPartition -> @@ -4974,7 +6033,8 @@ class ReplicaManagerInklessTest { consolidatingDisklessTopics: Set[String] = Set.empty, mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None, inklessSharedStateEnabled: Boolean = true, - initDisklessLogManager: Option[InitDisklessLogManager] = None + initDisklessLogManager: Option[InitDisklessLogManager] = None, + delayedFetchPurgatory: Option[DelayedOperationPurgatory[DelayedFetch]] = None ): ReplicaManager = { val props = TestUtils.createBrokerConfig(1, logDirCount = 2) if (disklessManagedReplicasEnabled || disklessRemoteStorageConsolidationEnabled) { @@ -5025,6 +6085,7 @@ class ReplicaManagerInklessTest { inklessSharedState = if 
(inklessSharedStateEnabled) Some(sharedState) else None, inklessMetadataView = Some(inklessMetadata), initDisklessLogManager = initDisklessLogManager, + delayedFetchPurgatoryParam = delayedFetchPurgatory, ) { override protected def createReplicaFetcherManager( metrics: Metrics, diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java index 2b4e48b613e..218f6622fe5 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/ConcatenatedRecords.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -44,6 +45,23 @@ public ConcatenatedRecords(List backingRecords) { this.sizeInBytes = totalSize; } + public static ConcatenatedRecords concat(MemoryRecords prefix, Records tail) { + Objects.requireNonNull(prefix, "prefix must not be null"); + Objects.requireNonNull(tail, "tail must not be null"); + final List components; + if (tail instanceof ConcatenatedRecords cr) { + components = new ArrayList<>(1 + cr.backingRecords.size()); + components.add(prefix); + components.addAll(cr.backingRecords); + } else if (tail instanceof MemoryRecords mr) { + components = List.of(prefix, mr); + } else { + throw new IllegalArgumentException( + "Unsupported Records type for concatenation: " + tail.getClass().getName()); + } + return new ConcatenatedRecords(components); + } + @Override public Iterable batches() { return this::batchIterator;