5 changes: 5 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -106,6 +106,11 @@ public long remainingFetchTimeMs(long currentTimeMs) {
return fetchTimer.remainingMs();
}

public long remainingUpdateVoterSetTimeMs(long currentTimeMs) {

Contributor

What's this change for? Is it only used in the test context?

Contributor Author

It's to fix some logic in RaftClientTestContext#advanceTimeAndCompleteFetch, so that both timers won't be expired on a subsequent invocation of the helper method. I was originally using this method to write my test case, but ended up changing the test to not use this helper. I can remove it from the PR if it is confusing things.

updateVoterSetPeriodTimer.update(currentTimeMs);
return updateVoterSetPeriodTimer.remainingMs();
}

Member

See my other comment, but ideally we should not have this method if it is not used by the kraft implementation.

Contributor Author

Sounds good, I can remove this.


public int leaderId() {
return leaderId;
}
36 changes: 36 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1725,6 +1725,13 @@ private boolean handleFetchResponse(
leaderEndpoints = Endpoints.empty();
}

maybeSwitchObserverFetchToLeader(

@ahuang98 ahuang98 Jun 15, 2026

Contributor

Is this transition not already handled by maybeHandleCommonResponse?

@kevin-wu24 kevin-wu24 Jun 15, 2026

Contributor Author

If we store the leader information when becoming Unattached in the same epoch after the fetch timeout expires, receiving a response from a bootstrap fetch that contains the same leader from the same epoch will fall through to maybeTransition and not match any of the cases in that method.

responseEpoch,
responseLeaderId,
leaderEndpoints,
currentTimeMs
);

Member

Can you document the exact RPC trace for this issue? Can you document the trace in trunk vs the trace in this branch? I am interested in the RPCs and kraft state transitions.

Contributor Author

> Can you document the exact RPC trace for this issue? Can you document the trace in trunk vs the trace in this branch? I am interested in the RPCs and kraft state transitions.

Sure, the trace is below. It is a very esoteric edge case, so let me know if you have any questions.

Assume that the observer is unable to establish a TCP connection with the leader endpoint X. This means that requestManager.hasRequestTimedOut in maybeSendFetchToBestNode will return false, since the raft client gets an error response from its network client when it cannot connect to the request destination node. Also, assume that on the subsequent poll(), the requestManager.isBackingOff is also false. This is what I am referring to when I say the fetching destination logic needs to be independent of the request manager state.

The observer starts up either in the Unattached state or the FollowerState with a known leader endpoint X at the current epoch.

On trunk:

  • If the observer is in the first case, we enter pollUnattachedCommon to fetch from bootstrap servers, and if there is a leader, we receive its endpoint X and transition to FollowerState, so we are in the second case going forward.
  • Observer tries to fetch from the leader going forward. Under our assumptions above, maybeSendFetchToBestNode will always execute code in the else if (!requestManager.hasAnyInflightRequest) branch, which sends a fetch to the unreachable endpoint X.
  • The observer is now unable to make progress so long as requestManager.isBackingOff keeps returning false and endpoint X is partitioned from the observer.

On this branch:

  • If the observer is in the first case, we enter pollUnattachedCommon to fetch from bootstrap servers, and if there is a leader, we receive its endpoint X and transition to FollowerState, so we are in the second case going forward.
  • If the observer is unable to fetch from the leader within the fetch timeout, it transitions from FollowerState(leader X) -> Unattached(leader X) in the same epoch, which allows it to fetch from the bootstrap servers.
  • If the bootstrap servers are still in the same epoch with the same leader X, their completed fetch response to the observer will transition the observer from Unattached(leader X) -> FollowerState(leader X). The observer will then fetch from endpoint X.
  • If there is a leader election and an epoch bump, the observer can discover that from the bootstrap servers. The observer cannot discover this new state if it is stuck fetching from X, to which it cannot connect.

The motivation for the proposed behavior is that on trunk, the observer can be "stuck" (depending on the request manager state) fetching from a leader's endpoint which is unreachable. This "starves" the bootstrap endpoints and prevents the observer from discovering a new leader or fetching until X becomes reachable.
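The transitions described for this branch can be sketched as a small state machine. This is a minimal illustration only, not KafkaRaftClient code: the class `ObserverSketch`, its fields, and the string targets `"leader"` and `"bootstrap"` are hypothetical names introduced here, and the request manager is left out entirely.

```java
import java.util.OptionalInt;

/** Hypothetical, simplified model of an observer's fetch routing. */
final class ObserverSketch {
    enum State { UNATTACHED, FOLLOWER }

    State state = State.UNATTACHED;
    int epoch = 0;
    OptionalInt leaderId = OptionalInt.empty();
    long fetchDeadlineMs = Long.MAX_VALUE;
    final long fetchTimeoutMs;

    ObserverSketch(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    /** Picks the next fetch destination; an expired fetch timer sends the observer back to bootstrap. */
    String nextFetchTarget(long nowMs) {
        if (state == State.FOLLOWER && nowMs >= fetchDeadlineMs) {
            // Same epoch, leader id kept: FollowerState(leader X) -> Unattached(leader X).
            state = State.UNATTACHED;
        }
        return state == State.FOLLOWER ? "leader" : "bootstrap";
    }

    /** Handles a bootstrap server's response that advertises a leader and its endpoint. */
    void onBootstrapResponse(int responseEpoch, int responseLeaderId, long nowMs) {
        if (responseEpoch < epoch) {
            return; // stale response, ignore
        }
        boolean newEpoch = responseEpoch > epoch;
        boolean sameEpochWhileUnattached = responseEpoch == epoch && state == State.UNATTACHED;
        if (newEpoch || sameEpochWhileUnattached) {
            // Unattached(leader X) -> FollowerState(leader X), or a newly elected leader.
            epoch = responseEpoch;
            leaderId = OptionalInt.of(responseLeaderId);
            state = State.FOLLOWER;
            fetchDeadlineMs = nowMs + fetchTimeoutMs;
        }
    }
}
```

In this model the observer alternates between the leader and the bootstrap servers for as long as the leader stays unreachable, so an epoch bump reported by a bootstrap server is always eventually observed.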

Member

Why do you need this special handler for FETCH? How about the other RPCs that followers send, like FETCH_SNAPSHOT and UPDATE_VOTER?

@kevin-wu24 kevin-wu24 Jun 16, 2026

Contributor Author

I am checking:

else if (responseEpoch == quorum.epoch() && quorum.isUnattached() &&
            responseLeaderId.isPresent() && !leaderEndpoints.isEmpty()) {

in this method. This handler and logic only apply to nodes in the Unattached state, which can only fetch from the bootstrap servers. A follower whose fetch timeout has expired transitions to Unattached and can then only send Fetch.
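The quoted condition can be read as a pure predicate. The sketch below restates it with plain parameters so each clause can be checked in isolation; `SwitchToLeaderCheck`, `shouldBecomeFollower`, and the parameter names are hypothetical stand-ins (for `quorum.epoch()`, `quorum.isUnattached()`, and `!leaderEndpoints.isEmpty()`), not part of the PR.

```java
import java.util.OptionalInt;

/** Hypothetical restatement of the quoted condition with plain parameters. */
final class SwitchToLeaderCheck {
    static boolean shouldBecomeFollower(
        int responseEpoch,
        OptionalInt responseLeaderId,
        boolean hasLeaderEndpoints, // stands in for !leaderEndpoints.isEmpty()
        int localEpoch,             // stands in for quorum.epoch()
        boolean unattached          // stands in for quorum.isUnattached()
    ) {
        return responseEpoch == localEpoch
            && unattached
            && responseLeaderId.isPresent()
            && hasLeaderEndpoints;
    }
}
```

Because `unattached` is one of the clauses, a replica that is already a follower never takes this path, which is why the check only affects fetches sent to bootstrap servers.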


Optional<Boolean> handled = maybeHandleCommonResponse(
error,
responseLeaderId,
@@ -2608,6 +2615,32 @@ private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
}
}

/**
* If the local replica is an observer currently routing fetches to bootstrap servers,
* and a non-leader source's fetch response advertises the leader's endpoints, switch
* the observer back to fetching from the leader.
*/
private void maybeSwitchObserverFetchToLeader(

Contributor

Nit: slightly more descriptive name might be a tad better - for example maybeSwitchObserverToFetchFromLeader.

int responseEpoch,
OptionalInt responseLeaderId,
Endpoints leaderEndpoints,
long currentTimeMs
) {
if (!hasConsistentLeader(responseEpoch, responseLeaderId)) {
throw new IllegalStateException("Received request or response with leader " + responseLeaderId +

Contributor

Nit: we might also wish to include quorum.localIdOrSentinel in the exception message for convenience.

Contributor Author

I followed the other usage of hasConsistentLeader, which uses quorum.localId(). If the local leader is empty, hasConsistentLeader returns true.
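For readers unfamiliar with the check, here is a rough sketch of a leader-consistency rule of this kind. Only the "local leader is empty returns true" case comes from the reply above; the other two early returns (epoch mismatch, empty leader in the message) are assumptions made for illustration, and `LeaderConsistency.isConsistent` is a hypothetical name, so the real `hasConsistentLeader` may differ.

```java
import java.util.OptionalInt;

/** Hypothetical sketch of a leader-consistency rule; not the actual Kafka method. */
final class LeaderConsistency {
    static boolean isConsistent(
        int localEpoch,
        OptionalInt localLeaderId,
        int messageEpoch,
        OptionalInt messageLeaderId
    ) {
        // Assumption: messages from a different epoch are handled elsewhere.
        if (messageEpoch != localEpoch) {
            return true;
        }
        // Assumption: a message without a leader has nothing to contradict.
        if (!messageLeaderId.isPresent()) {
            return true;
        }
        // From the reply above: an empty local leader is always consistent.
        if (!localLeaderId.isPresent()) {
            return true;
        }
        // Same epoch and both sides know a leader: the ids must match.
        return messageLeaderId.equals(localLeaderId);
    }
}
```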

" and epoch " + responseEpoch + " which is inconsistent with current leader " +
quorum.leaderId() + " and epoch " + quorum.epoch());
} else if (responseEpoch == quorum.epoch() && quorum.isUnattached() &&
responseLeaderId.isPresent() && !leaderEndpoints.isEmpty()) {
transitionToFollower(
responseEpoch,
responseLeaderId.getAsInt(),
leaderEndpoints,
currentTimeMs
);
}
}

/**
* Handle response errors that are common across request types.
*
@@ -3390,6 +3423,9 @@ private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
state.resetUpdateVoterSetPeriod(currentTimeMs);
}
return sendResult.timeToWaitMs();
} else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
transitionToUnattached(state.epoch(), OptionalInt.of(state.leaderId()));

Contributor

We should add a test which explicitly checks that the observer can transition to unattached if there is a timeout.

Contributor Author

I added this to the existing unit test.

return 0L;
} else {
return maybeSendFetchToBestNode(state, currentTimeMs);
}
2 changes: 1 addition & 1 deletion raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -379,7 +379,7 @@ public void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
*/
public void transitionToUnattached(int epoch, OptionalInt leaderId) {
int currentEpoch = state.epoch();
- if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective())) {
+ if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective() && !isObserver())) {
throw new IllegalStateException(
String.format(
"Cannot transition to Unattached with epoch %d from current state %s",
@@ -38,6 +38,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -765,4 +766,75 @@ void testUpdatedHighWatermarkCompleted() throws Exception {
assertEquals(localLogEndOffset, partitionResponse.highWatermark());
}
}

@Test
void testObserverFetchesBetweenLeaderAndBootstrapServers() throws Exception {

Member

Does this fail without your change? If so, can you tell me exactly what fails?

@kevin-wu24 kevin-wu24 May 28, 2026

Contributor Author

> Does this fail without your change? If so, can you tell me exactly what fails?

Yes, trunk fails on L800 by sending a fetch request to the leader endpoint instead of the bootstrap endpoint on the second iteration of the for loop. At this point, the fetch timeout is expired (simulating being unable to reach the leader), and the local node will continue to fetch from that endpoint for the remainder of the epoch, instead of trying to fetch from bootstrap servers.

Looking at this again after a while, the for loop actually made this harder for me to read. I'm going to remove it.

Member

I ran this test against this PR and I got this trace:

[2026-06-25 15:02:25,329] INFO Starting request manager with bootstrap servers: [localhost:10634 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient:331)
[2026-06-25 15:02:25,561] INFO Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient:509)
[2026-06-25 15:02:25,563] INFO Starting voters are VoterSet(voters={643=VoterNode(voterKey=ReplicaKey(id=643, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0]), 644=VoterNode(voterKey=ReplicaKey(id=644, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10634}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) (org.apache.kafka.raft.KafkaRaftClient:511)
[2026-06-25 15:02:25,565] INFO Attempting durable transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,568] INFO Completed transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,586] TRACE Sent outbound request: OutboundRequest(correlationId=0, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414145309, destination=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,587] INFO Registered the listener org.apache.kafka.raft.RaftClientTestContext$MockListener@107632469 (org.apache.kafka.raft.KafkaRaftClient:3590)
[2026-06-25 15:02:25,725] TRACE Received inbound message InboundResponse(correlationId=0, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=643, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=643, host='localhost', port=10633, rack=null)]), source=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,726] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,727] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,728] DEBUG Notifying listener org.apache.kafka.raft.RaftClientTestContext$MockListener@107632469 of leader change LeaderAndEpoch[leaderId=OptionalInt[643], epoch=2] (org.apache.kafka.raft.KafkaRaftClient:4121)
[2026-06-25 15:02:25,836] TRACE Sent outbound request: OutboundRequest(correlationId=1, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414145309, destination=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,837] INFO Attempting durable transition to UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,837] INFO Completed transition to UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,837] TRACE Received inbound message InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,838] DEBUG Ignoring response InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10633 (id: 643 rack: null isFenced: false)) since it is no longer needed (org.apache.kafka.raft.KafkaRaftClient:2856)
[2026-06-25 15:02:25,942] TRACE Sent outbound request: OutboundRequest(correlationId=2, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414195310, destination=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,943] TRACE Received inbound message InboundResponse(correlationId=2, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=643, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=643, host='localhost', port=10633, rack=null)]), source=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,943] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,944] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:26,047] TRACE Sent outbound request: OutboundRequest(correlationId=3, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414195310, destination=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)

Can we get a TRACE of the actual issue to make sure we are solving the correct problem? I am having a hard time understanding the actual problem, so I am not sure that this change solves it.

@kevin-wu24 kevin-wu24 Jun 26, 2026

Contributor Author

Here is an updated trace from my most recent local changes:

[2026-06-26 09:50:15,417] INFO Starting request manager with bootstrap servers: [localhost:10139 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient:331)
[2026-06-26 09:50:15,600] INFO Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient:509)
[2026-06-26 09:50:15,601] INFO Starting voters are VoterSet(voters={148=VoterNode(voterKey=ReplicaKey(id=148, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0]), 149=VoterNode(voterKey=ReplicaKey(id=149, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10139}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) (org.apache.kafka.raft.KafkaRaftClient:511)
[2026-06-26 09:50:15,603] INFO Attempting durable transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,605] INFO Completed transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,614] TRACE Sent outbound request: OutboundRequest(correlationId=0, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485415405, destination=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,615] INFO Registered the listener org.apache.kafka.raft.RaftClientTestContext$MockListener@220558713 (org.apache.kafka.raft.KafkaRaftClient:3590)
[2026-06-26 09:50:15,707] TRACE Received inbound message InboundResponse(correlationId=0, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=148, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=148, host='localhost', port=10138, rack=null)]), source=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,708] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,709] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,709] DEBUG Notifying listener org.apache.kafka.raft.RaftClientTestContext$MockListener@220558713 of leader change LeaderAndEpoch[leaderId=OptionalInt[148], epoch=2] (org.apache.kafka.raft.KafkaRaftClient:4121)
[2026-06-26 09:50:15,812] TRACE Sent outbound request: OutboundRequest(correlationId=1, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485415405, destination=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,813] TRACE Received inbound message InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,814] INFO Attempting durable transition to UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,814] INFO Completed transition to UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,918] TRACE Sent outbound request: OutboundRequest(correlationId=2, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485465406, destination=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,919] TRACE Received inbound message InboundResponse(correlationId=2, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=148, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=148, host='localhost', port=10138, rack=null)]), source=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,920] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,920] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:16,024] TRACE Sent outbound request: OutboundRequest(correlationId=3, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485465406, destination=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)

The scenario: after the local node becomes a Follower, it cannot fetch from the leader successfully. Every fetch returns BROKER_NOT_AVAILABLE for the duration of its fetch timeout. The log above shows the local node sending a fetch to the leader, receiving a BROKER_NOT_AVAILABLE response, and only then transitioning to Unattached.
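
As a rough illustration of the alternation the test below asserts, here is a toy state machine. It is only a sketch, not KafkaRaftClient code: the class `ObserverFetchSketch`, its methods, and the "bootstrap"/"leader" labels are hypothetical names invented for this example, and the 50000 ms timeout is taken from the `fetchTimeoutMs` value in the log above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an observer picking a fetch destination. Hypothetical, not Kafka code. */
final class ObserverFetchSketch {
    enum State { UNATTACHED, FOLLOWER }

    // Same value as fetchTimeoutMs in the log above.
    static final long FETCH_TIMEOUT_MS = 50_000L;

    private State state = State.UNATTACHED;
    private long fetchTimerStartMs = 0L;

    /** Bootstrap servers while unattached, the leader while following. */
    String nextFetchTarget(long nowMs) {
        boolean fetchTimerExpired = nowMs - fetchTimerStartMs >= FETCH_TIMEOUT_MS;
        if (state == State.FOLLOWER && fetchTimerExpired) {
            // No successful fetch within the timeout: go back to the bootstrap servers.
            state = State.UNATTACHED;
        }
        return state == State.UNATTACHED ? "bootstrap" : "leader";
    }

    /** A bootstrap response named the leader, so start following it. */
    void onLeaderDiscovered(long nowMs) {
        state = State.FOLLOWER;
        fetchTimerStartMs = nowMs;
    }

    /** Runs the bootstrap/leader cycle and records each fetch destination. */
    static List<String> simulate(int rounds) {
        ObserverFetchSketch observer = new ObserverFetchSketch();
        List<String> targets = new ArrayList<>();
        long nowMs = 0L;
        for (int i = 0; i < rounds; i++) {
            targets.add(observer.nextFetchTarget(nowMs));
            observer.onLeaderDiscovered(nowMs);
            targets.add(observer.nextFetchTarget(nowMs));
            // The leader fetch fails; a failed fetch does not reset the timer,
            // so advancing past the timeout expires it.
            nowMs += FETCH_TIMEOUT_MS + 1;
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(simulate(3));
    }
}
```

In this model a failed fetch never resets the fetch timer, which is why each round ends with the observer falling back to the bootstrap servers.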

        final var epoch = 2;
        final var local = KafkaRaftClientTest.replicaKey(
            KafkaRaftClientTest.randomReplicaId(),
            true
        );
        final var leader = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
        final var otherVoter = KafkaRaftClientTest.replicaKey(local.id() + 2, true);

        final var voters = VoterSet.fromMap(
            Map.of(
                leader.id(), VoterSetTest.voterNode(leader),
                otherVoter.id(), VoterSetTest.voterNode(otherVoter)
            )
        );

        final var context = new RaftClientTestContext.Builder(
            local.id(),
            local.directoryId().get()
        )
            .withStaticVoters(voters)

Member

Why use static voters? Why not enable and use all of the latest features?

            .withBootstrapServers(Optional.of(List.of(RaftClientTestContext.mockAddress(otherVoter.id()))))
            .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
            .build();

        for (int i = 0; i < 10; ++i) {

Contributor

Is there a specific reason why this loop must run 10 times? It would be nice to add a comment clarifying why there must be 10 iterations.

Contributor Author

Not really; 10 is an arbitrary number. I just want to make sure we keep alternating between the bootstrap endpoint and the leader endpoint.

            // The observer initially fetches from the bootstrap servers, where it will discover the leader's endpoints.
            context.pollUntilRequest();
            final var bootstrapFetch = context.assertSentFetchRequest();
            assertEquals(-2, bootstrapFetch.destination().id());
            assertEquals(RaftClientTestContext.mockAddress(otherVoter.id()).getHostName(), bootstrapFetch.destination().host());

Contributor

Nit: the test would be clearer if we renamed the variable otherVoter to bootstrapVoter.

Contributor Author

Fixed.

            assertEquals(RaftClientTestContext.mockAddress(otherVoter.id()).getPort(), bootstrapFetch.destination().port());

            context.deliverResponse(
                bootstrapFetch.correlationId(),
                bootstrapFetch.destination(),
                context.fetchResponse(
                    epoch,
                    leader.id(),
                    MemoryRecords.EMPTY,
                    0L,
                    Errors.NOT_LEADER_OR_FOLLOWER
                )
            );

            // Subsequent fetch from the observer is sent to the leader
            context.pollUntilRequest();
            final var leaderFetch = context.assertSentFetchRequest();
            assertEquals(leader.id(), leaderFetch.destination().id());
            assertEquals(RaftClientTestContext.mockAddress(leader.id()).getHostName(), leaderFetch.destination().host());
            assertEquals(RaftClientTestContext.mockAddress(leader.id()).getPort(), leaderFetch.destination().port());

            // Return a BROKER_NOT_AVAILABLE error, and then advance time past the fetch timeout,
            // which should cause the observer to fetch from the bootstrap servers on the next fetch.

            // The fetch timeout is much greater than the request manager's configured backoff, so the
            // current unreachable connection will no longer be backing off when the next fetch is sent.
            context.deliverResponse(

@josefk31 josefk31 Jun 4, 2026

Contributor

IIUC, in the actual failure mode that causes this bug, we never get a response from the leader at all. It's completely disconnected. Is that the same as receiving an Errors.BROKER_NOT_AVAILABLE? In this test case the RaftClient will actually receive a response, while IRL it will not.

Contributor Author

is that the same as receiving a Errors.BROKER_NOT_AVAILABLE

Yes, it is the same. The specific case is when the local node cannot establish a TCP connection with the leader endpoint. This causes the destination node to be considered "disconnected" and the RPC is never sent over the wire. If you trace through the KafkaNetworkChannel + network client code this ends up "returning" a dummy RPC error response to the local raft client with Errors.BROKER_NOT_AVAILABLE. This is what clears up the raft request manager's AWAITING_RESPONSE state for the "connection."

In my opinion, there is less of a clear motivation to change the above behavior compared to allowing the observer to behave more like a voter. What do you think?
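
To make the reply above concrete, here is a minimal sketch of the described behavior: when the destination cannot be reached, nothing goes over the wire, yet the caller still gets an error response that clears its pending-request state. This is a toy model under stated assumptions. `DisconnectedSendSketch`, `RequestTracker`, `send`, and the enum names are hypothetical and do not mirror the actual KafkaNetworkChannel or request manager classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these types exist in Kafka. */
final class DisconnectedSendSketch {
    enum ErrorCode { NONE, BROKER_NOT_AVAILABLE }
    enum RequestState { READY, AWAITING_RESPONSE }

    static final class Response {
        final int correlationId;
        final ErrorCode error;

        Response(int correlationId, ErrorCode error) {
            this.correlationId = correlationId;
            this.error = error;
        }
    }

    /** Hypothetical stand-in for per-destination request bookkeeping. */
    static final class RequestTracker {
        private final Map<String, RequestState> states = new HashMap<>();

        void onRequestSent(String destination) {
            states.put(destination, RequestState.AWAITING_RESPONSE);
        }

        void onResponse(String destination) {
            states.put(destination, RequestState.READY);
        }

        RequestState stateOf(String destination) {
            return states.getOrDefault(destination, RequestState.READY);
        }
    }

    /**
     * Hypothetical channel: if no connection can be established, the request is
     * never sent and a synthetic BROKER_NOT_AVAILABLE response is returned instead.
     */
    static Response send(int correlationId, boolean canConnect) {
        ErrorCode error = canConnect ? ErrorCode.NONE : ErrorCode.BROKER_NOT_AVAILABLE;
        return new Response(correlationId, error);
    }

    public static void main(String[] args) {
        RequestTracker tracker = new RequestTracker();
        tracker.onRequestSent("leader");
        Response response = send(3, false);
        // Any response, including the synthetic one, clears the pending state.
        tracker.onResponse("leader");
        System.out.println(response.error + " " + tracker.stateOf("leader"));
    }
}
```

From the caller's point of view, the synthetic error is indistinguishable from one sent by a remote node, which is the sense in which the test's delivered BROKER_NOT_AVAILABLE response stands in for a failed connection.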

                leaderFetch.correlationId(),
                leaderFetch.destination(),
                RaftUtil.errorResponse(
                    ApiKeys.FETCH,
                    Errors.BROKER_NOT_AVAILABLE
                )
            );
            context.client.poll();
            context.time.sleep(context.fetchTimeoutMs + 1);
        }
    }
}
@@ -2011,7 +2011,7 @@ public void testFollowerAsObserverDoesNotBecomeProspectiveAfterFetchTimeout(bool

         context.time.sleep(context.fetchTimeoutMs);
         context.pollUntilRequest();
-        assertTrue(context.client.quorum().isFollower());
+        assertFalse(context.client.quorum().isProspective());

Contributor

Nit: do we wish to add an assert for whether it becomes Unattached?

Contributor Author

Added.


         // transitions to unattached
         context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, withKip853Rpc), epoch, 1));
@@ -998,8 +998,13 @@ void advanceTimeAndCompleteFetch(
         int leaderId,
         boolean expireUpdateVoterSetTimer
     ) throws Exception {
+        final var state = client.quorum().followerStateOrThrow();
         for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) {
-            time.sleep(fetchTimeoutMs - 1);
+            long sleepMs = Math.min(
+                state.remainingFetchTimeMs(time.milliseconds()) - 1,
+                state.remainingUpdateVoterSetTimeMs(time.milliseconds()) - 1
+            );
+            time.sleep(Math.max(0, sleepMs));
             pollUntilRequest();
             final var fetchRequest = assertSentFetchRequest();
             assertFetchRequestData(
@@ -1025,7 +1030,8 @@ void advanceTimeAndCompleteFetch(
             client.poll();
         }
         if (expireUpdateVoterSetTimer) {
-            time.sleep(fetchTimeoutMs - 1);
+            long remaining = state.remainingUpdateVoterSetTimeMs(time.milliseconds());
+            time.sleep(remaining + 1);
         }
     }
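
The sleep arithmetic in the helper above can be checked in isolation. The sketch below is not part of the PR: `TimerSleepSketch` and `sleepBeforeNextFetch` are hypothetical names, and the two parameters stand in for the values returned by `remainingFetchTimeMs` and `remainingUpdateVoterSetTimeMs`.

```java
/** Isolated copy of the sleep computation, for illustration only. */
final class TimerSleepSketch {
    /**
     * Sleeps until 1 ms before whichever timer expires first, never a negative
     * amount, so that neither timer has expired when the next fetch is sent.
     */
    static long sleepBeforeNextFetch(long remainingFetchMs, long remainingUpdateVoterSetMs) {
        long sleepMs = Math.min(remainingFetchMs - 1, remainingUpdateVoterSetMs - 1);
        return Math.max(0, sleepMs);
    }

    public static void main(String[] args) {
        // The fetch timer is the nearer deadline.
        System.out.println(sleepBeforeNextFetch(50_000L, 120_000L));
        // The update-voter-set timer is the nearer deadline.
        System.out.println(sleepBeforeNextFetch(50_000L, 30L));
        // One timer has already expired, so the sleep is clamped to zero.
        System.out.println(sleepBeforeNextFetch(1L, 0L));
    }
}
```

Taking the minimum is what distinguishes this from the old `fetchTimeoutMs - 1` sleep, which ignored how much time was left on the update-voter-set timer.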
