Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,9 @@ class BrokerServer(
logManager.getLog(tp).foreach { log =>
log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
}
// For consolidating diskless topics, persist the leader's cross-tier earliest offset in the
// control plane so any broker can serve it for ListOffsets(EARLIEST). No-op for classic topics.
maybeInklessSharedState.foreach(_.crossTierLogStartReporter().enqueue(tp, remoteLogStartOffset))
},
brokerTopicStats, metrics, endpoint.toJava)
Some(rlm)
Expand Down
76 changes: 73 additions & 3 deletions core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrEr

import java.util.Optional
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList, Future}
import java.util.concurrent.{CompletableFuture, CompletionException, CopyOnWriteArrayList, Future}
import scala.jdk.OptionConverters.RichOptional

/**
Expand Down Expand Up @@ -136,9 +136,13 @@ class DisklessFetchOffsetRouter(
classicLookup()

case ListOffsetsRequest.EARLIEST_TIMESTAMP =>
// Always do classic first and then fall back to diskless with consolidating partitions
if (isConsolidatingPartition) {
classicWithDisklessFallbackLookup(topicPartition, version, classicLookup _, disklessLookup _, disklessLookupOnNewJob _)
// The cross-tier earliest offset lives on the classic leader's UnifiedLog; followers only
// learn it via the control plane (populated by the leader's CrossTierLogStartReporter).
// Take the max of both legs so the freshest authoritative value wins regardless of which
// broker serves: on the leader the classic leg is authoritative, on followers the classic
// leg is stale-low (safe) and the control-plane leg supplies the correct value.
maxOfClassicAndDiskless(topicPartition, classicLookupOf(classicLookup(), version), disklessLookup)
// Try classic first when classic data is still on disk, otherwise go straight to diskless.
} else if (classicLogStartOffsetProvider(topicPartition).exists(_ < classicToDisklessStartOffset)) {
classicLookup()
Expand Down Expand Up @@ -182,6 +186,31 @@ class DisklessFetchOffsetRouter(
} else classic
}

/**
 * Resolve a lookup by running the classic and diskless legs side by side and keeping whichever
 * result carries the larger offset (see `maxOffsetResult`). Used for EARLIEST on consolidating
 * partitions, where either leg may hold the authoritative cross-tier earliest offset.
 *
 * Each leg's task future is first passed through `normalizeToResult`, so an exceptional
 * completion on one leg becomes an error-carrying result instead of failing the combined future;
 * a valid offset from the other leg therefore survives. Both legs' job futures are registered as
 * targets of a single `FanOutCancelFuture`, which is handed to the returned holder as its
 * cancel hook.
 *
 * @param tp       partition being resolved
 * @param classic  lookup backed by the classic log
 * @param diskless lookup backed by the diskless control plane
 * @return an async status whose task future completes with the combined result
 */
private def maxOfClassicAndDiskless(
  tp: TopicPartition,
  classic: Lookup,
  diskless: Lookup
): ListOffsetsPartitionStatus = {
  // One cancel handle covering both underlying jobs.
  val cancelFanOut = new FanOutCancelFuture
  cancelFanOut.addTarget(classic.jobFuture)
  cancelFanOut.addTarget(diskless.jobFuture)

  // Neither normalized leg can complete exceptionally, so the combine step always runs.
  val classicOutcome = normalizeToResult(classic.taskFuture)
  val disklessOutcome = normalizeToResult(diskless.taskFuture)
  val largerOffset = classicOutcome.thenCombine[FileRecordsOrError, FileRecordsOrError](
    disklessOutcome,
    (fromClassic: FileRecordsOrError, fromDiskless: FileRecordsOrError) =>
      maxOffsetResult(fromClassic, fromDiskless)
  )

  val holder = new AsyncOffsetReadFutureHolder(cancelFanOut, largerOffset)
  asStatus(tp, holder)
}

/**
* Wrap a single async lookup as a `ListOffsetsPartitionStatus`, ensuring the purgatory wakes
* up to deliver the response when the lookup completes.
Expand Down Expand Up @@ -322,6 +351,47 @@ private[server] object DisklessFetchOffsetRouter {
// A synchronous response is a "no match" when it reports no error yet carries a negative offset.
private def isSyncNoMatch(r: ListOffsetsPartitionResponse): Boolean = {
  val succeeded = r.errorCode == Errors.NONE.code
  succeeded && r.offset < 0
}

/**
 * Turn a lookup future into one that never completes exceptionally: a failure is converted into
 * a `FileRecordsOrError` that carries the exception and no offset.
 *
 * A `CompletionException` with a non-null cause is unwrapped to that cause; any other throwable
 * (including a `CompletionException` without a cause) is used as-is. A throwable that is not an
 * `Exception` is wrapped in a `RuntimeException`, since the result type holds an `Exception`.
 *
 * @param f the lookup future to normalize
 * @return a future completing with the original result, or with an error-carrying result
 */
