Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions docs/inkless/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -177,27 +177,29 @@ FileCommitter metrics
io.aiven.inkless.produce:type=FileCommitter
-------------------------------------------

======================== ===========================================================================
Attribute name Description
======================== ===========================================================================
BatchesCommitRate Rate of batches committed per second
BatchesCount Number of batches per committed file
CacheStoreTime Time spent storing file data in the cache after commit in milliseconds
CommitQueueBytes Current total bytes of files waiting to be committed
CommitQueueFiles Current number of files waiting to be committed
FileCommitErrorRate Rate of failed file commits per second
FileCommitRate Rate of successful file commits per second
FileCommitTime Time spent committing a file to the control plane in milliseconds
FileCommitWaitTime Time a file waits before commit starts in milliseconds
FileSize Size of committed files in bytes
FileTotalLifeTime Total lifetime of a file from creation to commit completion in milliseconds
FileUploadAndCommitTime Time spent uploading and committing a file in milliseconds
FileUploadErrorRate Rate of failed file uploads per second
FileUploadRate Rate of successful file uploads per second
FileUploadTime Time spent uploading a file to object storage in milliseconds
WriteErrorRate Rate of failed write operations per second
WriteRate Rate of successful write operations per second
======================== ===========================================================================
============================= ==================================================================================================
Attribute name Description
============================= ==================================================================================================
BatchesCommitRate Rate of batches committed per second
BatchesCount Number of batches per committed file
BatchesPerPartitionPerCommit Number of batches a single topic-partition contributes to a committed file (per-partition fan-in).
CacheStoreTime Time spent storing file data in the cache after commit in milliseconds
CommitQueueBytes Current total bytes of files waiting to be committed
CommitQueueFiles Current number of files waiting to be committed
FileCommitErrorRate Rate of failed file commits per second
FileCommitRate Rate of successful file commits per second
FileCommitTime Time spent committing a file to the control plane in milliseconds
FileCommitWaitTime Time a file waits before commit starts in milliseconds
FileSize Size of committed files in bytes
FileTotalLifeTime Total lifetime of a file from creation to commit completion in milliseconds
FileUploadAndCommitTime Time spent uploading and committing a file in milliseconds
FileUploadErrorRate Rate of failed file uploads per second
FileUploadRate Rate of successful file uploads per second
FileUploadTime Time spent uploading a file to object storage in milliseconds
PartitionsPerCommit Number of distinct topic-partitions in a committed file
WriteErrorRate Rate of failed write operations per second
WriteRate Rate of successful write operations per second
============================= ==================================================================================================


