Skip to content
Open
9 changes: 7 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2708,10 +2708,12 @@ private void maybeTransition(
}
} else if (
leaderId.isPresent() &&
(!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size())
(!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size() ||
(quorum.isUnattached() && !leaderEndpoints.isEmpty()))
) {
// The request or response indicates the leader of the current epoch
// which are currently unknown or the replica has discovered more endpoints
// which are currently unknown, the replica has discovered more endpoints,
// or the replica is unattached but has discovered endpoints for the leader.
transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
}
}
Expand Down Expand Up @@ -3394,6 +3396,9 @@ private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
state.resetUpdateVoterSetPeriod(currentTimeMs);
}
return sendResult.timeToWaitMs();
} else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
transitionToUnattached(state.epoch(), OptionalInt.of(state.leaderId()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a test which explicitly checks that the observer can transition to unattached if there is a timeout.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to the existing unit test.

return 0L;
} else {
return maybeSendFetchToBestNode(state, currentTimeMs);
}
Expand Down
8 changes: 5 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@
*
* Unattached transitions to:
* Unattached: After learning of a new election with a higher epoch
* Follower: After discovering a leader with an equal or larger epoch
* Follower: After discovering a leader or new leader endpoints
* with an equal or larger epoch
*
* Follower transitions to:
* Unattached: After learning of a new election with a higher epoch
* Unattached: After learning of a new election with a higher epoch, or after
* expiration of the fetch timeout
* Follower: After discovering a leader with a larger epoch
*
*/
Expand Down Expand Up @@ -379,7 +381,7 @@ public void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
*/
public void transitionToUnattached(int epoch, OptionalInt leaderId) {
int currentEpoch = state.epoch();
if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective())) {
if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective() && !isObserver())) {
throw new IllegalStateException(
String.format(
"Cannot transition to Unattached with epoch %d from current state %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ public void testAutoRemoveOldVoter() throws Exception {
.withAutoJoin(true)
.withCanBecomeVoter(true)
.build();
var initialSleepMs = context.fetchTimeoutMs - 1;

context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), initialSleepMs, true);
initialSleepMs -= 1;

// the next request should be a remove voter request
pollAndDeliverRemoveVoter(context, oldFollower);

// after sending a remove voter the next request should be a fetch
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), initialSleepMs, true);

// the replica should send remove voter again because the fetch did not update the voter set
pollAndDeliverRemoveVoter(context, oldFollower);
Expand All @@ -80,14 +82,16 @@ public void testAutoAddNewVoter() throws Exception {
.withAutoJoin(true)
.withCanBecomeVoter(true)
.build();
var initialSleepMs = context.fetchTimeoutMs - 1;

context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), initialSleepMs, true);
initialSleepMs -= 1;

// the next request should be an add voter request
pollAndSendAddVoter(context, newVoter);

// expire the add voter request, the next request should be a fetch
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), initialSleepMs, true);

// the replica should send add voter again because the completed fetch
// did not update the voter set, and its timer has expired
Expand Down Expand Up @@ -128,7 +132,7 @@ public void testObserverRemovesOldVoterAndAutoJoins() throws Exception {
.build();

// advance time and complete a fetch to trigger the remove voter request
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), context.fetchTimeoutMs - 1, true);

// the next request should be a remove voter request
pollAndDeliverRemoveVoter(context, oldFollower);
Expand All @@ -142,7 +146,7 @@ public void testObserverRemovesOldVoterAndAutoJoins() throws Exception {
);

// advance time and complete a fetch to trigger the add voter request
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), context.fetchTimeoutMs - 1, true);

// the next request should be an add voter request
final var addVoterRequest = pollAndSendAddVoter(context, newFollowerKey);
Expand All @@ -163,7 +167,7 @@ public void testObserverRemovesOldVoterAndAutoJoins() throws Exception {

// advance time and complete a fetch and expire the update voter set timer
// the next request should be a fetch because the log voter configuration is up-to-date
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), context.fetchTimeoutMs - 1, true);
context.pollUntilRequest();
context.assertSentFetchRequest();
}
Expand All @@ -188,7 +192,7 @@ public void testObserversDoNotAutoJoin() throws Exception {
.withCanBecomeVoter(false)
.build();

context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), context.fetchTimeoutMs - 1, true);

context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
Expand Down Expand Up @@ -217,7 +221,7 @@ public void testObserverDoesNotAddItselfWhenAutoJoinDisabled() throws Exception
.withCanBecomeVoter(true)
.build();

context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), context.fetchTimeoutMs - 1, true);

context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
Expand Down Expand Up @@ -246,7 +250,7 @@ public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception {
.withCanBecomeVoter(true)
.build();

context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.advanceTimeAndCompleteFetch(epoch, leader.id(), context.fetchTimeoutMs - 1, true);

context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
Expand Down
119 changes: 119 additions & 0 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -765,4 +766,122 @@ void testUpdatedHighWatermarkCompleted() throws Exception {
assertEquals(localLogEndOffset, partitionResponse.highWatermark());
}
}

@Test
void testObserverFetchesBetweenLeaderAndBootstrapServers() throws Exception {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this fail without your change? If so, can you tell me exactly what fails?

@kevin-wu24 kevin-wu24 May 28, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this fail without your change? If so, can you tell me exactly what fails?

Yes, trunk fails on L800 by sending a fetch request to the leader endpoint instead of the bootstrap endpoint on the second iteration of the for loop. At this point, the fetch timeout is expired (simulating being unable to reach the leader), and the local node will continue to fetch from that endpoint for the remainder of the epoch, instead of trying to fetch from bootstrap servers.

Looking at this again after a while, the for loop actually made this harder for me to read. I'm going to remove it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran this test against this PR and I got this trace:

[2026-06-25 15:02:25,329] INFO Starting request manager with bootstrap servers: [localhost:10634 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient:331)
[2026-06-25 15:02:25,561] INFO Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient:509)
[2026-06-25 15:02:25,563] INFO Starting voters are VoterSet(voters={643=VoterNode(voterKey=ReplicaKey(id=643, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0]), 644=VoterNode(voterKey=ReplicaKey(id=644, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10634}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) (org.apache.kafka.raft.KafkaRaftClient:511)
[2026-06-25 15:02:25,565] INFO Attempting durable transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,568] INFO Completed transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,586] TRACE Sent outbound request: OutboundRequest(correlationId=0, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414145309, destination=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,587] INFO Registered the listener org.apache.kafka.raft.RaftClientTestContext$MockListener@107632469 (org.apache.kafka.raft.KafkaRaftClient:3590)
[2026-06-25 15:02:25,725] TRACE Received inbound message InboundResponse(correlationId=0, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=643, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=643, host='localhost', port=10633, rack=null)]), source=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,726] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,727] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,728] DEBUG Notifying listener org.apache.kafka.raft.RaftClientTestContext$MockListener@107632469 of leader change LeaderAndEpoch[leaderId=OptionalInt[643], epoch=2] (org.apache.kafka.raft.KafkaRaftClient:4121)
[2026-06-25 15:02:25,836] TRACE Sent outbound request: OutboundRequest(correlationId=1, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414145309, destination=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,837] INFO Attempting durable transition to UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,837] INFO Completed transition to UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,837] TRACE Received inbound message InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,838] DEBUG Ignoring response InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10633 (id: 643 rack: null isFenced: false)) since it is no longer needed (org.apache.kafka.raft.KafkaRaftClient:2856)
[2026-06-25 15:02:25,942] TRACE Sent outbound request: OutboundRequest(correlationId=2, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414195310, destination=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,943] TRACE Received inbound message InboundResponse(correlationId=2, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=643, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=643, host='localhost', port=10633, rack=null)]), source=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,943] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,944] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:26,047] TRACE Sent outbound request: OutboundRequest(correlationId=3, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414195310, destination=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)

Can we get a TRACE of the actual issue to make sure we are solving the correct problem? I am having a hard time understanding the actual problem, so I cannot yet be sure that this change solves it.

@kevin-wu24 kevin-wu24 Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an updated trace from my most recent local changes:

