Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 94 additions & 35 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ import com.yammer.metrics.core.Meter
import io.aiven.inkless.control_plane.FindBatchRequest
import kafka.utils.Logging

import java.util.concurrent.TimeUnit
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata}
import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata, LogReadResult}

import java.util
import scala.collection._
Expand All @@ -45,6 +47,7 @@ class DelayedFetch(
params: FetchParams,
classicFetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus],
disklessFetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus](),
consolidatingSupplements: Map[TopicIdPartition, Long] = Map.empty,
replicaManager: ReplicaManager,
quota: ReplicaQuota,
maxWaitMs: Option[Long] = None,
Expand Down Expand Up @@ -225,7 +228,6 @@ class DelayedFetch(
* Upon completion, read whatever data is available and pass to the complete callback
*/
override def onComplete(): Unit = {
// Complete the classic fetches first
val classicFetchInfos = classicFetchPartitionStatus.asScala.iterator.map { case (tp, status) =>
tp -> status.fetchInfo
}.toBuffer
Expand All @@ -235,50 +237,107 @@ class DelayedFetch(
val totalRequestsSize = classicRequestsSize + disklessRequestsSize

if (totalRequestsSize == 0) {
// No partitions to fetch, just return an empty response
responseCallback(Seq.empty)
return
}

val fetchPartitionData = if (classicRequestsSize > 0) {
// adjust the max bytes for classic fetches based on the percentage of classic partitions
// Read classic partitions from local log.
val (logReadResults, logReadResultMap) = if (classicRequestsSize > 0) {
val classicPercentage = classicRequestsSize / totalRequestsSize
val classicParams = replicaManager.fetchParamsWithNewMaxBytes(params, classicPercentage)
val results = replicaManager.readFromLog(classicParams, classicFetchInfos, quota, readFromPurgatory = true)
val resultMap = new util.HashMap[TopicIdPartition, LogReadResult]()
results.foreach { case (tp, r) => resultMap.put(tp, r) }
(results, resultMap)
} else (Seq.empty, new util.HashMap[TopicIdPartition, LogReadResult]())

val logReadResults = replicaManager.readFromLog(
classicParams,
classicFetchInfos,
quota,
readFromPurgatory = true
)
// Supplement consolidating partitions whose local read has remaining byte budget.
// consolidatingSupplements is passed from fetchMessages (identified at routing time),
// so no per-partition metadata lookups are needed here.
val supplementFetchInfos =
if (consolidatingSupplements.nonEmpty)
replicaManager.buildConsolidationSupplementFetchInfos(consolidatingSupplements, classicFetchInfos.toSeq, logReadResultMap)
else Seq.empty

logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower &&
replicaManager.isAddingReplica(tp.topicPartition, params.replicaId)

tp -> result.toFetchPartitionData(isReassignmentFetch)
val emptySupplementMap: Map[TopicIdPartition, FetchPartitionData] = Map.empty
val supplementFuture: CompletableFuture[Map[TopicIdPartition, FetchPartitionData]] =
if (supplementFetchInfos.nonEmpty) {
val supplementParams = replicaManager.fetchParamsWithNewMaxBytes(
params, supplementFetchInfos.size.toFloat / totalRequestsSize)
replicaManager.fetchDisklessMessages(supplementParams, supplementFetchInfos)
.thenApply[Map[TopicIdPartition, FetchPartitionData]](_.toMap)
.exceptionally((e: Throwable) => {
logger.warn("Failed to fetch diskless supplement for consolidating partitions in delayed fetch, returning local data only", e)
emptySupplementMap
})
} else {
CompletableFuture.completedFuture(emptySupplementMap)
}
} else Seq.empty

if (disklessRequestsSize > 0) {
// Classic fetches are complete, now handle diskless fetches
// adjust the max bytes for diskless fetches based on the percentage of diskless partitions
val disklessPercentage = disklessRequestsSize / totalRequestsSize
val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage)
val disklessFetchInfos = disklessFetchPartitionStatus.asScala.map { case (tp, status) =>
tp -> status.fetchInfo
val emptyDisklessSeq: Seq[(TopicIdPartition, FetchPartitionData)] = Seq.empty
val disklessFuture: CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] =
if (disklessRequestsSize > 0) {
val disklessPercentage = disklessRequestsSize / totalRequestsSize
val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage)
val disklessFetchInfos = disklessFetchPartitionStatus.asScala.map { case (tp, status) =>
tp -> status.fetchInfo
}
replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos.toSeq)
.exceptionally((e: Throwable) => {
logger.warn("Failed to fetch diskless data in delayed fetch, returning per-partition errors", e)
val error = Errors.forException(e)
disklessFetchInfos.map { case (tp, _) =>
tp -> new FetchPartitionData(
error,
-1L,
-1L,
MemoryRecords.EMPTY,
util.Optional.empty(),
util.OptionalLong.empty(),
util.Optional.empty(),
util.OptionalInt.empty(),
false
)
}.toSeq
})
} else {
CompletableFuture.completedFuture(emptyDisklessSeq)
}
Comment thread
jeqo marked this conversation as resolved.
val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos.toSeq)

// Combine the classic fetch results with the diskless fetch results
disklessFetchResponseFuture.whenComplete { case (disklessFetchPartitionData, _) =>
// Do a single response callback with both classic and diskless fetch results
responseCallback(fetchPartitionData ++ disklessFetchPartitionData)
}
} else {
// No diskless fetches, just return the classic fetch results
responseCallback(fetchPartitionData)
}
// Join both futures and fire the single response callback once both are ready.
// The supplement and pure-diskless fetches run concurrently so latency equals the slower of the two.
// The AtomicBoolean ensures responseCallback is called exactly once even if an unexpected exception
// escapes the thenCombine lambda after the happy-path callback has already fired.
val callbackFired = new AtomicBoolean(false)
supplementFuture.thenCombine[Seq[(TopicIdPartition, FetchPartitionData)], Void](disklessFuture,
(supplementData: Map[TopicIdPartition, FetchPartitionData],
disklessData: Seq[(TopicIdPartition, FetchPartitionData)]) => {
val classicData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower &&
replicaManager.isAddingReplica(tp.topicPartition, params.replicaId)
val localData = result.toFetchPartitionData(isReassignmentFetch)
val mergedData = supplementData.get(tp) match {
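// Merge only when both the local read and the supplement are error-free and the supplement has records; otherwise keep the local data as-is.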
case Some(sd) if result.error == Errors.NONE && sd.error == Errors.NONE && sd.records.sizeInBytes > 0 =>
try replicaManager.mergeConsolidationSupplement(tp, localData, sd)
catch {
case e: Exception =>
logger.warn(s"Failed to merge diskless supplement for $tp in delayed fetch, returning local data only", e)
localData
}
case _ => localData
}
tp -> mergedData
}
if (callbackFired.compareAndSet(false, true))
responseCallback(classicData ++ disklessData)
null
}).exceptionally((e: Throwable) => {
logger.error("Unexpected error in delayed fetch completion", e)
if (callbackFired.compareAndSet(false, true))
responseCallback(Seq.empty)
null
})
}
}

Expand Down
Loading
Loading