private def normalizeToResult(f: CompletableFuture[FileRecordsOrError]): CompletableFuture[FileRecordsOrError] =
  f.exceptionally { failure =>
    // Strip the wrapper added by dependent stages, when there is a cause to strip it down to.
    val rootCause: Throwable = failure match {
      case wrapper: CompletionException => Option(wrapper.getCause).getOrElse(wrapper)
      case direct => direct
    }
    val reported: Exception = rootCause match {
      case asException: Exception => asException
      case nonException => new RuntimeException(nonException)
    }
    new FileRecordsOrError(Optional.of(reported), Optional.empty())
  }

/**
 * Pick between two lookup results, preferring the one with the larger offset.
 *
 *  - both carry an offset: `b` wins only when its offset is strictly greater, otherwise `a`
 *  - exactly one carries an offset: that one wins, regardless of any error on the other
 *  - neither carries an offset: `a` is returned if it holds an exception, otherwise `b`
 *    (which may itself hold an exception or be empty)
 *
 * @param a first result (ties and the "both failed" case favour this one)
 * @param b second result
 * @return whichever of `a` or `b` is selected by the rules above
 */
private def maxOffsetResult(a: FileRecordsOrError, b: FileRecordsOrError): FileRecordsOrError =
  (a.hasTimestampAndOffset, b.hasTimestampAndOffset) match {
    case (true, true) =>
      val offsetOfA = a.timestampAndOffset.get.offset
      val offsetOfB = b.timestampAndOffset.get.offset
      if (offsetOfB > offsetOfA) b else a
    case (true, false) => a
    case (false, true) => b
    case (false, false) => if (a.hasException) a else b
  }