[2026-06-26 09:50:15,417] INFO Starting request manager with bootstrap servers: [localhost:10139 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient:331)
[2026-06-26 09:50:15,600] INFO Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient:509)
[2026-06-26 09:50:15,601] INFO Starting voters are VoterSet(voters={148=VoterNode(voterKey=ReplicaKey(id=148, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0]), 149=VoterNode(voterKey=ReplicaKey(id=149, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10139}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) (org.apache.kafka.raft.KafkaRaftClient:511)
[2026-06-26 09:50:15,603] INFO Attempting durable transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,605] INFO Completed transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,614] TRACE Sent outbound request: OutboundRequest(correlationId=0, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485415405, destination=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,615] INFO Registered the listener org.apache.kafka.raft.RaftClientTestContext$MockListener@220558713 (org.apache.kafka.raft.KafkaRaftClient:3590)
[2026-06-26 09:50:15,707] TRACE Received inbound message InboundResponse(correlationId=0, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=148, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=148, host='localhost', port=10138, rack=null)]), source=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,708] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,709] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,709] DEBUG Notifying listener org.apache.kafka.raft.RaftClientTestContext$MockListener@220558713 of leader change LeaderAndEpoch[leaderId=OptionalInt[148], epoch=2] (org.apache.kafka.raft.KafkaRaftClient:4121)
[2026-06-26 09:50:15,812] TRACE Sent outbound request: OutboundRequest(correlationId=1, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485415405, destination=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,813] TRACE Received inbound message InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,814] INFO Attempting durable transition to UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,814] INFO Completed transition to UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,918] TRACE Sent outbound request: OutboundRequest(correlationId=2, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485465406, destination=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,919] TRACE Received inbound message InboundResponse(correlationId=2, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=148, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=148, host='localhost', port=10138, rack=null)]), source=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,920] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,920] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:16,024] TRACE Sent outbound request: OutboundRequest(correlationId=3, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485465406, destination=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)

The scenario is: after the local node becomes Follower, it is unable to successfully fetch from the leader, instead receiving the BROKER_NOT_AVAILABLE message, for the duration of its fetch timeout. This is shown by the local node sending a fetch to the leader, getting a BROKER_NOT_AVAILABLE response, and only then transitioning to Unattached.

final var epoch = 2;
final var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
final var leader = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
final var bootstrapVoter = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
final var voters = VoterSet.fromMap(
Map.of(
leader.id(), VoterSetTest.voterNode(leader),
bootstrapVoter.id(), VoterSetTest.voterNode(bootstrapVoter)
)
);

final var context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.withStartingVoters(voters, KRaftVersion.KRAFT_VERSION_1)
// Configure the bootstrap servers to include only the bootstrap voter so that
// the test can reliably check that the destination of the observer's fetch
// requests alternates between the leader and the bootstrap voter.
.withBootstrapServers(
Optional.of(List.of(RaftClientTestContext.mockAddress(bootstrapVoter.id())))

@jsancio jsancio Jun 16, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's document that you are doing this to reliably check fetches to the leader (known node) vs fetches to the bootstrap server (unknown nodes). Another way to check this is that "bootstrap nodes" have an unknown id. We represent this in the network client by giving those nodes an id less than -1. RPCs to known kafka nodes have an id greater than or equal to 0.

)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1186_PROTOCOL)
.build();

// The observer initially fetches from the bootstrap servers,
// where it will discover the leader's endpoints.
final var bootstrapFetch = pollAndCheckObserverFetchRequest(
context,
true,
bootstrapVoter.id()
);
context.deliverResponse(
bootstrapFetch.correlationId(),
bootstrapFetch.destination(),
context.fetchResponse(
epoch,
leader.id(),
MemoryRecords.EMPTY,
0L,
Errors.NOT_LEADER_OR_FOLLOWER
)
);

// Subsequent fetch from the observer is sent to the leader
// Return a BROKER_NOT_AVAILABLE error, handle that response, and then
// advance time past the fetch timeout.
// This is to simulate the leader endpoints being unreachable, which will
// cause the observer to fetch from the bootstrap servers after the fetch timeout expires.
final var leaderFetch = pollAndCheckObserverFetchRequest(
context,
false,
leader.id()
);
context.deliverResponse(
leaderFetch.correlationId(),
leaderFetch.destination(),
RaftUtil.errorResponse(
ApiKeys.FETCH,
Errors.BROKER_NOT_AVAILABLE
)
);
Comment on lines +829 to +836

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the trace I pasted, this response is not delivered before the fetch timeout. This is misleading when reading the test.

