Skip to content

fix(inkless): find_batches rejects offset below log start offset#666

Open
jeqo wants to merge 1 commit into
mainfrom
jeqo/find-batches-reject-below-log-start
Open

fix(inkless): find_batches rejects offset below log start offset#666
jeqo wants to merge 1 commit into
mainfrom
jeqo/find-batches-reject-below-log-start

Conversation

@jeqo

@jeqo jeqo commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Fix both control plane implementations to ensure offsets below log start offset are rejected:

For PG procedure changes, this is the diff:

  -                    WHEN r.starting_offset < 0 OR r.starting_offset > l.high_watermark THEN 'offset_out_of_range'
  +                    WHEN r.starting_offset < l.log_start_offset OR r.starting_offset > l.high_watermark THEN 'offset_out_of_range'

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aligns findBatches behavior with Kafka semantics by rejecting fetch requests whose starting offset is below the current log start offset (i.e., offsets that have been deleted/aged out), across both the in-memory and Postgres-backed control planes.

Changes:

  • Update in-memory control plane fetch validation to return OFFSET_OUT_OF_RANGE when offset < logStartOffset.
  • Update Postgres find_batches_v2 function (via new Flyway migration) to treat starting_offset < log_start_offset as offset_out_of_range.
  • Extend control plane tests to cover “below log start” fetches and adjust existing delete-related fetch expectations; regenerate jOOQ artifacts to schema version 14.

Reviewed changes

Copilot reviewed 3 out of 101 changed files in this pull request and generated no comments.

Show a summary per file
File Description
storage/inkless/src/test/java/io/aiven/inkless/control_plane/AbstractControlPlaneTest.java Adds/adjusts tests to assert OFFSET_OUT_OF_RANGE for offsets below logStartOffset.
storage/inkless/src/main/resources/db/migration/V14__Find_batches_rejects_below_log_start.sql Replaces find_batches_v2 to reject starting_offset < log_start_offset.
storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java Changes fetch lower-bound check from < 0 to < logStartOffset.
storage/inkless/src/main/jooq/org/jooq/generated/UDTs.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/Tables.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/Routines.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/Keys.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/Indexes.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/Domains.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/DefaultSchema.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/PruneBatchesBelowHighestTieredOffsetResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/PruneBatchesBelowHighestTieredOffsetRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogProducerStateV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchResponseV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchRequestV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchMetadataV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchInfoV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/PruneBatchesBelowHighestTieredOffsetResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/PruneBatchesBelowHighestTieredOffsetRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/PruneBatchesBelowHighestTieredOffsetResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/PruneBatchesBelowHighestTieredOffsetRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogProducerStateV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchResponseV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchRequestV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchMetadataV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchInfoV1Path.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogProducerStateV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchResponseV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchRequestV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchMetadataV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchInfoV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/PruneBatchesBelowHighestTieredOffsetV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ProducerStateRecord.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/LogsRecord.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ListOffsetsV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/InitDisklessLogV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV2Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FilesRecord.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV2Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/DeleteRecordsV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/CommitFileV1Record.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/records/BatchesRecord.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/PruneBatchesBelowHighestTieredOffsetV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/ProducerState.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/Logs.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/ListOffsetsV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/InitDisklessLogV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV2.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/Files.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV2.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/DeleteRecordsV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/CommitFileV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/tables/Batches.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/routines/MarkFileToDeleteV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteTopicV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteFilesV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteBatchV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/routines/BatchTimestamp.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/PruneBatchesBelowHighestTieredOffsetErrorV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/ListOffsetsResponseErrorV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/InitDisklessLogErrorV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/FindBatchesResponseErrorV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/FileStateT.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/FileReasonT.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/EnforceRetentionResponseErrorV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/DeleteRecordsResponseErrorV1.java jOOQ regen: schema version 14 metadata bump.
storage/inkless/src/main/jooq/org/jooq/generated/enums/CommitBatchResponseErrorV1.java jOOQ regen: schema version 14 metadata bump.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@viktorsomogyi viktorsomogyi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this now fixes the bug we talked about but may introduce a new one. We currently don't have tests for that but I'll try to introduce a few soon.
After this change, a fetch for an offset below the diskless WAL start returns OFFSET_OUT_OF_RANGE instead of NONE. The problem is that DisklessLeaderEndPoint.fetch only rewrites the response to OFFSET_MOVED_TO_TIERED_STORAGE when the diskless fetch came back with NONE:

if (fetchResponseData.errorCode == Errors.NONE.code) {

This can be problematic after broker restarts where it lost the local data as it starts asking from offset 0 and expects an OFFSET_MOVED_TO_TIERED_STORAGE to rebuild its tier-state metadata and serve [0, disklessStart) from remote. Probably we should also expect OFFSET_OUT_OF_RANGE in DisklessLeaderEndPoint.

Edit: generated a test case that demonstrates this:

/*
 * Inkless
 * Copyright (C) 2024 - 2026 Aiven OY
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package io.aiven.inkless.consolidation

import io.aiven.inkless.cache.{FixedBlockAlignment, NullCache}
import io.aiven.inkless.common.{ObjectFormat, ObjectKey}
import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler, Reader}
import io.aiven.inkless.control_plane.{CommitBatchRequest, CreateTopicAndPartitionsRequest, DeleteRecordsRequest, InMemoryControlPlane}
import io.aiven.inkless.storage_backend.common.ObjectFetcher
import kafka.cluster.Partition
import kafka.server.{KafkaConfig, QuotaFactory, ReplicaManager}
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, when}

import java.util
import java.util.Optional
import scala.util.Right

/**
 * Integration test for the consolidated read-from-remote handoff (#650), exercising
 * [[DisklessLeaderEndPoint.fetch]] over a REAL diskless fetch stack
 * ([[FetchHandler]] -> [[Reader]] -> [[InMemoryControlPlane]]) instead of a mocked
 * [[FetchHandler]].
 *
 * This wiring matters: every case in `DisklessLeaderEndPointTest` stubs
 * `fetchHandler.handle(...)` to return `Errors.NONE`, which is the precondition the
 * `OFFSET_MOVED_TO_TIERED_STORAGE` rewrite in `DisklessLeaderEndPoint` depends on. Those
 * mocks therefore cannot observe what the control plane actually returns for an offset in
 * the pruned prefix `[wholeLogStart, disklessStart)`.
 *
 * With the `find_batches` change ("reject offset below log start offset"), the control
 * plane now returns `OFFSET_OUT_OF_RANGE` for that range. `FetchCompleter` propagates it,
 * so `DisklessLeaderEndPoint` sees a non-`NONE` error and SKIPS the tiered-storage redirect
 * -- the consolidation fetcher gets `OFFSET_OUT_OF_RANGE` instead of being told to rebuild
 * the prefix from remote. This test pins the #650 contract and is expected to FAIL until
 * `DisklessLeaderEndPoint` is taught to redirect on `OFFSET_OUT_OF_RANGE` within the
 * consolidated prefix.
 */
class DisklessLeaderEndPointConsolidationReadIntegrationTest {

  private val brokerEndPoint = new BrokerEndPoint(1, "localhost", 9092)
  private val topicPartition = new TopicPartition("diskless-topic", 0)
  private val topicId = Uuid.randomUuid()
  private val topicIdPartition = new TopicIdPartition(topicId, topicPartition)

  private val brokerId = 11
  private val fileSize = 123456L

  private def kafkaConfig: KafkaConfig =
    KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerEndPoint.id, port = brokerEndPoint.port))

  /**
   * Builds a control plane holding a single batch over offsets `[0, 10)` whose log start
   * offset has then been advanced to `disklessStart` via `deleteRecords` -- i.e. the WAL
   * prefix `[0, disklessStart)` has been pruned after consolidation to remote.
   */
  private def consolidatedControlPlane(time: MockTime, disklessStart: Long): InMemoryControlPlane = {
    val controlPlane = new InMemoryControlPlane(time)
    controlPlane.configure(util.Map.of("producer.id.expiration.ms", Int.box(60000)))
    controlPlane.createTopicAndPartitions(
      util.Set.of(new CreateTopicAndPartitionsRequest(topicId, topicPartition.topic, 1))
    )
    // One batch spanning offsets [0, 10): base 0, last 9 -> high watermark becomes 10.
    controlPlane.commitFile(
      "obj-0", ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, fileSize,
      util.List.of(CommitBatchRequest.of(0, topicIdPartition, 0, 100, 0, 9, 1000L, TimestampType.CREATE_TIME))
    )
    // Advance the diskless WAL start (== control-plane log_start_offset) to disklessStart.
    controlPlane.deleteRecords(util.List.of(new DeleteRecordsRequest(topicIdPartition, disklessStart)))
    controlPlane
  }

  private def newReader(controlPlane: InMemoryControlPlane, time: MockTime): Reader =
    new Reader(
      time,
      ObjectKey.creator("", false),
      new FixedBlockAlignment(Integer.MAX_VALUE),
      new NullCache(),
      controlPlane,
      mock(classOf[ObjectFetcher]), // never invoked: an out-of-range response carries no batches to fetch
      new BrokerTopicStats(),
      1, // fetchMetadataThreadPoolSize
      1, // fetchDataThreadPoolSize
      Optional.empty[ObjectFetcher](), // lagging consumer storage
      60000L, // laggingConsumerThresholdMs
      0, // laggingConsumerRequestRateLimit
      0, // laggingConsumerThreadPoolSize -- feature disabled
      0L, // hedgeTtfbThresholdMs -- disabled
      0L, // hedgeTotalTimeThresholdMs -- disabled
      10 // maxBatchesPerPartitionToFind
    )

  /**
   * A consolidating partition whose local [[UnifiedLog]] reports the whole-log start `0`
   * (e.g. a born-consolidated topic whose local log was rebuilt empty after losing all local
   * copies) and has remote storage enabled, so the prefix `[0, disklessStart)` lives only on
   * the remote tier. Both [[ReplicaManager.localLogOrException]] (used by `buildFetch`) and
   * [[ReplicaManager.getPartitionOrError]] (used by `fetch`) are stubbed.
   */
  private def replicaManagerForConsolidatedPartition(localLogStartOffset: Long): ReplicaManager = {
    val replicaManager = mock(classOf[ReplicaManager])
    val partition = mock(classOf[Partition])
    val localLog = mock(classOf[UnifiedLog])
    when(localLog.logStartOffset).thenReturn(localLogStartOffset)
    when(localLog.remoteLogEnabled()).thenReturn(true)
    when(partition.localLogOrException).thenReturn(localLog)
    when(replicaManager.localLogOrException(topicPartition)).thenReturn(localLog)
    when(replicaManager.getPartitionOrError(topicPartition)).thenReturn(Right(partition))
    replicaManager
  }

  /**
   * A fetcher freshly armed after a local-log loss: it has nothing locally yet, so its fetch
   * position is the whole-log start (0), which is below the pruned diskless WAL start.
   */
  private def rehydratingFetchState: PartitionFetchState =
    new PartitionFetchState(
      Optional.of(topicId),
      0L, // fetchOffset == whole-log start after local-log loss
      Optional.empty(),
      4, // currentLeaderEpoch
      ReplicaState.FETCHING,
      Optional.empty()
    )

  @Test
  def consolidationFetcherArmedAtWholeLogStartRedirectsToTieredStorageWithRealControlPlane(): Unit = {
    val disklessStart = 5L
    val time = new MockTime()
    val controlPlane = consolidatedControlPlane(time, disklessStart)
    val fetchHandler = new FetchHandler(newReader(controlPlane, time))
    try {
      val endPoint = new DisklessLeaderEndPoint(
        brokerEndPoint,
        fetchHandler,
        mock(classOf[FetchOffsetHandler]),
        replicaManagerForConsolidatedPartition(localLogStartOffset = 0L),
        kafkaConfig,
        QuotaFactory.UNBOUNDED_QUOTA,
        () => MetadataVersion.LATEST_PRODUCTION,
        () => 7L
      )

      // Drive the real fetcher loop: buildFetch arms the request from the post-loss fetch state
      // (offset 0, the whole-log start -- NOT the diskless WAL start 5), exactly as the
      // consolidation fetcher does during rehydration. This is the offset that must be redirected
      // to remote; the steady-state fetcher at/after offset 5 is served from the WAL and is
      // unaffected by the find_batches change.
      val replicaFetch = endPoint.buildFetch(util.Map.of(topicPartition, rehydratingFetchState))
      assertTrue(replicaFetch.result.isPresent, "buildFetch should produce a fetch request")
      assertEquals(
        0L,
        replicaFetch.result.get.partitionData.get(topicPartition).fetchOffset,
        "the rehydrating consolidation fetcher must arm at the whole-log start (0), in the pruned prefix"
      )

      val pd = endPoint.fetch(replicaFetch.result.get.fetchRequest).get(topicPartition)

      // The #650 contract: an offset in the consolidated remote prefix [0, disklessStart) must
      // redirect to tiered storage so the tier-state machine rebuilds it from remote.
      assertEquals(
        Errors.OFFSET_MOVED_TO_TIERED_STORAGE.code,
        pd.errorCode,
        s"offset 0 in the consolidated prefix [0, $disklessStart) must redirect to tiered storage, " +
          s"but the real control plane returned ${Errors.forCode(pd.errorCode)}"
      )
      // The whole-log start (0) is preserved so the tier-state rebuild keeps logStartOffset at 0.
      assertEquals(0L, pd.logStartOffset)
    } finally {
      fetchHandler.close()
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants