feat(inkless): track and serve the cross-tier earliest offset for consolidated topics#670
Draft
viktorsomogyi wants to merge 6 commits into
Draft
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR makes the Inkless control plane the source of truth for the “cross-tier log start offset” (earliest physically readable offset across tiers for consolidating diskless topics) and serves it on the ListOffsets(EARLIEST) read path via a TTL/monotonic cache, with a leader-side reporter that persists remote-retention advances to the control plane.
Changes:
- Add control-plane persistence for cross-tier earliest offset (
remote_log_start_offset) with a monotonic SQL update function and updatedlist_offsets_v1behavior. - Add a Caffeine-based monotonic/TTL cache and a leader-side reporter to push/serve the cross-tier earliest offset efficiently.
- Update broker routing/read path and add unit + system coverage for cross-tier retention reclaim behavior.
Reviewed changes
Copilot reviewed 35 out of 142 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/kafkatest/tests/inkless/consolidation_retention_across_tiers_test.py | New system test validating retention reclaim advances EARLIEST across tiers on followers and remote objects are deleted. |
| tests/kafkatest/services/inkless/consolidation_verifier.py | Adds producer throttling support and a helper to wait for EARLIEST to settle. |
| storage/inkless/src/test/java/io/aiven/inkless/delete/CrossTierLogStartReporterTest.java | Unit tests for reporter buffering/flush/write-through/cache behavior. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java | Updates expected LogsRecord constructor for new DB column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java | Updates expected LogsRecord constructor for new DB column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java | Updates expected LogsRecord constructor for new DB column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java | Updates expected LogsRecord constructor for new DB column. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/AbstractControlPlaneTest.java | Adds control-plane tests for advancing and serving cross-tier earliest offsets. |
| storage/inkless/src/test/java/io/aiven/inkless/consume/FetchOffsetHandlerTest.java | Adds cache hit/miss behavior tests for consolidating EARLIEST. |
| storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineCrossTierLogStartCacheTest.java | New tests for TTL, monotonic put semantics, and invalid TTL handling. |
| storage/inkless/src/main/resources/db/migration/V15__Cross_tier_log_start.sql | Adds remote_log_start_offset column and monotonic advance function. |
| storage/inkless/src/main/resources/db/migration/V16__List_offsets_cross_tier_earliest.sql | Updates list_offsets_v1 so EARLIEST prefers remote_log_start_offset when present. |
| storage/inkless/src/main/jooq/org/jooq/generated/UDTs.java | jOOQ regen: adds new UDTs and bumps schema version to 16. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/PruneBatchesBelowHighestTieredOffsetResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/PruneBatchesBelowHighestTieredOffsetRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogProducerStateV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchResponseV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchRequestV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchMetadataV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchInfoV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/AdvanceCrossTierLogStartResponseV1Record.java | New jOOQ record for advance-cross-tier response UDT. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/AdvanceCrossTierLogStartRequestV1Record.java | New jOOQ record for advance-cross-tier request UDT. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/PruneBatchesBelowHighestTieredOffsetResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/PruneBatchesBelowHighestTieredOffsetRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/PruneBatchesBelowHighestTieredOffsetResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/PruneBatchesBelowHighestTieredOffsetRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogProducerStateV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchResponseV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchRequestV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchMetadataV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchInfoV1Path.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/AdvanceCrossTierLogStartResponseV1Path.java | New jOOQ path type for advance-cross-tier response UDT. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/AdvanceCrossTierLogStartRequestV1Path.java | New jOOQ path type for advance-cross-tier request UDT. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogProducerStateV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchResponseV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchRequestV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchMetadataV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchInfoV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/AdvanceCrossTierLogStartResponseV1.java | New jOOQ UDT for advance-cross-tier response. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/AdvanceCrossTierLogStartRequestV1.java | New jOOQ UDT for advance-cross-tier request. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/PruneBatchesBelowHighestTieredOffsetV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ProducerStateRecord.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/LogsRecord.java | Adds remote_log_start_offset field + updates constructor signature. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ListOffsetsV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/InitDisklessLogV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV2Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FilesRecord.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV2Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/DeleteRecordsV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/CommitFileV1Record.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/BatchesRecord.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/AdvanceCrossTierLogStartV1Record.java | New jOOQ record for advance-cross-tier function table. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/PruneBatchesBelowHighestTieredOffsetV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/ProducerState.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Logs.java | Adds REMOTE_LOG_START_OFFSET field and schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/ListOffsetsV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/InitDisklessLogV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV2.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Files.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV2.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/DeleteRecordsV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/CommitFileV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Batches.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/AdvanceCrossTierLogStartV1.java | New jOOQ table for advance-cross-tier function call. |
| storage/inkless/src/main/jooq/org/jooq/generated/Tables.java | Adds advance-cross-tier function/table helpers and schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/MarkFileToDeleteV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteTopicV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteFilesV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteBatchV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/BatchTimestamp.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/Routines.java | Adds advance-cross-tier function helpers and schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/Keys.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/Indexes.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/PruneBatchesBelowHighestTieredOffsetErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/ListOffsetsResponseErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/InitDisklessLogErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FindBatchesResponseErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FileStateT.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FileReasonT.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/EnforceRetentionResponseErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/DeleteRecordsResponseErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/CommitBatchResponseErrorV1.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/AdvanceCrossTierLogStartResponseErrorV1.java | New jOOQ enum for advance-cross-tier error. |
| storage/inkless/src/main/jooq/org/jooq/generated/Domains.java | jOOQ regen: schema version bump. |
| storage/inkless/src/main/jooq/org/jooq/generated/DefaultSchema.java | Registers new table/UDTs and schema version bump. |
| storage/inkless/src/main/java/io/aiven/inkless/doc/MetricsDocs.java | Adds reporter/cache metric tables to metrics doc generator output. |
| storage/inkless/src/main/java/io/aiven/inkless/delete/CrossTierLogStartReporterMetrics.java | New JMX metrics for the reporter. |
| storage/inkless/src/main/java/io/aiven/inkless/delete/CrossTierLogStartReporter.java | New leader-side reporter buffering/flush to control plane + cache write-through. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java | Adds query metrics for advance-cross-tier updates. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java | Implements new advanceCrossTierLogStartOffset API via a job. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/AdvanceCrossTierLogStartOffsetJob.java | New Postgres job calling the SQL function and mapping results. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java | Implements cross-tier earliest tracking and serves it from listOffsets(EARLIEST). |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java | Adds advanceCrossTierLogStartOffset to the control plane API. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/AdvanceCrossTierLogStartOffsetRequest.java | New request type for cross-tier earliest updates. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/AdvanceCrossTierLogStartOffsetResponse.java | New response type for cross-tier earliest updates. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchOffsetHandler.java | Adds read-through cache for consolidating EARLIEST offsets. |
| storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java | Adds configs for enabling and TTL of cross-tier earliest cache. |
| storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java | Wires cache and reporter into broker shared state and lifecycle. |
| storage/inkless/src/main/java/io/aiven/inkless/cache/CrossTierLogStartCache.java | New cache abstraction for cross-tier earliest offsets. |
| storage/inkless/src/main/java/io/aiven/inkless/cache/NullCrossTierLogStartCache.java | No-op cache implementation when disabled. |
| storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCrossTierLogStartCache.java | Caffeine TTL + monotonic put implementation with JMX metrics. |
| storage/inkless/src/main/java/io/aiven/inkless/cache/CrossTierLogStartCacheMetrics.java | JMX metrics for cache hits/misses/size. |
| docs/inkless/metrics.rst | Documents new reporter/cache/control-plane metrics. |
| docs/inkless/configs.rst | Documents new cache config keys. |
| core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala | Updates router tests for new “max of both legs” consolidating earliest behavior. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Schedules periodic flush of the cross-tier log start reporter. |
| core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala | Changes consolidating EARLIEST routing to combine classic + diskless legs via max offset with failure normalization. |
| core/src/main/scala/kafka/server/BrokerServer.scala | Enqueues leader-observed remote log start offset updates into the reporter via RLM callback. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+431
to
+434
| // Forward-only update. | ||
| if (logInfo.remoteLogStartOffset < 0 || request.remoteLogStartOffset() > logInfo.remoteLogStartOffset) { | ||
| logInfo.remoteLogStartOffset = request.remoteLogStartOffset(); | ||
| } |
Comment on lines
+65
to
+69
| // Pending updates not yet flushed to the control plane (per partition, highest reported offset). | ||
| private final ConcurrentHashMap<TopicIdPartition, Long> pending = new ConcurrentHashMap<>(); | ||
| // Highest offset already accepted by the control plane, used to drop non-advancing reports. | ||
| private final ConcurrentHashMap<TopicIdPartition, Long> lastReported = new ConcurrentHashMap<>(); | ||
|
|
…nsolidation pipeline Implements Test 5 of the consolidation system test plan: inject object- store and control-plane outages (iptables DROP, since the deps are standalone ducknet containers) and assert no premature WAL prune, clean catch-up after recovery, and no data loss. Adds iptables block/heal and best-effort attempt_produce helpers to ConsolidationVerifier.
1c328e2 to
370cea8
Compare
… control plane
Introduce the control plane as the source of truth for the earliest
physically readable offset across the remote and local tiers of a
born-consolidated topic ("cross-tier log start offset").
- Add the remote_log_start_offset column to the logs table and an
advance_cross_tier_log_start_offset_v1 function (migration V15).
- Make list_offsets_v1 return the cross-tier value for EARLIEST_TIMESTAMP
queries (migration V16) and regenerate the jOOQ classes.
- Add ControlPlane.advanceCrossTierLogStartOffset with request/response
types, implemented in both InMemoryControlPlane and PostgresControlPlane
(AdvanceCrossTierLogStartOffsetJob) with a dedicated query metric.
Co-authored-by: Cursor <cursoragent@cursor.com>
Add a read-/write-through cache for the cross-tier log start offset to keep the ListOffsets read path off the control plane on the hot path. - Add the CrossTierLogStartCache abstraction with a Caffeine-backed (TTL + monotonic put) implementation, a Null no-op implementation, and hit/miss/size metrics. - Add the inkless.cross.tier.log.start.cache.* configuration keys and regenerate the configuration docs. Co-authored-by: Cursor <cursoragent@cursor.com>
…t to the control plane Push the leader's RLM-advanced earliest offset to the control plane so it becomes visible to followers and to ListOffsets queries. - Add CrossTierLogStartReporter, which buffers monotonic per-partition updates (diskless-only), flushes them to the control plane, writes through to the cache, and exposes reported/error/pending metrics. - Wire the reporter and cache into SharedState and trigger it from the RLM callback in BrokerServer/ReplicaManager on the leader. - Register the new metrics in MetricsDocs and regenerate the metrics docs. Co-authored-by: Cursor <cursoragent@cursor.com>
…Offsets Make ListOffsets(EARLIEST_TIMESTAMP) return the cross-tier earliest offset for consolidating partitions instead of a stale zero on followers. - Read the cross-tier value through the cache in FetchOffsetHandler, populating it from the control plane on a miss. - Return max(classic earliest, control-plane cross-tier) for consolidating partitions in DisklessFetchOffsetRouter. Co-authored-by: Cursor <cursoragent@cursor.com>
…test Add a system test that verifies the cross-tier earliest offset is reported and served correctly for a born-consolidated topic after retention reclaims data across tiers. Uses a deterministic two-cohort design with a retention freeze: a first cohort is reclaimed while a second survives, and the test asserts that ListOffsets(EARLIEST) advances past zero (and no further than the surviving cohort) on a follower. Extends consolidation_verifier with the supporting earliest-offset helpers. Co-authored-by: Cursor <cursoragent@cursor.com>
556b234 to
c80e5d4
Compare
b96ef10 to
8aea707
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
For born-consolidated (diskless + tiered) topics, the leader's RLM correctly
advances the earliest readable offset as retention reclaims data across the WAL,
local, and remote tiers. That advance was never propagated, so the control plane
and followers kept a stale view:
ListOffsets(EARLIEST_TIMESTAMP)returned0on followers even after the underlying data had been deleted.
This PR makes the Inkless control plane the source of truth for the
"cross-tier log start offset" — the earliest physically readable offset across
the remote + local tiers — and serves it on the read path, with a cache to keep
the hot path off the control plane.
How it works
remote_log_start_offsetcolumn on thelogstable and anadvance_cross_tier_log_start_offset_v1function (monotonic).list_offsets_v1returns this value for
EARLIEST_TIMESTAMPqueries.the control plane via
CrossTierLogStartReporter(diskless-only, monotonic,buffered/flushed), writing through to the cache.
ListOffsets(EARLIEST)returnsmax(classic earliest, control-plane cross-tier)for consolidating partitions, reading through thecache and populating it from the control plane on a miss.
CrossTierLogStartCache(Caffeine), with ano-op implementation and hit/miss/size metrics.
Reviewing
The change is organized into 5 layered commits, each self-contained:
track cross-tier log start offset in the control plane— schema (migrationsV15/V16), jOOQ,
ControlPlaneAPI + InMemory/Postgres impls.cache the cross-tier log start offset— cache abstraction + config.report leader cross-tier log start offset to the control plane— write pathserve cross-tier earliest offset on ListOffsets— read path.add cross-tier retention reclaim system test— system test.Configuration & metrics
inkless.cross.tier.log.start.cache.*(seedocs/inkless/configs.rst).(hits/misses/size), plus a control-plane query metric (see
docs/inkless/metrics.rst).Testing
the reporter, and the read-through handler.
consolidation_retention_across_tiers_test.py) using adeterministic two-cohort + retention-freeze design: one cohort is reclaimed
while another survives, and the test asserts
ListOffsets(EARLIEST)advancespast zero (and no further than the surviving cohort) on a follower.