context.client.poll();

// The fetch timeout is much greater than the request manager's configured backoff, so the
// current unreachable connection will no longer be backing off when the next fetch is sent.
// Expire the fetch timeout and check that the next fetch is sent to the bootstrap server again.
context.time.sleep(context.fetchTimeoutMs + 1);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made these changes and the test passes. It looks like the issue is that the leader is in the backoff state because kraft got an error from the leader:

diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index 8d762d6c96..3d873add30 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -836,7 +836,6 @@ public final class KafkaRaftClientFetchTest {
         // The fetch timeout is much greater than the request manager's configured backoff, so the
         // current unreachable connection will no longer be backing off when the next fetch is sent.
         // Expire the fetch timeout and check that the next fetch is sent to the bootstrap server again.
-        context.time.sleep(context.fetchTimeoutMs + 1);
         final var nextBootstrapFetch = pollAndCheckObserverFetchRequest(
             context,
             true,
@@ -854,6 +853,8 @@ public final class KafkaRaftClientFetchTest {
             )
         );

+        context.time.sleep(context.retryBackoffMs);
+
         // Discovering the leader from a bootstrap fetch means the observer resumes fetching from the leader
         pollAndCheckObserverFetchRequest(
             context,
@@ -871,10 +872,8 @@ public final class KafkaRaftClientFetchTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         if (isBootstrapFetch) {
-            assertTrue(context.client.quorum().isUnattached());
             assertTrue(fetchRequest.destination().id() < -1);
         } else {
-            assertTrue(context.client.quorum().isFollower());
             assertEquals(expectedDestinationId, fetchRequest.destination().id());
         }
         // only need to check port since the host is always "localhost" for the mock addresses

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My test is written incorrectly. I agree with your above comments that the response from the leader fetch is not delivered until after the fetch timeout expires.

I need to handle the BROKER_NOT_AVAILABLE response first via poll() to accurately simulate this scenario.

final var nextBootstrapFetch = pollAndCheckObserverFetchRequest(
context,
true,
bootstrapVoter.id()
);
context.deliverResponse(
nextBootstrapFetch.correlationId(),
nextBootstrapFetch.destination(),
context.fetchResponse(
epoch,
leader.id(),
MemoryRecords.EMPTY,
0L,
Errors.NOT_LEADER_OR_FOLLOWER
)
);

// Discovering the leader from a bootstrap fetch means the observer resumes fetching from the leader
pollAndCheckObserverFetchRequest(
context,
false,
leader.id()
);
}

/**
 * Polls the client until it sends a request, asserts that the request is a
 * fetch, and verifies where the fetch was sent.
 *
 * <p>For a bootstrap fetch the destination node id is expected to be less
 * than -1; otherwise the destination id must equal
 * {@code expectedDestinationId}. In both cases the destination port must
 * match the mock address derived from {@code expectedDestinationId}.
 *
 * @param context the test context driving the raft client
 * @param isBootstrapFetch whether the fetch is expected to target a bootstrap server
 * @param expectedDestinationId the id used to derive the expected mock address and,
 *                              for non-bootstrap fetches, the expected destination id
 * @return the fetch request that was sent
 */
private RaftRequest.Outbound pollAndCheckObserverFetchRequest(
    RaftClientTestContext context,
    boolean isBootstrapFetch,
    int expectedDestinationId
) throws Exception {
    context.pollUntilRequest();
    final RaftRequest.Outbound sentFetch = context.assertSentFetchRequest();

    final var destinationId = sentFetch.destination().id();
    if (!isBootstrapFetch) {
        assertEquals(expectedDestinationId, destinationId);
    } else {
        assertTrue(destinationId < -1);
    }

    // The host is always "localhost" for the mock addresses, so comparing ports is sufficient.
    final var expectedPort = RaftClientTestContext.mockAddress(expectedDestinationId).getPort();
    assertEquals(expectedPort, sentFetch.destination().port());

    return sentFetch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2231,7 +2231,7 @@ void testFollowerSendsUpdateVoter() throws Exception {
.build();

// waiting for FETCH requests until the UpdateRaftVoter request is sent
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), context.fetchTimeoutMs - 1, true);

context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
Expand Down Expand Up @@ -2284,7 +2284,7 @@ void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) thro
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), context.fetchTimeoutMs - 1, true);

