From 39c1fd0d1dd9e718609f3e237db04d73fbcdea72 Mon Sep 17 00:00:00 2001 From: tony tang Date: Wed, 24 Jun 2026 12:42:02 -0500 Subject: [PATCH] KAFKA-20726: Don't reuse the BufferSupplier in kraft state machine KRaftControlRecordStateMachine reused a single BufferSupplier across all reads. The supplier returned by BufferSupplier.create() caches ByteBuffers by capacity without eviction, so the cached buffers lived as long as the state machine and consumed memory unboundedly. Create a fresh BufferSupplier per read in maybeLoadLog() and maybeLoadSnapshot() instead, allowing it to be released once the read completes. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/kafka/raft/KafkaRaftClient.java | 1 - .../KRaftControlRecordStateMachine.java | 46 +++++++++---------- .../KRaftControlRecordStateMachineTest.java | 1 - 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 824a4c4a8ba08..44f6013e10812 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -499,7 +499,6 @@ public void initialize( staticVoters, log, serde, - BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, logContext, kafkaRaftMetrics, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 94bd8f526961c..f1896d0d69802 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -55,7 +55,6 @@ public final class KRaftControlRecordStateMachine { private final LogContext logContext; private final RaftLog log; private final RecordSerde serde; - private final BufferSupplier bufferSupplier; private final Logger logger; private final int maxBatchSizeBytes; @@ -82,7 +81,6 @@ public final class KRaftControlRecordStateMachine { * @param staticVoterSet the set of voter statically configured * @param log the on disk topic partition * @param serde the record decoder for data records - * @param bufferSupplier the supplier of byte buffers * @param maxBatchSizeBytes the maximum size of record batch * @param logContext the log context */ @@ -90,7 +88,6 @@ public KRaftControlRecordStateMachine( VoterSet staticVoterSet, RaftLog log, RecordSerde serde, - BufferSupplier bufferSupplier, int maxBatchSizeBytes, LogContext logContext, KafkaRaftMetrics kafkaRaftMetrics, @@ -100,7 +97,6 @@ public KRaftControlRecordStateMachine( this.log = log; this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext); this.serde = serde; - this.bufferSupplier = bufferSupplier; this.maxBatchSizeBytes = maxBatchSizeBytes; this.logger = logContext.logger(getClass()); this.kafkaRaftMetrics = kafkaRaftMetrics; @@ -232,25 +228,27 @@ private void checkOffsetIsValid(long offset) { } private void maybeLoadLog() { - while (log.endOffset().offset() > nextOffset) { - LogFetchInfo info = log.read( - nextOffset, - Isolation.UNCOMMITTED, - Integer.MAX_VALUE - ); - try (RecordsIterator iterator = new RecordsIterator<>( - info.records, - serde, - bufferSupplier, - maxBatchSizeBytes, - true, // Validate batch CRC - logContext - ) - ) { - while (iterator.hasNext()) { - Batch batch = iterator.next(); - handleBatch(batch, OptionalLong.empty()); - nextOffset = batch.lastOffset() + 1; + try (BufferSupplier bufferSupplier = BufferSupplier.create()) { + while (log.endOffset().offset() > nextOffset) { + LogFetchInfo info = log.read( + nextOffset, + Isolation.UNCOMMITTED, + Integer.MAX_VALUE + ); + try (RecordsIterator iterator = new RecordsIterator<>( + info.records, + serde, + bufferSupplier, + maxBatchSizeBytes, + true, // Validate batch CRC + logContext + ) + ) { + while (iterator.hasNext()) { + Batch batch = iterator.next(); + handleBatch(batch, OptionalLong.empty()); + nextOffset = batch.lastOffset() + 1; + } } } } @@ -271,7 +269,7 @@ private void maybeLoadSnapshot() { try (SnapshotReader reader = RecordsSnapshotReader.of( rawSnapshot, serde, - bufferSupplier, + BufferSupplier.create(), maxBatchSizeBytes, true, // Validate batch CRC logContext diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java index 84ca54f206a32..82def14f7f497 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java @@ -58,7 +58,6 @@ private static KRaftControlRecordStateMachine buildPartitionListener( staticVoterSet, log, STRING_SERDE, - BufferSupplier.NO_CACHING, 1024, new LogContext(), raftMetrics,