Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,6 @@ public void initialize(
staticVoters,
log,
serde,
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
logContext,
kafkaRaftMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public final class KRaftControlRecordStateMachine {
private final LogContext logContext;
private final RaftLog log;
private final RecordSerde<?> serde;
private final BufferSupplier bufferSupplier;
private final Logger logger;
private final int maxBatchSizeBytes;

Expand All @@ -82,15 +81,13 @@ public final class KRaftControlRecordStateMachine {
* @param staticVoterSet the set of voter statically configured
* @param log the on disk topic partition
* @param serde the record decoder for data records
* @param bufferSupplier the supplier of byte buffers
* @param maxBatchSizeBytes the maximum size of record batch
* @param logContext the log context
*/
public KRaftControlRecordStateMachine(
VoterSet staticVoterSet,
RaftLog log,
RecordSerde<?> serde,
BufferSupplier bufferSupplier,
int maxBatchSizeBytes,
LogContext logContext,
KafkaRaftMetrics kafkaRaftMetrics,
Expand All @@ -100,7 +97,6 @@ public KRaftControlRecordStateMachine(
this.log = log;
this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext);
this.serde = serde;
this.bufferSupplier = bufferSupplier;
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.logger = logContext.logger(getClass());
this.kafkaRaftMetrics = kafkaRaftMetrics;
Expand Down Expand Up @@ -232,25 +228,27 @@ private void checkOffsetIsValid(long offset) {
}

private void maybeLoadLog() {
while (log.endOffset().offset() > nextOffset) {
LogFetchInfo info = log.read(
nextOffset,
Isolation.UNCOMMITTED,
Integer.MAX_VALUE
);
try (RecordsIterator<?> iterator = new RecordsIterator<>(
info.records,
serde,
bufferSupplier,
maxBatchSizeBytes,
true, // Validate batch CRC
logContext
)
) {
while (iterator.hasNext()) {
Batch<?> batch = iterator.next();
handleBatch(batch, OptionalLong.empty());
nextOffset = batch.lastOffset() + 1;
try (BufferSupplier bufferSupplier = BufferSupplier.create()) {
while (log.endOffset().offset() > nextOffset) {
LogFetchInfo info = log.read(
nextOffset,
Isolation.UNCOMMITTED,
Integer.MAX_VALUE
);
try (RecordsIterator<?> iterator = new RecordsIterator<>(
info.records,
serde,
bufferSupplier,
maxBatchSizeBytes,
true, // Validate batch CRC
logContext
)
) {
while (iterator.hasNext()) {
Batch<?> batch = iterator.next();
handleBatch(batch, OptionalLong.empty());
nextOffset = batch.lastOffset() + 1;
}
}
}
}
Expand All @@ -271,7 +269,7 @@ private void maybeLoadSnapshot() {
try (SnapshotReader<?> reader = RecordsSnapshotReader.of(
rawSnapshot,
serde,
bufferSupplier,
BufferSupplier.create(),
maxBatchSizeBytes,
true, // Validate batch CRC
logContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ private static KRaftControlRecordStateMachine buildPartitionListener(
staticVoterSet,
log,
STRING_SERDE,
BufferSupplier.NO_CACHING,
1024,
new LogContext(),
raftMetrics,
Expand Down