diff --git a/build.gradle b/build.gradle
index 5b8c8e8c874b6..e3b6bf069f766 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3613,6 +3613,7 @@ project(':jmh-benchmarks') {
implementation testFixtures(project(':clients'))
implementation testFixtures(project(':server-common'))
implementation testFixtures(project(':metadata'))
+ implementation testFixtures(project(':raft'))
implementation libs.jmhCore
annotationProcessor libs.jmhGeneratorAnnProcess
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 8828e713e49f5..2cc40b78bde2a 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -65,4 +65,7 @@
+
+
+
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/ElectionBenchmarks.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/ElectionBenchmarks.java
new file mode 100644
index 0000000000000..5d9e30bb466a8
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/ElectionBenchmarks.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.raft;
+
+import org.apache.kafka.raft.RaftClientBenchmarkContext;
+import org.apache.kafka.raft.RaftClientTestContext;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmarks for the leader-election path. The outer class is intentionally not a JMH {@code @State}:
+ * each benchmark declares the starting state it needs as a nested {@code @State} parameter, so
+ * different election scenarios (e.g. a future Prospective or Candidate start) can have their own
+ * setup without forcing a single shared {@code @Setup} on the whole class.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = RaftClientBenchmarkContext.SINGLE_SHOT_WARMUP_ITERATIONS)
+@Measurement(iterations = RaftClientBenchmarkContext.SINGLE_SHOT_MEASUREMENT_ITERATIONS)
+@Fork(RaftClientBenchmarkContext.SINGLE_SHOT_FORKS)
+public class ElectionBenchmarks {
+
+ /**
+ * Starting state: the local node is Unattached in a {@code voterCount}-node quorum. A fresh
+ * context is built per invocation because driving the election to completion consumes it.
+ */
+ @State(Scope.Thread)
+ public static class UnattachedQuorum {
+ @Param({"3", "5"})
+ public int voterCount;
+
+ RaftClientBenchmarkContext benchmark;
+ RaftClientTestContext context;
+
+ @Setup(Level.Invocation)
+ public void setup() throws Exception {
+ benchmark = RaftClientBenchmarkContext.unattached(voterCount);
+ context = benchmark.testContext();
+ benchmark.zeroCountersOnSetup();
+ }
+ }
+
+ /** A multi-voter quorum elects the local node as leader. */
+ @Benchmark
+ public void electLeader(UnattachedQuorum state, KRaftBenchmarkingCounters counters) throws Exception {
+ state.context.unattachedToLeader();
+
+ counters.drainFrom(state.benchmark, Optional.empty(), Optional.empty());
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/KRaftBenchmarkingCounters.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/KRaftBenchmarkingCounters.java
new file mode 100644
index 0000000000000..83c01e95af1be
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/KRaftBenchmarkingCounters.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.raft;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.raft.RaftClientBenchmarkContext;
+
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.BenchmarkParams;
+
+import java.util.Optional;
+
+/**
+ * Secondary, machine-independent work counters reported by the raft benchmarks alongside the timing
+ * score, as {@code benchmark:counter} rows.
+ *
+ *
Throughout this class, an operation is JMH's unit of work: a single invocation of a
+ * {@code @Benchmark}-annotated method. (One operation equals one invocation here because we don't use
+ * {@code @OperationsPerInvocation}.) JMH reports the timing score in {@code ns/op}, and these work
+ * counters are reported {@code PerOp} to match.
+ *
+ *
Each benchmark calls {@link #drainFrom} every invocation to accumulate the work deltas drained
+ * from {@link RaftClientBenchmarkContext}. The raw totals are private accumulators; what we report
+ * are the per-operation values from the {@code *PerOp()} methods (the quantity of interest), plus
+ * {@link #operations}.
+ *
+ *
JMH aggregates {@code Type.EVENTS} secondary results with {@code SUM} across all measurement
+ * data points i.e {@code forks x measurement iterations}. To make the summary row
+ * report the true per-operation value rather than that value multiplied by the data-point count, each
+ * method pre-divides by the data-point count obtained from {@link BenchmarkParams} in
+ * {@link #captureRunShape}. The SUM then reconstitutes the exact per-operation value (e.g.
+ * {@code logReadsPerOp = 1.0}) in the summary, for any {@code -f}/{@code -i} configuration. (The
+ * per-iteration console values are correspondingly a small fraction of the per-op value; read the
+ * summary row.)
+ *
+ *
The per-operation values are integer-exact and should be stable across a correct refactor of
+ * {@code KafkaRaftClient}: a flush count moving from 1 to 2 per operation is a behavioral diff, not
+ * measurement noise. The counters that are zero on a path (e.g. log flushes on a caught-up fetch)
+ * are the most useful tripwires, since zero is speed-independent.
+ */
+@State(Scope.Thread)
+@AuxCounters(AuxCounters.Type.EVENTS)
+public class KRaftBenchmarkingCounters {
+ private long logFlushesTotal;
+ private long logReadsTotal;
+ private long logTruncationsTotal;
+ private long rpcRequestsSentTotal;
+ private long rpcResponsesSentTotal;
+ private long quorumStateWritesTotal;
+ private long quorumStateReadsTotal;
+
+ // Reported: the number of operations (i.e. @Benchmark method invocations) measured in the
+ // iteration, and the divisor for the per-operation values below.
+ public long operations;
+
+ // The number of measurement data points JMH will SUM the per-op methods over, i.e.
+ // (forks x measurement iterations) for this run. Captured from BenchmarkParams so it tracks the
+ // actual run shape (including -f/-i overrides) rather than being hardcoded.
+ private double measurementDataPoints = 1.0;
+
+ @Setup(Level.Trial)
+ public void captureRunShape(BenchmarkParams params) {
+ // forks() is 0 when forking is disabled (in-process), which is still one set of iterations.
+ int forks = Math.max(1, params.getForks());
+ measurementDataPoints = (double) forks * params.getMeasurement().getCount();
+ }
+
+ @Setup(Level.Iteration)
+ public void reset() {
+ logFlushesTotal = 0;
+ logReadsTotal = 0;
+ logTruncationsTotal = 0;
+ rpcRequestsSentTotal = 0;
+ rpcResponsesSentTotal = 0;
+ quorumStateWritesTotal = 0;
+ quorumStateReadsTotal = 0;
+ }
+
+ /**
+ * Accumulates this invocation's work deltas drained from {@code context} into these counters.
+ * {@code expectedRequest}/{@code expectedResponse}, if present, restrict the RPC request/response
+ * counts to that API key (e.g. {@code FETCH}); empty counts all.
+ */
+ public void drainFrom(
+ RaftClientBenchmarkContext context,
+ Optional expectedRequest,
+ Optional expectedResponse
+ ) {
+ logFlushesTotal += context.drainLogFlushes();
+ logReadsTotal += context.drainLogReads();
+ logTruncationsTotal += context.drainLogTruncations();
+ rpcRequestsSentTotal += context.drainRpcRequestsSent(expectedRequest);
+ rpcResponsesSentTotal += context.drainRpcResponsesSent(expectedResponse);
+ quorumStateWritesTotal += context.drainQuorumStateWrites();
+ quorumStateReadsTotal += context.drainQuorumStateReads();
+ operations += 1;
+ }
+
+ public double logFlushesPerOp() {
+ return perOperation(logFlushesTotal);
+ }
+
+ public double logReadsPerOp() {
+ return perOperation(logReadsTotal);
+ }
+
+ public double logTruncationsPerOp() {
+ return perOperation(logTruncationsTotal);
+ }
+
+ public double rpcRequestsSentPerOp() {
+ return perOperation(rpcRequestsSentTotal);
+ }
+
+ public double rpcResponsesSentPerOp() {
+ return perOperation(rpcResponsesSentTotal);
+ }
+
+ public double quorumStateWritesPerOp() {
+ return perOperation(quorumStateWritesTotal);
+ }
+
+ public double quorumStateReadsPerOp() {
+ return perOperation(quorumStateReadsTotal);
+ }
+
+ private double perOperation(long counter) {
+ if (operations == 0) {
+ return 0.0;
+ }
+ return (double) counter / operations / measurementDataPoints;
+ }
+}
\ No newline at end of file
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/LeaderBenchmarks.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/LeaderBenchmarks.java
new file mode 100644
index 0000000000000..6132b774d886d
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/raft/LeaderBenchmarks.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.raft;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.raft.RaftClientBenchmarkContext;
+import org.apache.kafka.raft.RaftClientTestContext;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = RaftClientBenchmarkContext.AVERAGE_TIME_WARMUP_ITERATIONS)
+@Measurement(iterations = RaftClientBenchmarkContext.AVERAGE_TIME_MEASUREMENT_ITERATIONS)
+@Fork(RaftClientBenchmarkContext.AVERAGE_TIME_FORKS)
+public class LeaderBenchmarks {
+
+ /**
+ * Starting state: the local node is Leader with the high watermark at the log end and a caught-up
+ * follower ready to fetch. Built once per trial and reused across invocations, since handling a
+ * caught-up fetch does not mutate it.
+ */
+ @State(Scope.Thread)
+ public static class LeaderWithCaughtUpFollower {
+ static final int VOTER_COUNT = 3;
+
+ RaftClientBenchmarkContext benchmark;
+ RaftClientTestContext context;
+
+ int epoch;
+ long endOffset;
+
+ @Setup(Level.Trial)
+ public void setup() throws Exception {
+ benchmark = RaftClientBenchmarkContext.leader(VOTER_COUNT);
+ context = benchmark.testContext();
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+ epoch = context.currentEpoch();
+ endOffset = benchmark.logEndOffset();
+ benchmark.zeroCountersOnSetup();
+ }
+ }
+
+ /**
+ * Leader handles a valid FETCH from a fully caught-up follower (fetch offset == log end offset),
+ * which does not advance the high watermark — the steady-state heartbeat-style fetch.
+ */
+ @Benchmark
+ public void handleFetchFromCaughtUpFollower(
+ LeaderWithCaughtUpFollower state,
+ KRaftBenchmarkingCounters counters
+ ) throws Exception {
+ state.context.deliverRequest(
+ state.context.fetchRequest(
+ state.epoch, state.benchmark.remoteVoters().get(0), state.endOffset, state.epoch, 0));
+ state.context.pollUntilResponse();
+
+ counters.drainFrom(state.benchmark, Optional.empty(), Optional.of(ApiKeys.FETCH));
+ }
+}
diff --git a/raft/src/testFixtures/java/org/apache/kafka/raft/MockLog.java b/raft/src/testFixtures/java/org/apache/kafka/raft/MockLog.java
index 53d9380860e90..6c987eb7bd0f4 100644
--- a/raft/src/testFixtures/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/testFixtures/java/org/apache/kafka/raft/MockLog.java
@@ -69,6 +69,12 @@ public class MockLog implements RaftLog {
private long firstUnflushedOffset = 0;
private boolean flushedSinceLastChecked = false;
+ // Cumulative work counters used by the JMH raft benchmarks. They grow for the lifetime of the
+ // log; benchmarks read them as drainable deltas (see RaftClientBenchmarkContext).
+ private int flushCount = 0;
+ private int readCount = 0;
+ private int truncationCount = 0;
+
public MockLog(
TopicPartition topicPartition,
Uuid topicId,
@@ -81,6 +87,7 @@ public MockLog(
@Override
public void truncateTo(long offset) {
+ truncationCount++;
if (offset < highWatermark.offset()) {
throw new IllegalArgumentException("Illegal attempt to truncate to offset " + offset +
" which is below the current high watermark " + highWatermark);
@@ -109,6 +116,7 @@ public boolean truncateToLatestSnapshot() {
flush(false);
truncated.set(true);
+ truncationCount++;
}
});
@@ -363,6 +371,7 @@ private LogAppendInfo append(Records records, int epoch, boolean isLeader) {
@Override
public void flush(boolean forceFlushActiveSegment) {
+ // Note: forceFlushActiveSegment is not consulted in this body; every call is handled the same way.
flushedSinceLastChecked = true;
+ // Counted unconditionally, i.e. also when there was no unflushed data to write.
+ flushCount++;
firstUnflushedOffset = endOffset().offset();
}
@@ -384,6 +393,22 @@ public boolean flushedSinceLastChecked() {
return oldValue;
}
+ /**
+ * Cumulative number of {@link #flush(boolean)} calls since this log was created. Used as a
+ * proxy for disk I/Os by the JMH raft benchmarks.
+ *
+ * @return the number of flush calls recorded so far
+ */
+ public int flushCount() {
+ return flushCount;
+ }
+
+ /**
+ * Cumulative number of {@code read} calls made on this log. The counter is incremented on entry
+ * to {@code read}, before the start offset is validated, so rejected reads are counted too.
+ *
+ * @return the number of read calls recorded so far
+ */
+ public int readCount() {
+ return readCount;
+ }
+
+ /**
+ * Cumulative number of truncations recorded on this log. Incremented by {@code truncateTo} and
+ * by {@code truncateToLatestSnapshot}.
+ *
+ * @return the number of truncations recorded so far
+ */
+ public int truncationCount() {
+ return truncationCount;
+ }
+
/**
* Reopening the log causes all unflushed data to be lost.
*/
@@ -420,6 +445,7 @@ private void verifyOffsetInRange(long offset) {
@Override
public LogFetchInfo read(long startOffset, Isolation isolation, int maxTotalBatchBytes) {
+ readCount++;
verifyOffsetInRange(startOffset);
long maxOffset = isolation == Isolation.COMMITTED ? highWatermark.offset() : endOffset().offset();
diff --git a/raft/src/testFixtures/java/org/apache/kafka/raft/MockNetworkChannel.java b/raft/src/testFixtures/java/org/apache/kafka/raft/MockNetworkChannel.java
index 565a0d911390b..31a2a96c4b232 100644
--- a/raft/src/testFixtures/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/testFixtures/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -35,6 +35,9 @@ public class MockNetworkChannel implements NetworkChannel {
private final List sendQueue = new ArrayList<>();
private final Map> awaitingResponse = new HashMap<>();
+ private int requestsSent = 0;
+ private final Map requestsSentByApiKey = new HashMap<>();
+
public MockNetworkChannel(AtomicInteger correlationIdCounter) {
this.correlationIdCounter = correlationIdCounter;
}
@@ -52,9 +55,30 @@ public int newCorrelationId() {
public CompletionStage send(RaftRequest.Outbound request) {
var future = new CompletableFuture();
sendQueue.add(new RequestEntry(request, future));
+ // Record the request in both the overall total and the per-API-key tally (keyed by the
+ // request's API key id) that the benchmarks read back.
+ requestsSent++;
+ requestsSentByApiKey.merge(request.data().apiKey(), 1, Integer::sum);
return future;
}
+ /**
+ * Cumulative number of outbound requests the client has sent since this channel was created.
+ *
+ * @return the number of {@code send} calls recorded so far, across all API keys
+ */
+ public int requestsSent() {
+ return requestsSent;
+ }
+
+ /**
+ * Cumulative number of outbound requests of the given API key the client has sent since this
+ * channel was created.
+ *
+ * @param apiKey the API key to look up; the tally is keyed by {@code apiKey.id}
+ * @return the number of requests recorded for that key, or 0 if none has been sent
+ */
+ public int requestsSent(ApiKeys apiKey) {
+ return requestsSentByApiKey.getOrDefault(apiKey.id, 0);
+ }
+
+ /**
+ * Snapshot of the cumulative number of outbound requests sent, keyed by API key id.
+ *
+ * @return an unmodifiable copy of the per-API-key tally; later sends do not change it
+ */
+ public Map<Short, Integer> requestsSentByApiKey() {
+ return Map.copyOf(requestsSentByApiKey);
+ }
+
@Override
public ListenerName listenerName() {
return LISTENER_NAME;
diff --git a/raft/src/testFixtures/java/org/apache/kafka/raft/MockQuorumStateStore.java b/raft/src/testFixtures/java/org/apache/kafka/raft/MockQuorumStateStore.java
index fd6e4ebd49691..bc88c2336fa78 100644
--- a/raft/src/testFixtures/java/org/apache/kafka/raft/MockQuorumStateStore.java
+++ b/raft/src/testFixtures/java/org/apache/kafka/raft/MockQuorumStateStore.java
@@ -26,16 +26,30 @@
public class MockQuorumStateStore implements QuorumStateStore {
private Optional current = Optional.empty();
+ // Cumulative counts of quorum-state-file writes and reads, used by the JMH raft benchmarks.
+ private int writeCount = 0;
+ private int readCount = 0;
+
@Override
public Optional readElectionState() {
+ // Every read is counted, including reads that find no stored state.
+ readCount++;
return current.map(ElectionState::fromQuorumStateData);
}
+ /**
+ * Cumulative number of {@link #readElectionState()} calls made on this store.
+ */
+ public int readCount() {
+ return readCount;
+ }
+
@Override
public void writeElectionState(ElectionState update, KRaftVersion kraftVersion) {
current = Optional.of(
update.toQuorumStateData(kraftVersion.quorumStateVersion())
);
+ // Counted only after the new state has been stored.
+ writeCount++;
+ }
+
+ /**
+ * Cumulative number of completed {@code writeElectionState} calls made on this store.
+ */
+ public int writeCount() {
+ return writeCount;
}
@Override
diff --git a/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientBenchmarkContext.java b/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientBenchmarkContext.java
new file mode 100644
index 0000000000000..8565a64cb9042
--- /dev/null
+++ b/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientBenchmarkContext.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.raft.RaftClientTestContext.RaftProtocol;
+import org.apache.kafka.server.common.KRaftVersion;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Benchmark-facing wrapper around a {@link RaftClientTestContext}, used by the JMH raft benchmarks.
+ *
+ * <p>It builds a context in a chosen starting state ({@link #unattached(int)} or
+ * {@link #leader(int)}) and exposes the cumulative work counters of the underlying mocks as
+ * drainable deltas: each {@code drain*} method returns the work recorded since the previous drain
+ * of that counter (or since {@link #zeroCountersOnSetup()}) and advances its baseline.
+ */
+public final class RaftClientBenchmarkContext {
+    // Standardized JMH iteration counts, shared by all raft benchmarks so every benchmark of a given
+    // mode is configured identically. SingleShotTime measures a single operation per iteration, so it
+    // needs many iterations to build a stable distribution; AverageTime averages many operations
+    // within each timed iteration, so it needs fewer.
+    public static final int SINGLE_SHOT_WARMUP_ITERATIONS = 50;
+    public static final int SINGLE_SHOT_MEASUREMENT_ITERATIONS = 30;
+    public static final int SINGLE_SHOT_FORKS = 5;
+    public static final int AVERAGE_TIME_WARMUP_ITERATIONS = 5;
+    public static final int AVERAGE_TIME_MEASUREMENT_ITERATIONS = 10;
+    public static final int AVERAGE_TIME_FORKS = 3;
+
+    public static final KRaftVersion DEFAULT_KRAFT_VERSION = KRaftVersion.KRAFT_VERSION_1;
+    public static final RaftProtocol DEFAULT_RAFT_PROTOCOL = RaftProtocol.KIP_1186_PROTOCOL;
+
+    private final RaftClientTestContext context;
+    private final MockLog log;
+    private final MockNetworkChannel channel;
+    // All voters of the quorum, with the local node first.
+    private final List<ReplicaKey> voters;
+
+    // Baselines: the cumulative counter values observed at the most recent drain (or at
+    // zeroCountersOnSetup()). A drain reports "current - baseline" and then moves the baseline.
+    private int lastFlushCount;
+    private int lastReadCount;
+    private int lastTruncationCount;
+    private int lastRequestsSent;
+    private Map<Short, Integer> lastRequestsSentByApiKey = new HashMap<>();
+    private int lastQuorumWrites;
+    private int lastQuorumReads;
+
+    private RaftClientBenchmarkContext(RaftClientTestContext context, List<ReplicaKey> voters) {
+        this.context = context;
+        this.log = context.log;
+        this.channel = context.channel;
+        this.voters = List.copyOf(voters);
+    }
+
+    /**
+     * Builds an unattached node in a {@code voterCount}-node quorum (the local node is not yet the
+     * leader), using the default kraft version and raft protocol. Use
+     * {@link RaftClientTestContext#unattachedToLeader()} on {@link #testContext()} as the measured
+     * operation to drive a full Unattached → Leader election. A single-voter quorum is rejected
+     * because such a node elects itself at initialization, before any measured poll.
+     *
+     * @throws IllegalArgumentException if {@code voterCount} is less than 2
+     */
+    public static RaftClientBenchmarkContext unattached(int voterCount) throws Exception {
+        return unattached(voterCount, DEFAULT_KRAFT_VERSION, DEFAULT_RAFT_PROTOCOL);
+    }
+
+    /**
+     * Same as {@link #unattached(int)}, with an explicit kraft version and raft protocol.
+     *
+     * @throws IllegalArgumentException if {@code voterCount} is less than 2
+     */
+    public static RaftClientBenchmarkContext unattached(
+        int voterCount,
+        KRaftVersion kraftVersion,
+        RaftProtocol raftProtocol
+    ) throws Exception {
+        if (voterCount < 2) {
+            throw new IllegalArgumentException("voterCount must be at least 2; a single voter self-elects at init");
+        }
+        List<ReplicaKey> voterKeys = voterKeys(voterCount);
+        return new RaftClientBenchmarkContext(buildContext(voterKeys, kraftVersion, raftProtocol), voterKeys);
+    }
+
+    /**
+     * Builds a {@code voterCount}-node quorum and drives the local node to Leader, using the
+     * default kraft version and raft protocol.
+     *
+     * @throws IllegalArgumentException if {@code voterCount} is less than 1
+     */
+    public static RaftClientBenchmarkContext leader(int voterCount) throws Exception {
+        return leader(voterCount, DEFAULT_KRAFT_VERSION, DEFAULT_RAFT_PROTOCOL);
+    }
+
+    /**
+     * Same as {@link #leader(int)}, with an explicit kraft version and raft protocol. The election
+     * runs before this method returns; call {@link #zeroCountersOnSetup()} afterwards so that work
+     * is not attributed to the measured operation.
+     *
+     * @throws IllegalArgumentException if {@code voterCount} is less than 1
+     */
+    public static RaftClientBenchmarkContext leader(
+        int voterCount,
+        KRaftVersion kraftVersion,
+        RaftProtocol raftProtocol
+    ) throws Exception {
+        // Fail fast with a clear message; an empty voter list would otherwise only surface later
+        // as an index error when the local voter is looked up.
+        if (voterCount < 1) {
+            throw new IllegalArgumentException("voterCount must be at least 1, but was " + voterCount);
+        }
+        List<ReplicaKey> voterKeys = voterKeys(voterCount);
+        RaftClientTestContext context = buildContext(voterKeys, kraftVersion, raftProtocol);
+        context.unattachedToLeader();
+
+        return new RaftClientBenchmarkContext(context, voterKeys);
+    }
+
+    /**
+     * {@code voterCount} voter keys, each with a random directory id, with the local node first.
+     * Replica ids are consecutive, starting from a randomly chosen id for the local node.
+     */
+    private static List<ReplicaKey> voterKeys(int voterCount) {
+        int localId = randomReplicaId();
+        return IntStream.range(0, voterCount)
+            .mapToObj(i -> ReplicaKey.of(localId + i, Uuid.randomUuid()))
+            .collect(Collectors.toList());
+    }
+
+    /** A random replica id in the range [0, 1024]. */
+    private static int randomReplicaId() {
+        return ThreadLocalRandom.current().nextInt(1025);
+    }
+
+    /**
+     * Builds an unattached node in a quorum of {@code voterKeys} (first entry is the local node).
+     * The poll interval is set to 0, which is the interval {@code pollUntil} hands to its wait loop.
+     */
+    private static RaftClientTestContext buildContext(
+        List<ReplicaKey> voterKeys,
+        KRaftVersion kraftVersion,
+        RaftProtocol raftProtocol
+    ) throws Exception {
+        ReplicaKey local = voterKeys.get(0);
+        VoterSet voters = VoterSetTestUtil.voterSet(voterKeys.stream());
+
+        return new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withStartingVoters(voters, kraftVersion)
+            .withRaftProtocol(raftProtocol)
+            .withPollIntervalMs(0)
+            .withUnknownLeader(0)
+            .build();
+    }
+
+    /** The wrapped test context, on which benchmarks perform the measured operation. */
+    public RaftClientTestContext testContext() {
+        return context;
+    }
+
+    /** The local node's log end offset. Kept here because the {@code log} field is package-private. */
+    public long logEndOffset() {
+        return log.endOffset().offset();
+    }
+
+    /** The local node's voter key (the first voter). */
+    public ReplicaKey localVoter() {
+        return voters.get(0);
+    }
+
+    /**
+     * The voters other than the local node, in voter-set order. May be empty (single-voter quorum).
+     * Use these as the source of delivered requests, e.g. a FETCH from a follower on the leader.
+     * The returned list is an unmodifiable view.
+     */
+    public List<ReplicaKey> remoteVoters() {
+        return voters.subList(1, voters.size());
+    }
+
+    /**
+     * Establishes the counter baseline so that work done before this point (building the context,
+     * driving an election in {@code leader()}, or anything else a benchmark does in its setup) is not
+     * attributed to the measured operation. Call this at the end of benchmark setup, just
+     * before the measured region begins.
+     */
+    public void zeroCountersOnSetup() {
+        lastFlushCount = log.flushCount();
+        lastReadCount = log.readCount();
+        lastTruncationCount = log.truncationCount();
+        lastRequestsSent = channel.requestsSent();
+        lastRequestsSentByApiKey = new HashMap<>(channel.requestsSentByApiKey());
+        lastQuorumWrites = context.quorumStateWriteCount();
+        lastQuorumReads = context.quorumStateReadCount();
+        // Responses are not tracked with a baseline; discard the ones sent so far instead.
+        context.drainAllSentResponses();
+    }
+
+    /** Number of log flushes since the last drain. */
+    public int drainLogFlushes() {
+        int current = log.flushCount();
+        int delta = current - lastFlushCount;
+        lastFlushCount = current;
+        return delta;
+    }
+
+    /** Number of log reads since the last drain. */
+    public int drainLogReads() {
+        int current = log.readCount();
+        int delta = current - lastReadCount;
+        lastReadCount = current;
+        return delta;
+    }
+
+    /** Number of log truncations since the last drain. */
+    public int drainLogTruncations() {
+        int current = log.truncationCount();
+        int delta = current - lastTruncationCount;
+        lastTruncationCount = current;
+        return delta;
+    }
+
+    /**
+     * Number of requests sent since the last drain. If {@code apiKey} is present, only requests of
+     * that API key are counted; otherwise all requests are counted.
+     *
+     * <p>The unfiltered baseline and the per-API-key baselines are tracked independently: an
+     * unfiltered drain does not advance any per-key baseline, and a per-key drain does not advance
+     * the unfiltered one. Use the same filter consistently on a given context, otherwise the same
+     * requests are reported by both.
+     */
+    public int drainRpcRequestsSent(Optional<ApiKeys> apiKey) {
+        if (apiKey.isEmpty()) {
+            int current = channel.requestsSent();
+            int delta = current - lastRequestsSent;
+            lastRequestsSent = current;
+            return delta;
+        }
+        short id = apiKey.get().id;
+        int current = channel.requestsSent(apiKey.get());
+        int delta = current - lastRequestsSentByApiKey.getOrDefault(id, 0);
+        lastRequestsSentByApiKey.put(id, current);
+        return delta;
+    }
+
+    /** Number of quorum-state writes since the last drain. */
+    public int drainQuorumStateWrites() {
+        int current = context.quorumStateWriteCount();
+        int delta = current - lastQuorumWrites;
+        lastQuorumWrites = current;
+        return delta;
+    }
+
+    /** Number of quorum-state reads since the last drain. */
+    public int drainQuorumStateReads() {
+        int current = context.quorumStateReadCount();
+        int delta = current - lastQuorumReads;
+        lastQuorumReads = current;
+        return delta;
+    }
+
+    /**
+     * Number of responses sent since the last drain. If {@code apiKey} is present, only responses of
+     * that API key are counted; otherwise all responses are counted.
+     */
+    public int drainRpcResponsesSent(Optional<ApiKeys> apiKey) {
+        if (apiKey.isEmpty()) {
+            return context.drainAllSentResponses();
+        }
+        return context.drainSentResponses(apiKey.get()).size();
+    }
+}
diff --git a/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientTestContext.java
index ad47c8d958102..5dcf74e1b9bf1 100644
--- a/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/testFixtures/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -126,8 +126,9 @@ public final class RaftClientTestContext {
private int electionTimeoutMs;
private int requestTimeoutMs;
private int appendLingerMs;
+ private long pollIntervalMs;
- private final QuorumStateStore quorumStateStore;
+ private final MockQuorumStateStore quorumStateStore;
final String clusterId;
private final OptionalInt localId;
public final Uuid localDirectoryId;
@@ -167,7 +168,7 @@ public static final class Builder {
private final MockMessageQueue messageQueue = new MockMessageQueue();
private final MockTime time = new MockTime();
- private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
+ private final MockQuorumStateStore quorumStateStore = new MockQuorumStateStore();
private final MockableRandom random = new MockableRandom(1L);
private final LogContext logContext = new LogContext();
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
@@ -179,6 +180,7 @@ public static final class Builder {
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
private int appendLingerMs = DEFAULT_APPEND_LINGER_MS;
+ private long pollIntervalMs = TestUtils.DEFAULT_POLL_INTERVAL_MS;
private MemoryPool memoryPool = MemoryPool.NONE;
private Optional> bootstrapServers = Optional.empty();
private RaftProtocol raftProtocol = RaftProtocol.KIP_595_PROTOCOL;
@@ -294,6 +296,11 @@ Builder withElectionTimeoutMs(int electionTimeoutMs) {
return this;
}
+ /**
+ * Sets the poll interval, in milliseconds, that the built context's {@code pollUntil} passes to
+ * {@code TestUtils.waitForCondition}. Defaults to {@code TestUtils.DEFAULT_POLL_INTERVAL_MS}.
+ */
+ Builder withPollIntervalMs(long pollIntervalMs) {
+ this.pollIntervalMs = pollIntervalMs;
+ return this;
+ }
+
Builder withRequestTimeoutMs(int requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
return this;
@@ -517,6 +524,7 @@ public RaftClientTestContext build() throws IOException {
context.electionTimeoutMs = electionTimeoutMs;
context.requestTimeoutMs = requestTimeoutMs;
context.appendLingerMs = appendLingerMs;
+ context.pollIntervalMs = pollIntervalMs;
return context;
}
@@ -533,7 +541,7 @@ private RaftClientTestContext(
MockNetworkChannel channel,
MockMessageQueue messageQueue,
MockTime time,
- QuorumStateStore quorumStateStore,
+ MockQuorumStateStore quorumStateStore,
VoterSet startingVoters,
Set bootstrapIds,
RaftProtocol raftProtocol,
@@ -567,6 +575,20 @@ int electionTimeoutMs() {
return electionTimeoutMs;
}
+ /**
+ * Cumulative number of quorum-state-file writes, as counted by the mock quorum state store.
+ * Used by the JMH raft benchmarks.
+ */
+ int quorumStateWriteCount() {
+ return quorumStateStore.writeCount();
+ }
+
+ /**
+ * Cumulative number of quorum-state-file reads, as counted by the mock quorum state store.
+ * Used by the JMH raft benchmarks.
+ */
+ int quorumStateReadCount() {
+ return quorumStateStore.readCount();
+ }
+
int requestTimeoutMs() {
return requestTimeoutMs;
}
@@ -643,7 +665,7 @@ void expectAndGrantVotes(int epoch) throws Exception {
deliverResponse(request.correlationId(), request.destination(), voteResponse);
}
- client.poll();
+ pollUntil(() -> client.quorum().isLeader());
assertElectedLeader(epoch, localIdOrThrow());
}
@@ -669,8 +691,7 @@ void expectAndGrantPreVotes(int epoch) throws Exception {
}
}
- client.poll();
- assertTrue(client.quorum().isCandidate());
+ pollUntil(() -> client.quorum().isCandidate());
}
private int localIdOrThrow() {
@@ -720,10 +741,10 @@ public void pollUntil(TestCondition condition) throws InterruptedException {
TestUtils.waitForCondition(() -> {
poll();
return condition.conditionMet();
- }, 5000, "Condition failed to be satisfied before timeout");
+ }, 5000, pollIntervalMs, () -> "Condition failed to be satisfied before timeout");
}
- void pollUntilResponse() throws InterruptedException {
+ public void pollUntilResponse() throws InterruptedException {
pollUntil(() -> !sentResponses.isEmpty());
}
@@ -979,7 +1000,7 @@ private ApiMessage roundTripApiMessage(ApiMessage message, short version) {
return message;
}
- void deliverRequest(ApiMessage request) {
+ public void deliverRequest(ApiMessage request) {
short version = raftRequestVersion(request);
deliverRequest(request, version);
}
@@ -1061,6 +1082,18 @@ List assertSentBeginQuorumEpochRequest(int epoch, Set drainSentResponses(
ApiKeys apiKey
) {
@@ -1871,7 +1904,7 @@ void assertFetchRequestData(
}
}
- FetchRequestData fetchRequest(
+ public FetchRequestData fetchRequest(
int epoch,
ReplicaKey replicaKey,
long fetchOffset,
@@ -2357,7 +2390,7 @@ Optional> drainHandledBootstrapSnapshot() {
* Determines what versions of RPCs are in use. Note, these are ordered from oldest to newest, and are
* cumulative. E.g. KIP_1186_PROTOCOL includes KIP_996_PROTOCOL, KIP_853_PROTOCOL, and KIP_595_PROTOCOL changes
*/
- enum RaftProtocol {
+ public enum RaftProtocol {
// kraft support
KIP_595_PROTOCOL,
// dynamic quorum reconfiguration support