From df85d102ac74c24e177393e102a6c7facd5467f4 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 29 Jun 2026 15:56:42 +0300 Subject: [PATCH] feat(inkless): add partition fan-in commit metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two per-broker histograms on FileCommitter: PartitionsPerCommit (distinct topic-partitions in a committed file) and BatchesPerPartitionPerCommit (a partition's batch fan-in). Batches-per-partition is the workload dimension that drives batch coalescing, so it surfaces the per-partition coalescing opportunity. Computed in a single linear pass over the committed requests, which are already grouped by topic-partition (BatchBuffer.close sorts them) — no intermediate grouping map. Regenerates metrics.rst. Co-Authored-By: Claude Opus 4.8 --- docs/inkless/metrics.rst | 44 +++--- .../aiven/inkless/produce/FileCommitter.java | 4 + .../inkless/produce/FileCommitterMetrics.java | 48 +++++++ .../produce/FileCommitterMetricsTest.java | 130 ++++++++++++++++++ 4 files changed, 205 insertions(+), 21 deletions(-) create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterMetricsTest.java diff --git a/docs/inkless/metrics.rst b/docs/inkless/metrics.rst index 921b4a89590..095a3282782 100644 --- a/docs/inkless/metrics.rst +++ b/docs/inkless/metrics.rst @@ -177,27 +177,29 @@ FileCommitter metrics io.aiven.inkless.produce:type=FileCommitter ------------------------------------------- -======================== =========================================================================== -Attribute name Description -======================== =========================================================================== -BatchesCommitRate Rate of batches committed per second -BatchesCount Number of batches per committed file -CacheStoreTime Time spent storing file data in the cache after commit in milliseconds -CommitQueueBytes Current total bytes of files waiting to be committed -CommitQueueFiles Current number of files waiting to be committed -FileCommitErrorRate Rate of failed file commits per second -FileCommitRate Rate of successful file commits per second -FileCommitTime Time spent committing a file to the control plane in milliseconds -FileCommitWaitTime Time a file waits before commit starts in milliseconds -FileSize Size of committed files in bytes -FileTotalLifeTime Total lifetime of a file from creation to commit completion in milliseconds -FileUploadAndCommitTime Time spent uploading and committing a file in milliseconds -FileUploadErrorRate Rate of failed file uploads per second -FileUploadRate Rate of successful file uploads per second -FileUploadTime Time spent uploading a file to object storage in milliseconds -WriteErrorRate Rate of failed write operations per second -WriteRate Rate of successful write operations per second -======================== =========================================================================== +============================= ================================================================================================== +Attribute name Description +============================= ================================================================================================== +BatchesCommitRate Rate of batches committed per second +BatchesCount Number of batches per committed file +BatchesPerPartitionPerCommit Number of batches a single topic-partition contributes to a committed file (per-partition fan-in). +CacheStoreTime Time spent storing file data in the cache after commit in milliseconds +CommitQueueBytes Current total bytes of files waiting to be committed +CommitQueueFiles Current number of files waiting to be committed +FileCommitErrorRate Rate of failed file commits per second +FileCommitRate Rate of successful file commits per second +FileCommitTime Time spent committing a file to the control plane in milliseconds +FileCommitWaitTime Time a file waits before commit starts in milliseconds +FileSize Size of committed files in bytes +FileTotalLifeTime Total lifetime of a file from creation to commit completion in milliseconds +FileUploadAndCommitTime Time spent uploading and committing a file in milliseconds +FileUploadErrorRate Rate of failed file uploads per second +FileUploadRate Rate of successful file uploads per second +FileUploadTime Time spent uploading a file to object storage in milliseconds +PartitionsPerCommit Number of distinct topic-partitions in a committed file +WriteErrorRate Rate of failed write operations per second +WriteRate Rate of successful write operations per second +============================= ================================================================================================== FileCleaner metrics diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java index dff48286e97..18f919e520c 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java @@ -182,6 +182,10 @@ void commit(final ClosedFile file) throws InterruptedException { } else { metrics.fileAdded(file.size()); metrics.batchesAdded(file.commitBatchRequests().size()); + // Partition fan-in (distinct partitions and batches-per-partition) characterizes the + // batch-coalescing opportunity. The requests are already grouped by topic-partition, so + // the metric computes both in one linear pass — no extra grouping here. + metrics.partitionFanInAdded(file.commitBatchRequests()); totalFilesInProgress.addAndGet(1); totalBytesInProgress.addAndGet(file.size()); diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitterMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitterMetrics.java index 3c88b58b0c2..3c62a348d8f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitterMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitterMetrics.java @@ -18,6 +18,7 @@ package io.aiven.inkless.produce; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.metrics.KafkaMetricsGroup; @@ -36,6 +37,7 @@ import java.util.function.Supplier; import io.aiven.inkless.TimeUtils; +import io.aiven.inkless.control_plane.CommitBatchRequest; @CoverageIgnore public class FileCommitterMetrics implements Closeable { @@ -71,6 +73,10 @@ public class FileCommitterMetrics implements Closeable { private static final String BATCHES_COUNT_DOC = "Number of batches per committed file"; private static final String BATCHES_COMMIT_RATE = "BatchesCommitRate"; private static final String BATCHES_COMMIT_RATE_DOC = "Rate of batches committed per second"; + private static final String PARTITIONS_PER_COMMIT = "PartitionsPerCommit"; + private static final String PARTITIONS_PER_COMMIT_DOC = "Number of distinct topic-partitions in a committed file"; + private static final String BATCHES_PER_PARTITION_PER_COMMIT = "BatchesPerPartitionPerCommit"; + private static final String BATCHES_PER_PARTITION_PER_COMMIT_DOC = "Number of batches a single topic-partition contributes to a committed file (per-partition fan-in)."; private static final String WRITE_RATE = "WriteRate"; private static final String WRITE_RATE_DOC = "Rate of successful write operations per second"; private static final String WRITE_ERROR_RATE = "WriteErrorRate"; @@ -94,6 +100,8 @@ public static List all() { new MetricNameTemplate(BATCHES_COMMIT_RATE, GROUP, BATCHES_COMMIT_RATE_DOC), new MetricNameTemplate(FILE_SIZE, GROUP, FILE_SIZE_DOC), new MetricNameTemplate(BATCHES_COUNT, GROUP, BATCHES_COUNT_DOC), + new MetricNameTemplate(PARTITIONS_PER_COMMIT, GROUP, PARTITIONS_PER_COMMIT_DOC), + new MetricNameTemplate(BATCHES_PER_PARTITION_PER_COMMIT, GROUP, BATCHES_PER_PARTITION_PER_COMMIT_DOC), new MetricNameTemplate(CACHE_STORE_TIME, GROUP, CACHE_STORE_TIME_DOC), new MetricNameTemplate(WRITE_RATE, GROUP, WRITE_RATE_DOC), new MetricNameTemplate(WRITE_ERROR_RATE, GROUP, WRITE_ERROR_RATE_DOC), @@ -113,6 +121,9 @@ public static List all() { private final Histogram fileCommitWaitTimeHistogram; private final Histogram fileSizeHistogram; private final Histogram batchesCountHistogram; + // Visible for testing. + final Histogram partitionsPerCommitHistogram; + final Histogram batchesPerPartitionPerCommitHistogram; private final Histogram cacheStoreTimeHistogram; private final Meter fileUploadRate; private final Meter fileUploadErrorRate; @@ -136,6 +147,8 @@ public static List all() { batchesCommitRate = metricsGroup.newMeter(BATCHES_COMMIT_RATE, "batches", TimeUnit.SECONDS, Map.of()); fileSizeHistogram = metricsGroup.newHistogram(FILE_SIZE, true, Map.of()); batchesCountHistogram = metricsGroup.newHistogram(BATCHES_COUNT, true, Map.of()); + partitionsPerCommitHistogram = metricsGroup.newHistogram(PARTITIONS_PER_COMMIT, true, Map.of()); + batchesPerPartitionPerCommitHistogram = metricsGroup.newHistogram(BATCHES_PER_PARTITION_PER_COMMIT, true, Map.of()); cacheStoreTimeHistogram = metricsGroup.newHistogram(CACHE_STORE_TIME, true, Map.of()); writeRate = metricsGroup.newMeter(WRITE_RATE, "writes", TimeUnit.SECONDS, Map.of()); writeErrorRate = metricsGroup.newMeter(WRITE_ERROR_RATE, "errors", TimeUnit.SECONDS, Map.of()); @@ -158,6 +171,39 @@ void batchesAdded(final int size) { batchesCommitRate.mark(size); } + /** + * Records the partition fan-in of a committed file: the number of distinct topic-partitions, and the + * number of batches each one contributed. + * + *

Relies on {@code commitBatchRequests} being grouped by topic-partition (they are sorted that way + * in {@link BatchBuffer#close()}), so partitions and their batch counts are computed in a single + * linear pass over contiguous runs — no intermediate grouping map. + * + * @param sortedRequests the committed batch requests, grouped by topic-partition + */ + void partitionFanInAdded(final List sortedRequests) { + if (sortedRequests.isEmpty()) { + return; + } + int partitions = 0; + int runLength = 0; + TopicIdPartition current = null; + for (final CommitBatchRequest request : sortedRequests) { + final TopicIdPartition partition = request.topicIdPartition(); + if (!partition.equals(current)) { + if (current != null) { + batchesPerPartitionPerCommitHistogram.update(runLength); + } + partitions++; + runLength = 0; + current = partition; + } + runLength++; + } + batchesPerPartitionPerCommitHistogram.update(runLength); // flush the final run + partitionsPerCommitHistogram.update(partitions); + } + void fileUploadFinished(final long durationMs) { fileUploadTimeHistogram.update(durationMs); fileUploadRate.mark(); @@ -215,6 +261,8 @@ public void close() throws IOException { metricsGroup.removeMetric(CACHE_STORE_TIME); metricsGroup.removeMetric(BATCHES_COUNT); metricsGroup.removeMetric(BATCHES_COMMIT_RATE); + metricsGroup.removeMetric(PARTITIONS_PER_COMMIT); + metricsGroup.removeMetric(BATCHES_PER_PARTITION_PER_COMMIT); metricsGroup.removeMetric(WRITE_RATE); metricsGroup.removeMetric(WRITE_ERROR_RATE); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterMetricsTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterMetricsTest.java new file mode 100644 index 00000000000..b29689f89e8 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterMetricsTest.java @@ -0,0 +1,130 @@ +/* + * Inkless + * Copyright (C) 2024 - 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.produce; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import io.aiven.inkless.control_plane.CommitBatchRequest; + +import static org.assertj.core.api.Assertions.assertThat; + +class FileCommitterMetricsTest { + static final Uuid TOPIC_ID = new Uuid(1, 2); + static final TopicIdPartition T0 = new TopicIdPartition(TOPIC_ID, 0, "t"); + static final TopicIdPartition T1 = new TopicIdPartition(TOPIC_ID, 1, "t"); + static final TopicIdPartition T2 = new TopicIdPartition(TOPIC_ID, 2, "t"); + + FileCommitterMetrics metrics; + + @BeforeEach + void setUp() { + metrics = new FileCommitterMetrics(new MockTime()); + } + + @AfterEach + void tearDown() throws IOException { + metrics.close(); + } + + private static CommitBatchRequest request(final TopicIdPartition tp, final int byteOffset) { + return CommitBatchRequest.of(0, tp, byteOffset, 10, 0, 0, 1000, TimestampType.CREATE_TIME); + } + + /** + * Builds a request list grouped by partition (as BatchBuffer.close() produces), given a fan-in per partition. + **/ + private static List grouped(final TopicIdPartition... partitionPerBatch) { + final List requests = new ArrayList<>(); + int byteOffset = 0; + for (final TopicIdPartition tp : partitionPerBatch) { + requests.add(request(tp, byteOffset)); + byteOffset += 10; + } + return requests; + } + + @Test + void emptyIsNoOp() { + metrics.partitionFanInAdded(List.of()); + assertThat(metrics.partitionsPerCommitHistogram.count()).isZero(); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isZero(); + } + + @Test + void singlePartitionSingleBatch() { + metrics.partitionFanInAdded(grouped(T0)); + + // 1 commit observed -> 1 partition; 1 partition observed -> fan-in 1. + assertThat(metrics.partitionsPerCommitHistogram.count()).isEqualTo(1); + assertThat(metrics.partitionsPerCommitHistogram.max()).isEqualTo(1.0); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isEqualTo(1); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.max()).isEqualTo(1.0); + } + + @Test + void singlePartitionManyBatchesCountsAsOneRunWithFullFanIn() { + metrics.partitionFanInAdded(grouped(T0, T0, T0, T0, T0)); + + assertThat(metrics.partitionsPerCommitHistogram.count()).isEqualTo(1); + assertThat(metrics.partitionsPerCommitHistogram.max()).isEqualTo(1.0); // one distinct partition + assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isEqualTo(1); // one partition observed + assertThat(metrics.batchesPerPartitionPerCommitHistogram.max()).isEqualTo(5.0); // fan-in 5 + } + + @Test + void multiplePartitionsWithDifferentFanIn() { + // Grouped by partition (the invariant from BatchBuffer.close()): T0 x2, T1 x1, T2 x3. + metrics.partitionFanInAdded(grouped(T0, T0, T1, T2, T2, T2)); + + // 3 distinct partitions in this one commit. + assertThat(metrics.partitionsPerCommitHistogram.count()).isEqualTo(1); + assertThat(metrics.partitionsPerCommitHistogram.max()).isEqualTo(3.0); + + // 3 partitions observed; fan-ins were 2, 1, 3 -> min 1, max 3. + assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isEqualTo(3); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.max()).isEqualTo(3.0); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.min()).isEqualTo(1.0); + } + + @Test + void accumulatesAcrossCommits() { + metrics.partitionFanInAdded(grouped(T0, T0)); // 1 partition, fan-in 2 + metrics.partitionFanInAdded(grouped(T0, T1, T2)); // 3 partitions, fan-in 1 each + + // Two commits observed. + assertThat(metrics.partitionsPerCommitHistogram.count()).isEqualTo(2); + assertThat(metrics.partitionsPerCommitHistogram.max()).isEqualTo(3.0); + assertThat(metrics.partitionsPerCommitHistogram.min()).isEqualTo(1.0); + + // Partitions observed across both commits: 1 + 3 = 4; max fan-in was 2. + assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isEqualTo(4); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.max()).isEqualTo(2.0); + assertThat(metrics.batchesPerPartitionPerCommitHistogram.min()).isEqualTo(1.0); + } +}