KAFKA-20514: kraft observers should use previous fetch response to decide where to send the next fetch #22111
kevin-wu24 wants to merge 13 commits into
A label of 'needs-attention' was automatically added to this PR in order to raise the |
| * and a non-leader source's fetch response advertises the leader's endpoints, switch | ||
| * the observer back to fetching from the leader. | ||
| */ | ||
| private void maybeSwitchObserverFetchToLeader( |
Nit: slightly more descriptive name might be a tad better - for example maybeSwitchObserverToFetchFromLeader.
| long currentTimeMs | ||
| ) { | ||
| if (!hasConsistentLeader(responseEpoch, responseLeaderId)) { | ||
| throw new IllegalStateException("Received request or response with leader " + responseLeaderId + |
Nit: we might also wish to include quorum.localIdOrSentinel in the exception message for convenience.
I followed the other usage of hasConsistentLeader, which uses quorum.localId(). If the local leader is empty, hasConsistentLeader returns true.
josefk31 left a comment
Thanks for changes @kevin-wu24 :)
| .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL) | ||
| .build(); | ||
|
|
||
| for (int i = 0; i < 10; ++i) { |
Is there a specific reason why this loop must run 10 times? It would be nice to add a comment clarifying why there must be 10 iterations.
Not really, 10 is just a magic number. I just want to make sure we alternate between bootstrap endpoint + leader endpoint continuously.
jsancio left a comment
@kevin-wu24 thanks for the fix and detailed description. The description helped me understand the issue. Here are some high-level comments before I look into the details of the implementation.
| } | ||
|
|
||
| @Test | ||
| void testObserverFetchesBetweenLeaderAndBootstrapServers() throws Exception { |
Does this fail without your change? If so, can you tell me exactly what fails?
> Does this fail without your change? If so, can you tell me exactly what fails?
Yes, trunk fails on L800 by sending a fetch request to the leader endpoint instead of the bootstrap endpoint on the second iteration of the for loop. At this point, the fetch timeout is expired (simulating being unable to reach the leader), and the local node will continue to fetch from that endpoint for the remainder of the epoch, instead of trying to fetch from bootstrap servers.
Looking at this again after a while, the for loop actually made this harder for me to read. I'm going to remove it.
| maybeSwitchObserverFetchToLeader( | ||
| responseEpoch, | ||
| responseLeaderId, | ||
| leaderEndpoints, | ||
| currentTimeMs | ||
| ); |
Can you document the exact RPC trace for this issue? Can you document trace in trunk vs the trace in this branch? I am interested in the RPCs and kraft state transitions.
> Can you document the exact RPC trace for this issue? Can you document trace in trunk vs the trace in this branch? I am interested in the RPCs and kraft state transitions.
Sure, the trace is below. It is a very esoteric edge case, so let me know if you have any questions.
Assume that the observer is unable to establish a TCP connection with the leader endpoint X. This means that requestManager.hasRequestTimedOut in maybeSendFetchToBestNode will return false, since the raft client gets an error response from its network client when it cannot connect to the request destination node. Also, assume that on the subsequent poll(), the requestManager.isBackingOff is also false. This is what I am referring to when I say the fetching destination logic needs to be independent of the request manager state.
The observer starts up either in the Unattached state or the FollowerState with a known leader endpoint X at the current epoch.
On trunk:
- If the observer is in the first case, we enter pollUnattachedCommon to fetch from bootstrap servers, and if there is a leader, we receive its endpoint X and transition to FollowerState, so we are in the second case going forward.
- Observer tries to fetch from the leader going forward. Under our assumptions above, maybeSendFetchToBestNode will always execute code in the else if (!requestManager.hasAnyInflightRequest) branch, which sends a fetch to the unreachable endpoint X.
- The observer is now unable to make progress so long as requestManager.isBackingOff keeps returning false and endpoint X is partitioned from the observer.
On this branch:
- If the observer is in the first case, we enter pollUnattachedCommon to fetch from bootstrap servers, and if there is a leader, we receive its endpoint X and transition to FollowerState, so we are in the second case going forward.
- If the observer is unable to fetch from the leader within the fetch timeout, it transitions from FollowerState(leader X) -> Unattached(leader X) in the same epoch, which allows it to fetch from the bootstrap servers.
- If the bootstrap servers are still in the same epoch with the same leader X, their completed fetch response to the observer will transition the observer from Unattached(leader X) -> FollowerState(leader X). The observer will then fetch from endpoint X.
- If there is a leader election and an epoch bump, the observer can discover that from the bootstrap servers. The observer cannot discover this new state if it is stuck fetching from X, to which it cannot connect.
The motivation for the proposed behavior is that on trunk, the observer can be "stuck" (depending on the request manager state) fetching from a leader's endpoint which is unreachable. This "starves" the bootstrap endpoints and prevents the observer from discovering a new leader or fetching until X becomes reachable.
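For readers following the trace, here is a minimal sketch of the state cycle described for this branch. It is a toy model, not code from KafkaRaftClient: the class ObserverFetchSketch, its State type, and the method names are all hypothetical, and the rule that responses from an older epoch are ignored is an assumption made for the illustration.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model of an observer; this is not the real KafkaRaftClient.
final class ObserverFetchSketch {
    enum Role { UNATTACHED, FOLLOWER }

    // Immutable snapshot: role, epoch, and the leader the observer currently knows about.
    static final class State {
        final Role role;
        final int epoch;
        final OptionalInt leaderId;

        State(Role role, int epoch, OptionalInt leaderId) {
            this.role = role;
            this.epoch = epoch;
            this.leaderId = leaderId;
        }
    }

    // Destination of the next FETCH: the leader while FOLLOWER, a bootstrap server otherwise.
    static String nextFetchDestination(State s) {
        return s.role == Role.FOLLOWER ? "leader" : "bootstrap";
    }

    // Fetch timeout expired: keep epoch and leader id, but fall back to UNATTACHED.
    static State onFetchTimeout(State s) {
        if (s.role != Role.FOLLOWER) {
            return s;
        }
        return new State(Role.UNATTACHED, s.epoch, s.leaderId);
    }

    // A bootstrap server's successful fetch response advertises a leader.
    // Assumption: responses from an older epoch are ignored in this sketch.
    static State onBootstrapResponse(State s, int responseEpoch, int responseLeaderId) {
        if (responseEpoch < s.epoch) {
            return s;
        }
        return new State(Role.FOLLOWER, responseEpoch, OptionalInt.of(responseLeaderId));
    }
}
```

In this model, an observer that times out while following leader X alternates between the bootstrap servers and X within the same epoch, and picks up a new leader as soon as a bootstrap response carries a higher epoch.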
Why do you need this special handler for FETCH? How about the other RPCs that followers send, like FETCH_SNAPSHOT and UPDATE_VOTER?
I am checking:
else if (responseEpoch == quorum.epoch() && quorum.isUnattached() &&
responseLeaderId.isPresent() && !leaderEndpoints.isEmpty()) {
in this method. This handler and logic only applies to nodes in the Unattached state, which can only fetch from the bootstrap servers. A follower whose fetch timeout has expired transitions to Unattached and then can only send Fetch.
|
|
||
| // The fetch timeout is much greater than the request manager's configured backoff, so the | ||
| // current unreachable connection will no longer be backing off when the next fetch is sent. | ||
| context.deliverResponse( |
IIUC, in the actual failure mode that causes this bug, we never get a response from the leader at all; it is completely disconnected. Is that the same as receiving an Errors.BROKER_NOT_AVAILABLE? In this test case RaftClient will actually receive a response, while in real life it will not.
> is that the same as receiving an Errors.BROKER_NOT_AVAILABLE
Yes, it is the same. The specific case is when the local node cannot establish a TCP connection with the leader endpoint. This causes the destination node to be considered "disconnected" and the RPC is never sent over the wire. If you trace through the KafkaNetworkChannel + network client code this ends up "returning" a dummy RPC error response to the local raft client with Errors.BROKER_NOT_AVAILABLE. This is what clears up the raft request manager's AWAITING_RESPONSE state for the "connection."
In my opinion, there is less of a clear motivation to change the above behavior compared to allowing the observer to behave more like a voter. What do you think?
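To make the "dummy response" behavior concrete, here is a small sketch of a channel that answers a fetch to an unreachable node with an error instead of writing anything to the wire. The names DisconnectedSendSketch, Channel, and Outcome are hypothetical stand-ins for illustration; the sketch only mirrors the behavior described in the reply above and does not reproduce KafkaNetworkChannel.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a network channel; not the real KafkaNetworkChannel.
final class DisconnectedSendSketch {
    enum Outcome { OK, BROKER_NOT_AVAILABLE }

    static final class Channel {
        private final Set<Integer> reachable = new HashSet<>();
        private int requestsOnWire = 0;

        void markReachable(int nodeId) {
            reachable.add(nodeId);
        }

        int requestsOnWire() {
            return requestsOnWire;
        }

        // Returns what the raft client would observe for a fetch sent to nodeId.
        Outcome sendFetch(int nodeId) {
            if (!reachable.contains(nodeId)) {
                // No bytes are written; the caller still receives a response,
                // so it does not keep waiting on this connection.
                return Outcome.BROKER_NOT_AVAILABLE;
            }
            requestsOnWire++;
            return Outcome.OK;
        }
    }
}
```

This is why delivering Errors.BROKER_NOT_AVAILABLE in the test is a reasonable way to simulate a leader that the observer cannot connect to.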
josefk31 left a comment
Thanks for changes! A few more comments :)
| context.pollUntilRequest(); | ||
| final var bootstrapFetch = context.assertSentFetchRequest(); | ||
| assertEquals(-2, bootstrapFetch.destination().id()); | ||
| assertEquals(RaftClientTestContext.mockAddress(otherVoter.id()).getHostName(), bootstrapFetch.destination().host()); |
Nit: test would be clearer if we changed the variable name from otherVoter to bootstrapVoter
| context.time.sleep(context.fetchTimeoutMs); | ||
| context.pollUntilRequest(); | ||
| assertTrue(context.client.quorum().isFollower()); | ||
| assertFalse(context.client.quorum().isProspective()); |
Nit: do we wish to add an assert for whether it becomes Unattached?
| } | ||
| return sendResult.timeToWaitMs(); | ||
| } else if (state.hasFetchTimeoutExpired(currentTimeMs)) { | ||
| transitionToUnattached(state.epoch(), OptionalInt.of(state.leaderId())); |
We should add a test which explicitly checks that the observer can transition to unattached if there is a timeout.
I added this to the existing unit test.
josefk31 left a comment
Thanks for changes! Looks good to me :)
| return fetchTimer.remainingMs(); | ||
| } | ||
|
|
||
| public long remainingUpdateVoterSetTimeMs(long currentTimeMs) { |
what's this change for? it's only used in the testContext?
It's to fix some logic in RaftClientTestContext#advanceTimeAndCompleteFetch, so that both timers won't be expired on a subsequent invocation of the helper method. I was originally using this method to write my test case, but ended up changing the test to not use this helper. I can remove it from the PR if it is confusing things.
| leaderEndpoints = Endpoints.empty(); | ||
| } | ||
|
|
||
| maybeSwitchObserverFetchToLeader( |
is this transition not already handled by maybeHandleCommonResponse?
If we store the leader information when becoming Unattached in the same epoch after the fetch timeout expires, receiving a response from a bootstrap fetch that contains the same leader from the same epoch will fall through to maybeTransition and not match any of the cases in that method.
ahuang98 left a comment
Partial review, getting up to speed on the issue again.
I do wonder if there's a separate problem regarding timing that is worth fixing.
We should have the following two distinct paths:
- request times out (no response) → RequestManager moves state to READY → retry immediately
- request errors → RequestManager moves state to BACKING_OFF for retryBackoffMs
When a fetch can't be sent (e.g. unreachable endpoint), the network channel returns a BROKER_NOT_AVAILABLE response, which is arguably meant to push RequestManager into BACKING_OFF state. But that response is always discarded, and I think that's the separate issue we might want to fix (in a separate PR).
RequestManager's response timeout fires at >= lastSendTimeMs + requestTimeoutMs while the network channel's synthesized BROKER_NOT_AVAILABLE response will only appear at > createdTimeMs + requestTimeoutMs. lastSendTimeMs equals createdTimeMs and both RequestManager and the network channel use the same requestTimeoutMs so the RequestManager timeout will always fire first and we'll never actually see the BROKER_NOT_AVAILABLE response.
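The ordering argument above can be checked with a small sketch. It only restates the two comparisons as described in this comment (an inclusive bound for the request manager, a strict bound for the network channel); the class and method names are hypothetical and the real implementations were not consulted for this illustration.

```java
// Hypothetical restatement of the two timeout checks described in the comment above.
final class TimeoutOrderingSketch {
    // RequestManager-style check: inclusive bound (>=).
    static boolean requestManagerTimedOut(long nowMs, long lastSendTimeMs, long requestTimeoutMs) {
        return nowMs >= lastSendTimeMs + requestTimeoutMs;
    }

    // Network-channel-style check: strict bound (>).
    static boolean channelResponseAvailable(long nowMs, long createdTimeMs, long requestTimeoutMs) {
        return nowMs > createdTimeMs + requestTimeoutMs;
    }
}
```

With lastSendTimeMs equal to createdTimeMs, the first check becomes true one millisecond tick before the second, which matches the claim that the synthesized response would arrive only after the request manager has already timed the request out.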
| RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); | ||
| if (isBootstrapFetch) { | ||
| assertTrue(context.client.quorum().isUnattached()); | ||
| assertEquals(-2, fetchRequest.destination().id()); |
This is an implementation detail that I am not sure that we should check in the protocol. Maybe it is enough to check that id is less than -1 (network client hack to capture bootstrap node) and check the destination endpoint is the correct endpoint (leader endpoint vs bootstrap endpoint).
| .withStaticVoters(voters) | ||
| .withBootstrapServers(Optional.of(List.of(RaftClientTestContext.mockAddress(otherVoter.id())))) | ||
| .withBootstrapServers( | ||
| Optional.of(List.of(RaftClientTestContext.mockAddress(bootstrapVoter.id()))) |
Let's document that you are doing this to reliably check fetches to the leader (known node) vs fetches to the bootstrap server (unknown nodes). Another way to check this is that "bootstrap nodes" have an unknown id. We represent this in the network client by giving those nodes an id less than -1. RPCs to known kafka nodes have an id greater than or equal to 0.
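One possible shape for such a check is sketched below. It encodes only the id ranges mentioned in this thread (less than -1 for bootstrap nodes, greater than or equal to 0 for known nodes). DestinationKindSketch and classify are hypothetical names, not existing test utilities, and the treatment of -1 as OTHER is an assumption since the thread does not describe it.

```java
// Hypothetical helper; encodes only the id ranges described in this review thread.
final class DestinationKindSketch {
    enum Kind { BOOTSTRAP, KNOWN_NODE, OTHER }

    static Kind classify(int destinationId) {
        if (destinationId < -1) {
            return Kind.BOOTSTRAP;   // bootstrap servers: unknown node id
        }
        if (destinationId >= 0) {
            return Kind.KNOWN_NODE;  // e.g. the leader discovered from a fetch response
        }
        return Kind.OTHER;           // -1 is not covered by the thread; assumption
    }
}
```

A test could then assert on the kind of destination plus the expected endpoint, rather than on the literal id -2.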
| int leaderId, | ||
| boolean expireUpdateVoterSetTimer | ||
| ) throws Exception { | ||
| final var state = client.quorum().followerStateOrThrow(); |
Can we implement this without using internal kraft state? These are protocol tests and ideally should not know anything about the internal implementation. In the future we should be able to change the kraft implementation and not have to update any of the KafkaRaftClient*Test tests.
Hmmm, it looks like #22111 (comment) was because some auto-join tests would fail without this change. It's been a while since I changed this.
Subsequent invocations to this method can expire the fetch timeout. Prior to this change, the local node did not change state, but now you go to Unattached, so completing the fetch transitions the node back to Follower, but with a new updateVoterSetTimer that is not expired. This causes the test to fail on the second assertion of sending add/remove voter.
I updated this method to not expire the fetch timeout on subsequent invocations without leaking kraft state. I documented how callers are expected to invoke this method.
| public long remainingUpdateVoterSetTimeMs(long currentTimeMs) { | ||
| updateVoterSetPeriodTimer.update(currentTimeMs); | ||
| return updateVoterSetPeriodTimer.remainingMs(); | ||
| } |
See my other comment, but ideally we should not have this method if it is not used by the kraft implementation.
Sounds good, I can remove this.
Thanks for the review @ahuang98. WRT your comment: the case this PR deals with is one where a TCP handshake between the local and destination node cannot be established. This is evidenced by repeated logs like the one below from the cluster where this behavior was observed: This means the KRaft fetch request is never sent over the wire by the local node. Instead, the connection failure will result in the connection state being I think you're referring to a case where we fall through to |
Synced offline, it seems we're referring to different incidents which can result in the same broken behavior. My comments on a potential RequestManager & KafkaNetworkChannel timing issue stem from https://gist.github.com/justin-chen/a7deade5b0ab17b33d64ec07cd2542ab
jsancio left a comment
Thanks for the changes. I took a look at the problem in more detail.
| } | ||
|
|
||
| @Test | ||
| void testObserverFetchesBetweenLeaderAndBootstrapServers() throws Exception { |
I ran this test against this PR and I got this trace:
[2026-06-25 15:02:25,329] INFO Starting request manager with bootstrap servers: [localhost:10634 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient:331)
[2026-06-25 15:02:25,561] INFO Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient:509)
[2026-06-25 15:02:25,563] INFO Starting voters are VoterSet(voters={643=VoterNode(voterKey=ReplicaKey(id=643, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0]), 644=VoterNode(voterKey=ReplicaKey(id=644, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10634}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) (org.apache.kafka.raft.KafkaRaftClient:511)
[2026-06-25 15:02:25,565] INFO Attempting durable transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,568] INFO Completed transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,586] TRACE Sent outbound request: OutboundRequest(correlationId=0, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414145309, destination=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,587] INFO Registered the listener org.apache.kafka.raft.RaftClientTestContext$MockListener@107632469 (org.apache.kafka.raft.KafkaRaftClient:3590)
[2026-06-25 15:02:25,725] TRACE Received inbound message InboundResponse(correlationId=0, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=643, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=643, host='localhost', port=10633, rack=null)]), source=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,726] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,727] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,728] DEBUG Notifying listener org.apache.kafka.raft.RaftClientTestContext$MockListener@107632469 of leader change LeaderAndEpoch[leaderId=OptionalInt[643], epoch=2] (org.apache.kafka.raft.KafkaRaftClient:4121)
[2026-06-25 15:02:25,836] TRACE Sent outbound request: OutboundRequest(correlationId=1, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414145309, destination=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,837] INFO Attempting durable transition to UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,837] INFO Completed transition to UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:25,837] TRACE Received inbound message InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,838] DEBUG Ignoring response InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10633 (id: 643 rack: null isFenced: false)) since it is no longer needed (org.apache.kafka.raft.KafkaRaftClient:2856)
[2026-06-25 15:02:25,942] TRACE Sent outbound request: OutboundRequest(correlationId=2, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414195310, destination=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-25 15:02:25,943] TRACE Received inbound message InboundResponse(correlationId=2, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=643, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=643, host='localhost', port=10633, rack=null)]), source=localhost:10634 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-25 15:02:25,943] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-25 15:02:25,944] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=643, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10633}), votedKey=Optional.empty, voters=[643, 644], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[643], votedKey=Optional.empty, voters=[643, 644], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-25 15:02:26,047] TRACE Sent outbound request: OutboundRequest(correlationId=3, data=FetchRequestData(clusterId='Xs7d_i8LRIuAcKg9hc0dhw', replicaId=-1, replicaState=ReplicaState(replicaId=642, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=ezHminGtTAmIQQ3i5JFLUQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782414195310, destination=localhost:10633 (id: 643 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
Can we get a TRACE of the actual issue to make sure we are solving the correct problem? I am having a hard time understanding the actual problem, so I am not sure that this change solves that problem.
Here is an updated trace from my most recent local changes:
[2026-06-26 09:50:15,417] INFO Starting request manager with bootstrap servers: [localhost:10139 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient:331)
[2026-06-26 09:50:15,600] INFO Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient:509)
[2026-06-26 09:50:15,601] INFO Starting voters are VoterSet(voters={148=VoterNode(voterKey=ReplicaKey(id=148, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0]), 149=VoterNode(voterKey=ReplicaKey(id=149, directoryId=<undefined>), listeners=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10139}), supportedKRaftVersion=SupportedVersionRange[min_version:0, max_version:0])}) (org.apache.kafka.raft.KafkaRaftClient:511)
[2026-06-26 09:50:15,603] INFO Attempting durable transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,605] INFO Completed transition to UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) from null (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,614] TRACE Sent outbound request: OutboundRequest(correlationId=0, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485415405, destination=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,615] INFO Registered the listener org.apache.kafka.raft.RaftClientTestContext$MockListener@220558713 (org.apache.kafka.raft.KafkaRaftClient:3590)
[2026-06-26 09:50:15,707] TRACE Received inbound message InboundResponse(correlationId=0, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=148, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=148, host='localhost', port=10138, rack=null)]), source=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,708] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,709] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=0, leaderId=OptionalInt.empty, votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=18985, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,709] DEBUG Notifying listener org.apache.kafka.raft.RaftClientTestContext$MockListener@220558713 of leader change LeaderAndEpoch[leaderId=OptionalInt[148], epoch=2] (org.apache.kafka.raft.KafkaRaftClient:4121)
[2026-06-26 09:50:15,812] TRACE Sent outbound request: OutboundRequest(correlationId=1, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485415405, destination=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,813] TRACE Received inbound message InboundResponse(correlationId=1, data=FetchResponseData(throttleTimeMs=0, errorCode=8, sessionId=0, responses=[], nodeEndpoints=[]), source=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,814] INFO Attempting durable transition to UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,814] INFO Completed transition to UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) from FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:15,918] TRACE Sent outbound request: OutboundRequest(correlationId=2, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485465406, destination=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
[2026-06-26 09:50:15,919] TRACE Received inbound message InboundResponse(correlationId=2, data=FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=6, highWatermark=0, lastStableOffset=-1, logStartOffset=-1, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=148, leaderEpoch=2), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=37]))])], nodeEndpoints=[NodeEndpoint(nodeId=148, host='localhost', port=10138, rack=null)]), source=localhost:10139 (id: -2 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2848)
[2026-06-26 09:50:15,920] INFO Attempting durable transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:732)
[2026-06-26 09:50:15,920] INFO Completed transition to FollowerState(fetchTimeoutMs=50000, epoch=2, leader=148, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10138}), votedKey=Optional.empty, voters=[148, 149], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) from UnattachedState(epoch=2, leaderId=OptionalInt[148], votedKey=Optional.empty, voters=[148, 149], electionTimeoutMs=9223372036854775807, highWatermark=Optional.empty) (org.apache.kafka.raft.QuorumState:744)
[2026-06-26 09:50:16,024] TRACE Sent outbound request: OutboundRequest(correlationId=3, data=FetchRequestData(clusterId='sSoE9smGSQqjfEuTnlMPsA', replicaId=-1, replicaState=ReplicaState(replicaId=147, replicaEpoch=-1), maxWaitMs=0, minBytes=0, maxBytes=1048576, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=2, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=XiEwxtuzSGuh5WQsWw8VnQ, highWatermark=-1)])], forgottenTopicsData=[], rackId=''), createdTimeMs=1782485465406, destination=localhost:10138 (id: 148 rack: null isFenced: false)) (org.apache.kafka.raft.KafkaRaftClient:2908)
The scenario is: after the local node becomes Follower, it cannot fetch successfully from the leader for the duration of its fetch timeout and instead receives BROKER_NOT_AVAILABLE responses. The trace shows this: the local node sends a fetch to the leader, gets a BROKER_NOT_AVAILABLE response, and only then transitions to Unattached.
| context.pollUntilRequest(); | ||
| RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); | ||
| if (isBootstrapFetch) { | ||
| assertTrue(context.client.quorum().isUnattached()); |
Let's avoid testing internal kraft state. We should try to test and check kraft's externalities: RPCs, writes to the log, etc.
| local.id(), | ||
| local.directoryId().get() | ||
| ) | ||
| .withStaticVoters(voters) |
Why use static voters? Why not enable and use all of the latest features?
| context.deliverResponse( | ||
| leaderFetch.correlationId(), | ||
| leaderFetch.destination(), | ||
| RaftUtil.errorResponse( | ||
| ApiKeys.FETCH, | ||
| Errors.BROKER_NOT_AVAILABLE | ||
| ) | ||
| ); |
According to the trace I pasted, this response is not delivered before the fetch timeout expires. This is misleading when reading the test.
| // The fetch timeout is much greater than the request manager's configured backoff, so the | ||
| // current unreachable connection will no longer be backing off when the next fetch is sent. | ||
| // Expire the fetch timeout and check that the next fetch is sent to the bootstrap server again. | ||
| context.time.sleep(context.fetchTimeoutMs + 1); |
I made these changes and the test passes. It looks like the issue is that the leader is in the backoff state because kraft got an error from the leader:
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index 8d762d6c96..3d873add30 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -836,7 +836,6 @@ public final class KafkaRaftClientFetchTest {
// The fetch timeout is much greater than the request manager's configured backoff, so the
// current unreachable connection will no longer be backing off when the next fetch is sent.
// Expire the fetch timeout and check that the next fetch is sent to the bootstrap server again.
- context.time.sleep(context.fetchTimeoutMs + 1);
final var nextBootstrapFetch = pollAndCheckObserverFetchRequest(
context,
true,
@@ -854,6 +853,8 @@ public final class KafkaRaftClientFetchTest {
)
);
+ context.time.sleep(context.retryBackoffMs);
+
// Discovering the leader from a bootstrap fetch means the observer resumes fetching from the leader
pollAndCheckObserverFetchRequest(
context,
@@ -871,10 +872,8 @@ public final class KafkaRaftClientFetchTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
if (isBootstrapFetch) {
- assertTrue(context.client.quorum().isUnattached());
assertTrue(fetchRequest.destination().id() < -1);
} else {
- assertTrue(context.client.quorum().isFollower());
assertEquals(expectedDestinationId, fetchRequest.destination().id());
}
// only need to check port since the host is always "localhost" for the mock addresses
My test is written incorrectly. I agree with your above comments that the response from the leader fetch is not delivered until after the fetch timeout expires.
I need to handle the BROKER_NOT_AVAILABLE response first via poll() to accurately simulate this scenario.
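The timing point raised in this thread can be illustrated with a minimal sketch. It assumes, as the review comment suggests, that an error response from a destination puts that connection into backoff for the retry backoff period. `ConnectionBackoffSketch` and its methods are hypothetical names for illustration and are not the request manager's actual API.

```java
// Hypothetical model of a per-destination retry backoff.
// It is a sketch of the idea discussed above, not the request manager's real API.
final class ConnectionBackoffSketch {
    private final long retryBackoffMs;
    // Time before which no new request should be sent to this destination.
    private long backoffUntilMs = Long.MIN_VALUE;

    ConnectionBackoffSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // An error response (for example BROKER_NOT_AVAILABLE) starts a backoff window.
    void onErrorResponse(long nowMs) {
        backoffUntilMs = nowMs + retryBackoffMs;
    }

    // True while the destination is still inside its backoff window.
    boolean isBackingOff(long nowMs) {
        return nowMs < backoffUntilMs;
    }
}
```

Under this model, a test that delivers the error response has to advance the mock clock by at least the retry backoff before the leader becomes eligible for the next fetch, which is what the `context.time.sleep(context.retryBackoffMs)` line in the diff does.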
|
Looks like with the current implementation, there is a certain state + message delivery that can result in: Where we aren't calling |
Background
Currently, there is a timing issue where a KRaft observer can be stuck
fetching from the leader if the next poll occurs after the previous
fetch's backoff has completed, and the previous request did not time
out. This can happen if the leader's advertised endpoints are not
routable or there is a network partition. The bootstrap server endpoints
could contain routable endpoints for the leader, but the observer would
be stuck fetching from the unroutable endpoints.
Previously, there was an issue where observers could be stuck fetching
from the bootstrap servers even if they discovered leader endpoints from
a bootstrap fetch. This is because the fetch timeout is not reset on
the observer.
What changed
Observer fetching logic should ensure that within the same epoch, all
the bootstrap server endpoints and the leader have a chance to serve
fetch requests. This logic should be independent of the request manager's
state. The key observation is that an observer failing to fetch from
node X within its fetch timeout does not mean that node X is not the
leader. Therefore, if the bootstrap servers say node X is indeed the
leader, an observer should resume trying to fetch from it.
- When an observer's fetch timeout expires while fetching from the
leader, it transitions to Unattached within the same epoch, and will
then fetch from the bootstrap servers.
- When an Unattached observer receives a fetch response from the
bootstrap servers with leader endpoints, the Unattached observer
transitions back to Follower in the same epoch.

A voter has similar functionality, where a fetch timeout expiration and a
failed pre-vote election result in a reset of the fetch timer to the
same leader in the same epoch. The state transition Follower ->
Prospective -> Follower allows a voter to refresh the fetch timer for
the leader within the same epoch, but observers do not have this
behavior currently. This PR proposes adding a similar state transition
for observers: Follower -> Unattached -> Follower.
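
The Follower -> Unattached -> Follower transition for observers can be sketched as a small state model. This is a simplified illustration and not the code in this PR: `ObserverFetchSketch` and all of its methods are hypothetical names, and the model ignores voters, snapshots, and the request manager.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model of the observer transitions described above.
// The class and method names are illustrative and are not KafkaRaftClient APIs.
final class ObserverFetchSketch {
    enum State { FOLLOWER, UNATTACHED }

    private final long fetchTimeoutMs;
    private State state = State.UNATTACHED;
    private int epoch = 0;
    private OptionalInt leaderId = OptionalInt.empty();
    private long fetchDeadlineMs = Long.MAX_VALUE;

    ObserverFetchSketch(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    // A fetch response (for example from a bootstrap server) advertised the leader.
    // The observer becomes Follower in that epoch and its fetch timer is refreshed.
    void onLeaderEndpointsDiscovered(int responseEpoch, int responseLeaderId, long nowMs) {
        if (responseEpoch < epoch) {
            return; // stale response: ignore it
        }
        epoch = responseEpoch;
        leaderId = OptionalInt.of(responseLeaderId);
        state = State.FOLLOWER;
        fetchDeadlineMs = nowMs + fetchTimeoutMs;
    }

    // A successful fetch from the leader also refreshes the fetch timer.
    void onSuccessfulLeaderFetch(long nowMs) {
        if (state == State.FOLLOWER) {
            fetchDeadlineMs = nowMs + fetchTimeoutMs;
        }
    }

    // On each poll, an expired fetch timer moves the observer to Unattached.
    // The epoch and the known leader id are kept, so the epoch does not change.
    void poll(long nowMs) {
        if (state == State.FOLLOWER && nowMs >= fetchDeadlineMs) {
            state = State.UNATTACHED;
        }
    }

    // Follower fetches from the leader; Unattached fetches from bootstrap servers.
    boolean fetchesFromLeader() {
        return state == State.FOLLOWER;
    }

    int epoch() {
        return epoch;
    }

    OptionalInt leaderId() {
        return leaderId;
    }
}
```

In this model the destination of the next fetch depends only on the observer's state, which is driven by the fetch timer and the previous fetch response, and not on any connection backoff state.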
Testing
Added a unit test to KafkaRaftClientFetchTest to show that fetches oscillate
between the leader and bootstrap endpoints based on the fetch timer.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Jonah Hooper
<jhooper@confluent.io>, Alyssa Huang <ahuang@confluent.io>