FileCleaner metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ void commit(final ClosedFile file) throws InterruptedException {
} else {
metrics.fileAdded(file.size());
metrics.batchesAdded(file.commitBatchRequests().size());
// Partition fan-in (distinct partitions and batches-per-partition) characterizes the
// batch-coalescing opportunity. The requests are already grouped by topic-partition, so
// the metric computes both in one linear pass — no extra grouping here.
metrics.partitionFanInAdded(file.commitBatchRequests());
totalFilesInProgress.addAndGet(1);
totalBytesInProgress.addAndGet(file.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.aiven.inkless.produce;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;

Expand All @@ -36,6 +37,7 @@
import java.util.function.Supplier;

import io.aiven.inkless.TimeUtils;
import io.aiven.inkless.control_plane.CommitBatchRequest;

@CoverageIgnore
public class FileCommitterMetrics implements Closeable {
Expand Down Expand Up @@ -71,6 +73,10 @@ public class FileCommitterMetrics implements Closeable {
private static final String BATCHES_COUNT_DOC = "Number of batches per committed file";
private static final String BATCHES_COMMIT_RATE = "BatchesCommitRate";
private static final String BATCHES_COMMIT_RATE_DOC = "Rate of batches committed per second";
private static final String PARTITIONS_PER_COMMIT = "PartitionsPerCommit";
private static final String PARTITIONS_PER_COMMIT_DOC = "Number of distinct topic-partitions in a committed file";
private static final String BATCHES_PER_PARTITION_PER_COMMIT = "BatchesPerPartitionPerCommit";
private static final String BATCHES_PER_PARTITION_PER_COMMIT_DOC = "Number of batches a single topic-partition contributes to a committed file (per-partition fan-in).";
private static final String WRITE_RATE = "WriteRate";
private static final String WRITE_RATE_DOC = "Rate of successful write operations per second";
private static final String WRITE_ERROR_RATE = "WriteErrorRate";
Expand All @@ -94,6 +100,8 @@ public static List<MetricNameTemplate> all() {
new MetricNameTemplate(BATCHES_COMMIT_RATE, GROUP, BATCHES_COMMIT_RATE_DOC),
new MetricNameTemplate(FILE_SIZE, GROUP, FILE_SIZE_DOC),
new MetricNameTemplate(BATCHES_COUNT, GROUP, BATCHES_COUNT_DOC),
new MetricNameTemplate(PARTITIONS_PER_COMMIT, GROUP, PARTITIONS_PER_COMMIT_DOC),
new MetricNameTemplate(BATCHES_PER_PARTITION_PER_COMMIT, GROUP, BATCHES_PER_PARTITION_PER_COMMIT_DOC),
new MetricNameTemplate(CACHE_STORE_TIME, GROUP, CACHE_STORE_TIME_DOC),
new MetricNameTemplate(WRITE_RATE, GROUP, WRITE_RATE_DOC),
new MetricNameTemplate(WRITE_ERROR_RATE, GROUP, WRITE_ERROR_RATE_DOC),
Expand All @@ -113,6 +121,9 @@ public static List<MetricNameTemplate> all() {
private final Histogram fileCommitWaitTimeHistogram;
private final Histogram fileSizeHistogram;
private final Histogram batchesCountHistogram;
// Visible for testing.
final Histogram partitionsPerCommitHistogram;
final Histogram batchesPerPartitionPerCommitHistogram;
private final Histogram cacheStoreTimeHistogram;
private final Meter fileUploadRate;
private final Meter fileUploadErrorRate;
Expand All @@ -136,6 +147,8 @@ public static List<MetricNameTemplate> all() {
batchesCommitRate = metricsGroup.newMeter(BATCHES_COMMIT_RATE, "batches", TimeUnit.SECONDS, Map.of());
fileSizeHistogram = metricsGroup.newHistogram(FILE_SIZE, true, Map.of());
batchesCountHistogram = metricsGroup.newHistogram(BATCHES_COUNT, true, Map.of());
partitionsPerCommitHistogram = metricsGroup.newHistogram(PARTITIONS_PER_COMMIT, true, Map.of());
batchesPerPartitionPerCommitHistogram = metricsGroup.newHistogram(BATCHES_PER_PARTITION_PER_COMMIT, true, Map.of());
cacheStoreTimeHistogram = metricsGroup.newHistogram(CACHE_STORE_TIME, true, Map.of());
writeRate = metricsGroup.newMeter(WRITE_RATE, "writes", TimeUnit.SECONDS, Map.of());
writeErrorRate = metricsGroup.newMeter(WRITE_ERROR_RATE, "errors", TimeUnit.SECONDS, Map.of());
Expand All @@ -158,6 +171,39 @@ void batchesAdded(final int size) {
batchesCommitRate.mark(size);
}

/**
* Records the partition fan-in of a committed file: the number of distinct topic-partitions, and the
* number of batches each one contributed.
*
* <p>Relies on {@code commitBatchRequests} being grouped by topic-partition (they are sorted that way
* in {@link BatchBuffer#close()}), so partitions and their batch counts are computed in a single
* linear pass over contiguous runs — no intermediate grouping map.
*
* @param sortedRequests the committed batch requests, grouped by topic-partition
*/
void partitionFanInAdded(final List<CommitBatchRequest> sortedRequests) {
if (sortedRequests.isEmpty()) {
return;
}
int partitions = 0;
int runLength = 0;
TopicIdPartition current = null;
for (final CommitBatchRequest request : sortedRequests) {
final TopicIdPartition partition = request.topicIdPartition();
if (!partition.equals(current)) {
if (current != null) {
Comment thread
jeqo marked this conversation as resolved.
batchesPerPartitionPerCommitHistogram.update(runLength);
}
partitions++;
runLength = 0;
current = partition;
}
runLength++;
}
batchesPerPartitionPerCommitHistogram.update(runLength); // flush the final run
partitionsPerCommitHistogram.update(partitions);
}

void fileUploadFinished(final long durationMs) {
fileUploadTimeHistogram.update(durationMs);
fileUploadRate.mark();
Expand Down Expand Up @@ -215,6 +261,8 @@ public void close() throws IOException {
metricsGroup.removeMetric(CACHE_STORE_TIME);
metricsGroup.removeMetric(BATCHES_COUNT);
metricsGroup.removeMetric(BATCHES_COMMIT_RATE);
metricsGroup.removeMetric(PARTITIONS_PER_COMMIT);
metricsGroup.removeMetric(BATCHES_PER_PARTITION_PER_COMMIT);
metricsGroup.removeMetric(WRITE_RATE);
metricsGroup.removeMetric(WRITE_ERROR_RATE);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Inkless
* Copyright (C) 2024 - 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.produce;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import io.aiven.inkless.control_plane.CommitBatchRequest;

import static org.assertj.core.api.Assertions.assertThat;

class FileCommitterMetricsTest {
    static final Uuid TOPIC_ID = new Uuid(1, 2);
    static final TopicIdPartition T0 = new TopicIdPartition(TOPIC_ID, 0, "t");
    static final TopicIdPartition T1 = new TopicIdPartition(TOPIC_ID, 1, "t");
    static final TopicIdPartition T2 = new TopicIdPartition(TOPIC_ID, 2, "t");

    FileCommitterMetrics metrics;

    @BeforeEach
    void setUp() {
        metrics = new FileCommitterMetrics(new MockTime());
    }

    @AfterEach
    void tearDown() throws IOException {
        metrics.close();
    }

    /** Creates a 10-byte batch request for {@code tp} located at {@code byteOffset}. */
    private static CommitBatchRequest request(final TopicIdPartition tp, final int byteOffset) {
        return CommitBatchRequest.of(0, tp, byteOffset, 10, 0, 0, 1000, TimestampType.CREATE_TIME);
    }

    /**
     * Creates one request per argument, in argument order, with byte offsets advancing by 10.
     * Callers pass the partitions already grouped, mirroring the list this metric is fed in production.
     */
    private static List<CommitBatchRequest> grouped(final TopicIdPartition... partitionPerBatch) {
        final List<CommitBatchRequest> result = new ArrayList<>(partitionPerBatch.length);
        for (int i = 0; i < partitionPerBatch.length; i++) {
            result.add(request(partitionPerBatch[i], i * 10));
        }
        return result;
    }

    /** Asserts the number of commits observed and the largest partition count among them. */
    private void assertPartitionsPerCommit(final long expectedCount, final double expectedMax) {
        assertThat(metrics.partitionsPerCommitHistogram.count()).isEqualTo(expectedCount);
        assertThat(metrics.partitionsPerCommitHistogram.max()).isEqualTo(expectedMax);
    }

    /** Asserts the number of partition runs observed and the largest fan-in among them. */
    private void assertBatchesPerPartition(final long expectedCount, final double expectedMax) {
        assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isEqualTo(expectedCount);
        assertThat(metrics.batchesPerPartitionPerCommitHistogram.max()).isEqualTo(expectedMax);
    }

    @Test
    void emptyIsNoOp() {
        metrics.partitionFanInAdded(List.of());

        // Nothing may be recorded for a file without batches.
        assertThat(metrics.partitionsPerCommitHistogram.count()).isZero();
        assertThat(metrics.batchesPerPartitionPerCommitHistogram.count()).isZero();
    }

    @Test
    void singlePartitionSingleBatch() {
        metrics.partitionFanInAdded(grouped(T0));

        // One commit with one partition; that partition contributed a single batch.
        assertPartitionsPerCommit(1, 1.0);
        assertBatchesPerPartition(1, 1.0);
    }

    @Test
    void singlePartitionManyBatchesCountsAsOneRunWithFullFanIn() {
        metrics.partitionFanInAdded(grouped(T0, T0, T0, T0, T0));

        // Still one distinct partition, observed once, with all five batches in its run.
        assertPartitionsPerCommit(1, 1.0);
        assertBatchesPerPartition(1, 5.0);
    }

    @Test
    void multiplePartitionsWithDifferentFanIn() {
        // Input is grouped by partition: T0 x2, T1 x1, T2 x3.
        metrics.partitionFanInAdded(grouped(T0, T0, T1, T2, T2, T2));

        // A single commit holding three distinct partitions.
        assertPartitionsPerCommit(1, 3.0);

        // Three runs with fan-ins 2, 1 and 3, so the extremes are 1 and 3.
        assertBatchesPerPartition(3, 3.0);
        assertThat(metrics.batchesPerPartitionPerCommitHistogram.min()).isEqualTo(1.0);
    }

    @Test
    void accumulatesAcrossCommits() {
        metrics.partitionFanInAdded(grouped(T0, T0)); // 1 partition, fan-in 2
        metrics.partitionFanInAdded(grouped(T0, T1, T2)); // 3 partitions, fan-in 1 each

        // Both commits are retained: partition counts were 1 and 3.
        assertPartitionsPerCommit(2, 3.0);
        assertThat(metrics.partitionsPerCommitHistogram.min()).isEqualTo(1.0);

        // 1 + 3 = 4 partition runs overall; fan-in ranged from 1 to 2.
        assertBatchesPerPartition(4, 2.0);
        assertThat(metrics.batchesPerPartitionPerCommitHistogram.min()).isEqualTo(1.0);
    }
}
Loading