Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3613,6 +3613,7 @@ project(':jmh-benchmarks') {
implementation testFixtures(project(':clients'))
implementation testFixtures(project(':server-common'))
implementation testFixtures(project(':metadata'))
implementation testFixtures(project(':raft'))

implementation libs.jmhCore
annotationProcessor libs.jmhGeneratorAnnProcess
Expand Down
3 changes: 3 additions & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@
<subpackage name="metadata">
<allow class="org.apache.kafka.raft.KRaftConfigs"/>
</subpackage>
<subpackage name="raft">
<allow pkg="org.apache.kafka.raft"/>
</subpackage>
</import-control>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.raft;

import org.apache.kafka.raft.RaftClientBenchmarkContext;
import org.apache.kafka.raft.RaftClientTestContext;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
* Benchmarks for the leader-election path. The outer class is intentionally not a JMH {@code @State}:
* each benchmark declares the starting state it needs as a nested {@code @State} parameter, so
* different election scenarios (e.g. a future Prospective or Candidate start) can have their own
* setup without forcing a single shared {@code @Setup} on the whole class.
*/
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = RaftClientBenchmarkContext.SINGLE_SHOT_WARMUP_ITERATIONS)
@Measurement(iterations = RaftClientBenchmarkContext.SINGLE_SHOT_MEASUREMENT_ITERATIONS)
@Fork(RaftClientBenchmarkContext.SINGLE_SHOT_FORKS)
public class ElectionBenchmarks {

/**
* Starting state: the local node is Unattached in a {@code voterCount}-node quorum. A fresh
* context is built per invocation because driving the election to completion consumes it.
*/
@State(Scope.Thread)
public static class UnattachedQuorum {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better name for this would be UnattachedWithMultipleVoters. The word Unattached means "I don't know the leader" for an epoch, whereas the word Quorum implies voters agreed on who a leader is for an epoch.

@Param({"3", "5"})
public int voterCount;

RaftClientBenchmarkContext benchmark;
RaftClientTestContext context;

@Setup(Level.Invocation)
public void setup() throws Exception {
benchmark = RaftClientBenchmarkContext.unattached(voterCount);
context = benchmark.testContext();
benchmark.zeroCountersOnSetup();
}
}

/** A multi-voter quorum elects the local node as leader. */
@Benchmark
public void electLeader(UnattachedQuorum state, KRaftBenchmarkingCounters counters) throws Exception {
state.context.unattachedToLeader();

counters.drainFrom(state.benchmark, Optional.empty(), Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.raft;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.raft.RaftClientBenchmarkContext;

import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.BenchmarkParams;

import java.util.Optional;

/**
* Secondary, machine-independent work counters reported by the raft benchmarks alongside the timing
* score, as {@code benchmark:counter} rows.
*
* <p>Throughout this class, an <em>operation</em> is JMH's unit of work: a single invocation of a
* {@code @Benchmark}-annotated method. (One operation equals one invocation here because we don't use
* {@code @OperationsPerInvocation}.) JMH reports the timing score in {@code ns/op}, and these work
* counters are reported {@code PerOp} to match.
*
* <p>Each benchmark calls {@link #drainFrom} every invocation to accumulate the work deltas drained
* from {@link RaftClientBenchmarkContext}. The raw totals are private accumulators; what we report
* are the per-operation values from the {@code *PerOp()} methods (the quantity of interest), plus
* {@link #operations}.
*
* <p>JMH aggregates {@code Type.EVENTS} secondary results with {@code SUM} across all measurement
* data points i.e {@code forks x measurement iterations}. To make the <em>summary</em> row
* report the true per-operation value rather than that value multiplied by the data-point count, each
* method pre-divides by the data-point count obtained from {@link BenchmarkParams} in
* {@link #captureRunShape}. The SUM then reconstitutes the exact per-operation value (e.g.
* {@code logReadsPerOp = 1.0}) in the summary, for any {@code -f}/{@code -i} configuration. (The
* per-iteration console values are correspondingly a small fraction of the per-op value; read the
* summary row.)
*
* <p>The per-operation values are integer-exact and should be stable across a correct refactor of
* {@code KafkaRaftClient}: a flush count moving from 1 to 2 per operation is a behavioral diff, not
* measurement noise. The counters that are zero on a path (e.g. log flushes on a caught-up fetch)
* are the most useful tripwires, since zero is speed-independent.
*/
@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.EVENTS)
public class KRaftBenchmarkingCounters {
private long logFlushesTotal;
private long logReadsTotal;
private long logTruncationsTotal;
private long rpcRequestsSentTotal;
private long rpcResponsesSentTotal;
private long quorumStateWritesTotal;
private long quorumStateReadsTotal;

// Reported: the number of operations (i.e. @Benchmark method invocations) measured in the
// iteration, and the divisor for the per-operation values below.
public long operations;

// The number of measurement data points JMH will SUM the per-op methods over, i.e.
// (forks x measurement iterations) for this run. Captured from BenchmarkParams so it tracks the
// actual run shape (including -f/-i overrides) rather than being hardcoded.
private double measurementDataPoints = 1.0;

@Setup(Level.Trial)
public void captureRunShape(BenchmarkParams params) {
// forks() is 0 when forking is disabled (in-process), which is still one set of iterations.
int forks = Math.max(1, params.getForks());
measurementDataPoints = (double) forks * params.getMeasurement().getCount();
}
Comment on lines +74 to +84

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what this code is doing for my understanding? I thought this state is per thread. How does jmh reconcile multiple of these when forks > 1?

What does params.getMeasurement().getCount() mean?


@Setup(Level.Iteration)
public void reset() {
logFlushesTotal = 0;
logReadsTotal = 0;
logTruncationsTotal = 0;
rpcRequestsSentTotal = 0;
rpcResponsesSentTotal = 0;
quorumStateWritesTotal = 0;
quorumStateReadsTotal = 0;
}

/**
* Accumulates this invocation's work deltas drained from {@code context} into these counters.
* {@code expectedRequest}/{@code expectedResponse}, if present, restrict the RPC request/response
* counts to that API key (e.g. {@code FETCH}); empty counts all.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document that when expectedRequest/expectedResponse is empty, it means no outstanding requests/responses are expected at the end of a benchmarking invocation.

*/
public void drainFrom(
RaftClientBenchmarkContext context,
Optional<ApiKeys> expectedRequest,
Optional<ApiKeys> expectedResponse
) {
logFlushesTotal += context.drainLogFlushes();
logReadsTotal += context.drainLogReads();
logTruncationsTotal += context.drainLogTruncations();
rpcRequestsSentTotal += context.drainRpcRequestsSent(expectedRequest);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my previous comments, before this line, we should actually drain the MockNetworkChannel#sendQueue of outstanding requests, and then assert the queue is empty.

rpcResponsesSentTotal += context.drainRpcResponsesSent(expectedResponse);
quorumStateWritesTotal += context.drainQuorumStateWrites();
quorumStateReadsTotal += context.drainQuorumStateReads();
operations += 1;
}

public double logFlushesPerOp() {
return perOperation(logFlushesTotal);
}

public double logReadsPerOp() {
return perOperation(logReadsTotal);
}

public double logTruncationsPerOp() {
return perOperation(logTruncationsTotal);
}

public double rpcRequestsSentPerOp() {
return perOperation(rpcRequestsSentTotal);
}

public double rpcResponsesSentPerOp() {
return perOperation(rpcResponsesSentTotal);
}

public double quorumStateWritesPerOp() {
return perOperation(quorumStateWritesTotal);
}

public double quorumStateReadsPerOp() {
return perOperation(quorumStateReadsTotal);
}

private double perOperation(long counter) {
if (operations == 0) {
return 0.0;
}
return (double) counter / operations / measurementDataPoints;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.raft;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.raft.RaftClientBenchmarkContext;
import org.apache.kafka.raft.RaftClientTestContext;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = RaftClientBenchmarkContext.AVERAGE_TIME_WARMUP_ITERATIONS)
@Measurement(iterations = RaftClientBenchmarkContext.AVERAGE_TIME_MEASUREMENT_ITERATIONS)
@Fork(RaftClientBenchmarkContext.AVERAGE_TIME_FORKS)
public class LeaderBenchmarks {

/**
* Starting state: the local node is Leader with the high watermark at the log end and a caught-up
* follower ready to fetch. Built once per trial and reused across invocations, since handling a
* caught-up fetch does not mutate it.
*/
@State(Scope.Thread)
public static class LeaderWithCaughtUpFollower {
static final int VOTER_COUNT = 3;

RaftClientBenchmarkContext benchmark;
RaftClientTestContext context;

int epoch;
long endOffset;

@Setup(Level.Trial)
public void setup() throws Exception {
benchmark = RaftClientBenchmarkContext.leader(VOTER_COUNT);
context = benchmark.testContext();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
epoch = context.currentEpoch();
endOffset = benchmark.logEndOffset();
benchmark.zeroCountersOnSetup();
}
}

/**
* Leader handles a valid FETCH from a fully caught-up follower (fetch offset == log end offset),
* which does not advance the high watermark — the steady-state heartbeat-style fetch.
*/
@Benchmark
public void handleFetchFromCaughtUpFollower(
LeaderWithCaughtUpFollower state,
KRaftBenchmarkingCounters counters
) throws Exception {
state.context.deliverRequest(
state.context.fetchRequest(
state.epoch, state.benchmark.remoteVoters().get(0), state.endOffset, state.epoch, 0));
state.context.pollUntilResponse();

counters.drainFrom(state.benchmark, Optional.empty(), Optional.of(ApiKeys.FETCH));
}
}
Loading