private def syncResponseToResult(r: ListOffsetsPartitionResponse): FileRecordsOrError = {
if (r.errorCode != Errors.NONE.code) {
new FileRecordsOrError(Optional.of(Errors.forCode(r.errorCode).exception()), Optional.empty())
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.yammer.metrics.core.Meter
import io.aiven.inkless.common.SharedState
import io.aiven.inkless.consume.{ConcatenatedRecords, FetchHandler, FetchOffsetHandler, Reader}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, InitDisklessLogProducerState}
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.delete.{CrossTierLogStartReporter, DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.produce.AppendHandler
import io.aiven.inkless.consolidation.{ConsolidatedDisklessLogPruner, ConsolidationFetcherManager, ConsolidationMetrics, ConsolidationReconciler}
import kafka.cluster.Partition
Expand Down Expand Up @@ -468,6 +468,8 @@ class ReplicaManager(val config: KafkaConfig,

scheduler.schedule("inkless-file-cleaner", () => inklessFileCleaner.foreach(_.run()), sharedState.config().fileCleanerInterval().toMillis, sharedState.config().fileCleanerInterval().toMillis)

scheduler.schedule("inkless-cross-tier-log-start-reporter", () => sharedState.crossTierLogStartReporter().run(), config.logInitialTaskDelayMs, CrossTierLogStartReporter.FLUSH_INTERVAL_MS)

inklessConsolidatedDisklessLogPruner.foreach { pruner =>
scheduler.schedule("inkless-consolidated-diskless-log-pruner", () => pruner.run(),
sharedState.config.consolidationCleanupInterval.toMillis, sharedState.config.consolidationCleanupInterval.toMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ class DisklessFetchOffsetRouterTest {
// Build a status whose `responseOpt` is already populated with `response`.
private def resolvedStatus(response: ListOffsetsPartitionResponse): ListOffsetsPartitionStatus = {
  val builder = ListOffsetsPartitionStatus.builder()
  builder.responseOpt(Optional.of(response)).build()
}

// A successful diskless-leg result: no exception, only a timestamp/offset/leader-epoch triple.
private def offsetResult(offset: Long, timestamp: Long = 0L, leaderEpoch: Int = 1): FileRecordsOrError = {
  val epoch = Optional.of[Integer](leaderEpoch)
  val found = new FileRecords.TimestampAndOffset(timestamp, offset, epoch)
  new FileRecordsOrError(Optional.empty(), Optional.of(found))
}

// Invoke `route()` with the standard test wiring.
private def route(router: DisklessFetchOffsetRouter,
timestamp: Long,
Expand Down Expand Up @@ -463,71 +469,106 @@ class DisklessFetchOffsetRouterTest {
}

@Test
def hybridConsolidatingEarliestAlwaysTriesClassicFirstWithDisklessFallback(): Unit = {
def consolidatingEarliestReturnsMaxOffsetFromDisklessWhenDisklessIsHigher(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)
when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true)
// classic leg resolves to offset 10, diskless leg to offset 50 => max picks 50.
disklessTaskFuture.complete(offsetResult(50L))
val classicHit = resolvedStatus(makeResponse(offset = 10L, timestamp = 111L))

val status = route(
newRouter(disklessManagedReplicasEnabled = true, disklessConsolidationEnabled = true),
newRouter(disklessConsolidationEnabled = true),
timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP,
classicLogStartOffset = Some(100L)
classicResult = classicHit
)

assertSame(defaultClassicResult, status)
// Both legs run: classic is invoked and the diskless lookup is added to the job.
assertClassicCalledWith(allowFromFollower = true)
verify(job, never()).add(any(), any())
verify(job).add(eqTo(tp), any())
assertTrue(status.futureHolderOpt.isPresent)
val result = status.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertEquals(50L, result.timestampAndOffset.get.offset)
}

@Test
def hybridConsolidatingEarliestFallsBackToDisklessWhenClassicReturnsNoMatchEvenIfLogPastBoundary(): Unit = {
def consolidatingEarliestReturnsMaxOffsetFromClassicWhenClassicIsHigher(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)
when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true)
val noMatch = resolvedStatus(makeResponse(offset = -1L, errorCode = Errors.NONE.code))
// classic leg resolves to offset 80, diskless leg to offset 50 => max picks 80.
disklessTaskFuture.complete(offsetResult(50L))
val classicHit = resolvedStatus(makeResponse(offset = 80L, timestamp = 222L))

val status = route(
newRouter(disklessConsolidationEnabled = true),
timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP,
classicLogStartOffset = Some(100L),
classicResult = noMatch
classicResult = classicHit
)

assertClassicCalledWith(allowFromFollower = true)
verify(job).add(eqTo(tp), any())
assertTrue(status.futureHolderOpt.isPresent, "fallback to diskless surfaces an async holder")
assertNotSame(noMatch, status)
assertTrue(status.futureHolderOpt.isPresent)
val result = status.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertEquals(80L, result.timestampAndOffset.get.offset)
}

@Test
def hybridConsolidatingWithCommittedBoundaryRequiresCompleteClassicPrefixForFollowerAccess(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)
def consolidatingEarliestUsesDisklessOffsetWhenClassicLegFails(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)
when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true)
// classic leg errors; the diskless offset must still be returned (an error on one leg
// must not discard a valid offset from the other).
disklessTaskFuture.complete(offsetResult(50L))
val classicError = resolvedStatus(makeResponse(offset = -1L, errorCode = Errors.LEADER_NOT_AVAILABLE.code))

val status = route(
newRouter(disklessConsolidationEnabled = true),
timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP,
classicLogStartOffset = Some(0L),
hasCompleteClassicPrefix = false
classicResult = classicError
)

assertSame(defaultClassicResult, status)
assertClassicCalledWith(allowFromFollower = false)
verify(job, never()).add(any(), any())
assertTrue(status.futureHolderOpt.isPresent)
val result = status.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertFalse(result.hasException)
assertEquals(50L, result.timestampAndOffset.get.offset)
}

// EARLIEST on a consolidating topic when neither leg produces an offset: the combined result
// must carry an exception.
@Test
def consolidatingEarliestSurfacesErrorWhenBothLegsFail(): Unit = {
// Consolidating topic with no classic-to-diskless boundary recorded.
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)
when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true)
// Diskless leg: the task future is failed up front, before routing.
val disklessFailure = new RuntimeException("diskless boom")
disklessTaskFuture.completeExceptionally(disklessFailure)
// Classic leg: an already-resolved response with an error code and offset -1.
val classicError = resolvedStatus(makeResponse(offset = -1L, errorCode = Errors.LEADER_NOT_AVAILABLE.code))

val status = route(
newRouter(disklessConsolidationEnabled = true),
timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP,
classicResult = classicError
)

// The answer is delivered through an async holder; the short timeout guards against a hang.
assertTrue(status.futureHolderOpt.isPresent)
val result = status.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
// Only the presence of an exception is asserted, not which leg's error is reported.
assertTrue(result.hasException)
}

@Test
def hybridConsolidatingEarliestReturnsClassicWhenClassicHitsEvenIfLogPastBoundary(): Unit = {
def consolidatingEarliestWithCommittedBoundaryDeniesFollowerAccessWhenPrefixIncomplete(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)
when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true)
disklessTaskFuture.complete(offsetResult(50L))

val status = route(
newRouter(disklessConsolidationEnabled = true),
timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP,
classicLogStartOffset = Some(100L)
classicLogStartOffset = Some(0L),
hasCompleteClassicPrefix = false
)

assertSame(defaultClassicResult, status)
assertClassicCalledWith(allowFromFollower = true)
verify(job, never()).add(any(), any())
// Both legs run, but the classic leg must be denied follower access until the local prefix is complete.
assertClassicCalledWith(allowFromFollower = false)
verify(job).add(eqTo(tp), any())
assertTrue(status.futureHolderOpt.isPresent)
}

@Test
Expand Down
15 changes: 15 additions & 0 deletions docs/inkless/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ Under ``inkless.``
* Valid Values: [1,...]
* Importance: low

``consume.cross.tier.log.start.cache.enabled``
If true, the cross-tier log start offset cache is enabled. It caches the EARLIEST offset of consolidating diskless topics to avoid querying the control plane on every request.

* Type: boolean
* Default: true
* Importance: low

``consume.cross.tier.log.start.cache.ttl.ms``
Time to live in milliseconds for an entry in the cross-tier log start offset cache. A stale entry can only ever be too low (the safe direction), so this only bounds how quickly a retention advance becomes visible from non-leader brokers.

* Type: int
* Default: 10000 (10 seconds)
* Valid Values: [1,...]
* Importance: low

``fetch.data.thread.pool.size``
Thread pool size to concurrently fetch data files from remote storage

Expand Down
Loading
Loading