KAFKA-20338: Add JMH performance benchmarks for KafkaRaftClient#22669
KAFKA-20338: Add JMH performance benchmarks for KafkaRaftClient#22669anjy7 wants to merge 6 commits into
Conversation
kevin-wu24
left a comment
There was a problem hiding this comment.
Thanks for the PR @anjy7.
Can we attach a screenshot of the jmh output for these benchmarks to the PR description?
There was a problem hiding this comment.
Thanks for the updates @anjy7. Left a few more comments and some questions for you:
EDIT: Could we also attach a screenshot for the fetch request benchmark we wrote? That one may be easier to read through since it is not parametrized by voter set size.
| * context is built per invocation because driving the election to completion consumes it. | ||
| */ | ||
| @State(Scope.Thread) | ||
| public static class UnattachedQuorum { |
There was a problem hiding this comment.
A better name for this would be UnattachedWithMultipleVoters. The word Unattached means "I don't know the leader" for an epoch, whereas the word Quorum implies voters agreed on who a leader is for an epoch.
| // Standardized JMH iteration counts, shared by all raft benchmarks so every benchmark of a given | ||
| // mode is configured identically. SingleShotTime measures a single operation per iteration, so it | ||
| // needs many iterations to build a stable distribution; AverageTime averages many operations | ||
| // within each timed iteration, so it needs fewer. | ||
| public static final int SINGLE_SHOT_WARMUP_ITERATIONS = 50; | ||
| public static final int SINGLE_SHOT_MEASUREMENT_ITERATIONS = 30; | ||
| public static final int SINGLE_SHOT_FORKS = 5; | ||
| public static final int AVERAGE_TIME_WARMUP_ITERATIONS = 5; | ||
| public static final int AVERAGE_TIME_MEASUREMENT_ITERATIONS = 10; | ||
| public static final int AVERAGE_TIME_FORKS = 3; |
There was a problem hiding this comment.
You may want to break these comments up, so that the SingleShotTime documentation goes with the SINGLE_SHOT_... static variables, etc..
| public static final KRaftVersion DEFAULT_KRAFT_VERSION = KRaftVersion.KRAFT_VERSION_1; | ||
| public static final RaftProtocol DEFAULT_RAFT_PROTOCOL = RaftProtocol.KIP_1186_PROTOCOL; |
There was a problem hiding this comment.
We should grab the most highest ordinal value of these enums so we do not need to keep updating this.
LATEST_KRAFT_VERSION = Arrays.stream(KRaftVersion.values())
.max(Comparator.naturalOrder())
.orElseThrow()
| private int lastReadCount; | ||
| private int lastTruncationCount; | ||
| private int lastRequestsSent; | ||
| private Map<Short, Integer> lastRequestsSentByApiKey = new HashMap<>(); |
There was a problem hiding this comment.
We should be able to remove this.
| /** | ||
| * Builds an unattached node in a KIP-1186 quorum of {@code voterKeys}, whose first entry is the | ||
| * local node. | ||
| * Builds an unattached node in a quorum of {@code voterKeys} (first entry is the local node) |
There was a problem hiding this comment.
Technically there is no quorum at this point because there is no leader. It is more accurate to say:
Initializes a local, unattached node in a cluster of ...
| short id = apiKey.get().id; | ||
| int current = channel.requestsSent(apiKey.get()); | ||
| int delta = current - lastRequestsSentByApiKey.getOrDefault(id, 0); | ||
| lastRequestsSentByApiKey.put(id, current); |
There was a problem hiding this comment.
Seems like we are missing a call to MockNetworkChannel#drainSentRequests(Optional<ApiKeys> apiKeyFilter) before entering this method, since that needs to happen before we collect the request deltas for the current invocation. We should call that method in drainFrom. Also, we should be able to assert the sendQueue is empty at the end of this method. Otherwise, that is a performance regression, because we're sending more requests than we think.
This is so we can drain any remaining sent requests after the benchmarking code calls poll() for the last time in the current invocation. Only after this will our delta collection be accurate. This is similar to the current implementation of drainRpcResponsesSent. What do you think?
Additionally, this lastRequestsSentByApiKey logic from L208-211 is going to be incorrect. Basically, we're not going to count any other requests sent over the benchmarking invocation if we pass an apiKey to this method. Maybe my previous comments about this weren't very clear, and I think the naming convention of these methods contributed to it. I think this was the comment: #22669 (comment)? Since we are only reporting the requestsSent/invocation at the end, this lastRequestsSentByApiKey state is not necessary.
| public int drainRpcResponsesSent(Optional<ApiKeys> apiKey) { | ||
| if (apiKey.isEmpty()) { | ||
| return context.drainAllSentResponses(); | ||
| } |
There was a problem hiding this comment.
If a benchmarked code has outstanding sent responses before calling this method, the benchmark writer should know what those are. What do you think about renaming this method to maybeDrainSentRpcResponses(Optional<ApiKeys> apiKey) and the below implementation:
if apiKey is present,
drain for the specific api key supplied and store that number
assert the sentResponses queue is empty
return the stored number or 0
The motivation for this implementation is similar to what is proposed for properly draining + collecting deltas on the send queue here: https://github.com/apache/kafka/pull/22669/changes/f9b46f94589e96a65e076c56979bdc8788982ec7..e34b656497d2b9c17a027272ab6b028fc436ca70#r3484997654
| /** | ||
| * Accumulates this invocation's work deltas drained from {@code context} into these counters. | ||
| * {@code expectedRequest}/{@code expectedResponse}, if present, restrict the RPC request/response | ||
| * counts to that API key (e.g. {@code FETCH}); empty counts all. |
There was a problem hiding this comment.
We should document that when expectedRequest/expectedResponse is empty, it means no outstanding requests/responses are expected at the end of a benchmarking invocation.
| logFlushesTotal += context.drainLogFlushes(); | ||
| logReadsTotal += context.drainLogReads(); | ||
| logTruncationsTotal += context.drainLogTruncations(); | ||
| rpcRequestsSentTotal += context.drainRpcRequestsSent(expectedRequest); |
There was a problem hiding this comment.
From my previous comments, before this line, we should actually drain the MockNetworkChannel#sendQueue of outstanding requests, and then assert the queue is empty.
| // The number of measurement data points JMH will SUM the per-op methods over, i.e. | ||
| // (forks x measurement iterations) for this run. Captured from BenchmarkParams so it tracks the | ||
| // actual run shape (including -f/-i overrides) rather than being hardcoded. | ||
| private double measurementDataPoints = 1.0; | ||
|
|
||
| @Setup(Level.Trial) | ||
| public void captureRunShape(BenchmarkParams params) { | ||
| // forks() is 0 when forking is disabled (in-process), which is still one set of iterations. | ||
| int forks = Math.max(1, params.getForks()); | ||
| measurementDataPoints = (double) forks * params.getMeasurement().getCount(); | ||
| } |
There was a problem hiding this comment.
Can you explain what this code is doing for my understanding? I thought this state is per thread. How does jmh reconcile multiple of these when forks > 1?
What does params.getMeasurement().getCount() mean?
Background
KafkaRaftClient implements KRaft, Kafka's Raft consensus protocol. Today a single KafkaRaftClientDriver thread repeatedly calls KafkaRaftClient#poll() and each poll() iteration is an "all-in-one" event handler: it updates state, sends/handles RPCs, appends records (on the leader), and updates listeners to the log. The Kafka metadata team plans to refactor this programming model so that each of those responsibilities becomes a distinct event scheduled on a single-threaded event queue, the model already used elsewhere in Kafka (e.g. the controller's event queue).
The raft module has strong correctness protection (unit tests and simulation tests), but it has no performance benchmarks.A refactor of this size to a layer this critical can silently regress performance
This PR adds a protocol-level JMH benchmark suite.
What changed
Each benchmark follows the shape state X -> event -> state Y and reuses the existing RaftClientTestContext fixture.
This first PR covers:
ElectionBenchmark drives a full Unattached -> Leader election. It runs in SingleShotTime mode with a fresh context per invocation, since electing a leader consumes the context's state.
LeaderBenchmark measures steady-state leader FETCH handling from a fully caught-up follower (a FETCH that does not advance the high watermark). It runs in AverageTime mode on one prepared leader reused across invocations.
Because the mocks are in-memory, elapsed time alone is blind to regressions like an extra flush per operation, so each benchmark also reports machine-independent work counters via @AuxCounters: log flushes, reads, and truncations; RPC requests and responses sent; and quorum-state-file writes. These are reported as per-iteration totals plus an operations count, so the per-operation value is total / operations; the counts are integer-exact and should be stable across a correct refactor (a flush count going from 1 to 2 per op is a behavioral diff, not noise). Pairing with -prof gc gives allocations per operation (gc.alloc.rate.norm).
A thin public adapter RaftClientBenchmarkContext is added to the raft test fixtures. It exposes factories, scenario primitives, and the drainable work counters. The counters themselves are small additive fields on the existing mocks (MockLog, MockNetworkChannel, MockQuorumStateStore, MockMessageQueue).
RaftClientTestContext.pollUntil(...) is given a configurable poll interval. It defaults to 100ms (unchanged for existing tests); the benchmark contexts set it to 0 to busy-poll, mirroring KafkaRaftClientDriver, so the harness's sleep-between-polls does not dominate benchmark timing.
Run with:
Run individually:
./jmh-benchmarks/jmh.sh BenchmarkClassName
Run all:
./jmh-benchmarks/jmh.sh ".jmh.raft."
Testing
Ran the suite locally and the work counters match the expected per-operation profiles.
Sample Output
3 and 5 voter output for electLeader ElectionBenchmark in this PR:

Reviewers:
Kevin Wu kwu@confluent.io,
José Armando García Sancio jsancio@apache.org
Reviewers: Kevin Wu kwu@confluent.io