context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
Expand Down Expand Up @@ -2355,7 +2355,7 @@ void testFollowerSendsUpdateVoterAfterElectionWithKraftVersion0(Errors updateVot
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), context.fetchTimeoutMs - 1, true);

context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
Expand Down Expand Up @@ -2383,7 +2383,7 @@ void testFollowerSendsUpdateVoterAfterElectionWithKraftVersion0(Errors updateVot
context.pollUntilResponse();

// waiting for FETCH request until the UpdateRaftVoter request is set
context.advanceTimeAndCompleteFetch(newEpoch, voter1.id(), true);
context.advanceTimeAndCompleteFetch(newEpoch, voter1.id(), context.fetchTimeoutMs - 1, true);

context.pollUntilRequest();
updateRequest = context.assertSentUpdateVoterRequest(
Expand Down Expand Up @@ -2657,7 +2657,7 @@ void testFollowerSendsUpdateVoterWhenDifferent() throws Exception {
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), context.fetchTimeoutMs - 1, true);

// update voter should not be sent because the local listener is not different from the voter set
context.pollUntilRequest();
Expand Down Expand Up @@ -2698,7 +2698,7 @@ void testFollowerSendsUpdateVoterIfPendingFetchDuringTimeout() throws Exception
.build();

// waiting up to the last FETCH request before the UpdateRaftVoter request is set
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), false);
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), context.fetchTimeoutMs - 1, false);

// expect one last FETCH request
context.pollUntilRequest();
Expand Down Expand Up @@ -2759,7 +2759,7 @@ void testUpdateVoterResponseCausesEpochChange() throws Exception {
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), context.fetchTimeoutMs - 1, true);

context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2010,7 +2010,8 @@ public void testFollowerAsObserverDoesNotBecomeProspectiveAfterFetchTimeout(bool

context.time.sleep(context.fetchTimeoutMs);
context.pollUntilRequest();
assertTrue(context.client.quorum().isFollower());
assertFalse(context.client.quorum().isProspective());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: do we wish to add an assert for whether it becomes Unattached?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

assertTrue(context.client.quorum().isUnattached());

// transitions to unattached
context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, withKip853Rpc), epoch, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,17 +1013,25 @@ void deliverResponse(int correlationId, Node source, ApiMessage response) {
* This is used to expire the update voter set timer without also expiring the fetch timer,
* which is needed for add, remove, and update voter tests.
* For voters and observers, polling after exiting this method expires the update voter set timer.
*
* For subsequent calls of this method that intend to also expire the update voter set timer,
* the initial sleep time must be less than what was previously used.
* This avoids expiring the fetch timer, but will expire the update voter set timer.
*
* @param epoch - the current epoch
* @param leaderId - the leader id
* @param initialSleepMs - the initial sleep time before the first fetch, which should
* be less than the fetch timeout to avoid expiring the fetch timer
* @param expireUpdateVoterSetTimer - if true, advance time again to expire this timer
*/
void advanceTimeAndCompleteFetch(
int epoch,
int leaderId,
int initialSleepMs,
boolean expireUpdateVoterSetTimer
) throws Exception {
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) {
time.sleep(fetchTimeoutMs - 1);
time.sleep(initialSleepMs);
pollUntilRequest();
final var fetchRequest = assertSentFetchRequest();
assertFetchRequestData(
Expand All @@ -1049,7 +1057,7 @@ void advanceTimeAndCompleteFetch(
client.poll();
}
if (expireUpdateVoterSetTimer) {
time.sleep(fetchTimeoutMs - 1);
time.sleep(fetchTimeoutMs - initialSleepMs);
}
}

Expand Down
Loading