KAFKA-18775: Simplify controller quorum changes through the Admin API#22531
KAFKA-18775: Simplify controller quorum changes through the Admin API#22531brandboat wants to merge 1 commit into
Conversation
Add convenience Admin APIs that require only a controller ID and extend the voter RPCs to support deriving omitted directory IDs and endpoints. Resolve add-voter details from observer state and registered controller endpoints, and resolve remove-voter directory IDs from the current voter set. Preserve explicitly supplied voter information. Update MetadataQuorumCommand to add controllers by ID and make the directory ID optional when removing controllers. Add Raft, command, and integration coverage for the new behavior and validation paths.
|
@kevin-wu24 , fyi. |
showuon
left a comment
There was a problem hiding this comment.
Thanks for the PR. Left some comments.
| if (endpoints.address(channel.listenerName()).isEmpty()) { | ||
| throw new IllegalArgumentException( | ||
| String.format( | ||
| "Add voter request didn't include the endpoint (%s) for the default listener %s", |
There was a problem hiding this comment.
Is the error message correct for request without listener?
| .toList(); | ||
| if (matchingObservers.size() > 1) { | ||
| throw new IllegalArgumentException( | ||
| String.format("Multiple observers with node ID %d detected (%s).", data.voterId(), matchingObservers) |
There was a problem hiding this comment.
I would add some more instruction for this. Ex: Please remove one of the node and then try it again.
| ); | ||
|
|
||
| context.pollUntilResponse(); | ||
| context.assertSentAddVoterResponse(Errors.INVALID_REQUEST); |
There was a problem hiding this comment.
Since we expect the "error message" should be different in difference cases, could we also verify the it? Same as below.
| } | ||
|
|
||
| @Test | ||
| public void testAddVoterDoesNotReplaceExplicitDirectoryId() throws Exception { |
There was a problem hiding this comment.
TBH, I've read this test twice, I still don't know why it will fail. The test name DoesNotReplaceExplicitDirectoryId is also not clear to me. Could we add comments on each test like the testRemoveVoterDerivesDirectoryId test does to make it clear what we're testing in each test?
There was a problem hiding this comment.
Hi Luke, thanks for the thorough review and sorry for bringing the misunderstanding... the goal of this test is to verify the behavior when a user tries to add a voter with an explicit but incorrect directory id.
The test intentionally creates two different ReplicaKeys with the same node id but different directory ids. The leader has only seen and caught up one of them as an observer, but the AddVoter request asks to add the other one. Because catch-up is tracked by the full ReplicaKey rather than just the node id, the requested voter is not considered caught up, so AddVoterHandler returns REQUEST_TIMED_OUT[1][2].
Update:
So this test verifies that the current implementation does not overwrite or derive the directory id when the user provides one explicitly. Instead, it preserves the user-provided ReplicaKey and passes it through to AddVoterHandler. As before, an incorrect explicit directory id means the requested voter cannot be matched to the caught-up observer, and the request times out.
There was a problem hiding this comment.
I'll add more comments in the test, hope it helps
| private void testAddRemoveRaftVoter(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception { | ||
| try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) { | ||
| Set<Integer> initialVoters = voterIds(admin.describeMetadataQuorum().quorumInfo().get()); | ||
| AtomicInteger voterId = new AtomicInteger(); |
There was a problem hiding this comment.
Let's make the variable name clear. Maybe voterIdToBeAddedAndRemoved?
| if (cluster.type() == Type.CO_KRAFT) | ||
| if (cluster.type() == Type.CO_KRAFT) | ||
| assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size()); | ||
| else | ||
| else | ||
| assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size()); |
There was a problem hiding this comment.
nit: Let's revert unneeded empty spaces.
| QuorumInfo info = admin.describeMetadataQuorum().quorumInfo().get(); | ||
| controllerId.set(info.observers().stream() | ||
| .mapToInt(QuorumInfo.ReplicaState::replicaId) | ||
| .filter(cluster.controllerIds()::contains) | ||
| .findFirst() | ||
| .orElseThrow(() -> new AssertionError("No controller observer found in quorum info " + info))); |
There was a problem hiding this comment.
Let's also assert the voter size is 1 before adding voters here.
| * controller node ID (no directory ID or endpoints required from the user). | ||
| */ | ||
| @ClusterTest(types = {Type.KRAFT}, controllers = 2, standalone = true) | ||
| public void testAddAndRemoveControllerByIdSuccessful(ClusterInstance cluster) throws Exception { |
There was a problem hiding this comment.
Question: Could we addVoter with [node ID, directory ID] or [node ID, endpoint] ? If [node ID, directory ID] or [node ID, endpoint] doesn't match with each other, ex: wrong directory ID/endpoint to the node, what will happen? Could we also add tests for them?
|
@jsancio , call for review. Thanks. |
kevin-wu24
left a comment
There was a problem hiding this comment.
Thanks for the feature @brandboat. Left a few comments:
| raftManager.client.setNodeEndpointProvider { nodeId => | ||
| Option(registrationsPublisher.controllers().get(nodeId)).map { registration => | ||
| val endpoints = registration.listeners().asScala.map { case (listenerName, endpoint) => | ||
| ListenerName.normalised(listenerName) -> | ||
| InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port()) | ||
| }.asJava | ||
| Endpoints.fromInetSocketAddresses(endpoints) | ||
| }.getOrElse(Endpoints.empty()) | ||
| } | ||
|
|
There was a problem hiding this comment.
It would be nice if we did not introduce public methods to KafkaRaftClient that are not part of the RaftClient API. This method seems very implementation-specific. I think the issue is that it is not easy to get controller server state into the raft layer.
Is it possible to have the raft client constructor take a NodeEndpointProvider instead as a parameter?
In ControllerServer, I see:
registrationsPublisher = new ControllerRegistrationsPublisher()
We could initialize the publisher in SharedServer if we are a controller, define the NodeEndpointProvider there for based on whether we are broker or controller, and then pass it to KafkaRaftManager. What do you think?
| @@ -206,6 +207,16 @@ class ControllerServer( | |||
|
|
|||
| sharedServer.startForController(listenerInfo) | |||
There was a problem hiding this comment.
This line needs to execute after we set the NodeEndpointProvider if we keep the current approach, not before. When returning from this line, raft network client and io thread have already started doing work.
| context.client.setNodeEndpointProvider(nodeId -> | ||
| nodeId == newVoter.id() ? newListeners : Endpoints.empty() | ||
| ); |
There was a problem hiding this comment.
We should add a withNodeEndpointProvider method to the RaftClientTestContext.Builder class instead.
| context.pollUntilResponse(); | ||
| context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT); |
There was a problem hiding this comment.
I think you're hitting this code in AddVoterHandler#handleApiVersionsResponse:
if (!leaderState.isReplicaCaughtUp(current.voterKey(), currentTimeMs)) {
logger.info(
"Aborting add voter operation for {} at {} since it is lagging behind: {}",
current.voterKey(),
current.voterEndpoints(),
leaderState.getReplicaState(current.voterKey())
);
We do a fetch via the observer replica key to catch it up before sending the add voter request, but not the requestedVoter replica key.
Ideally, putting the wrong directory id in the ADD_RAFT_VOTER request should not even get to this point. We should return INVALID_REQUEST before even entering AddVoterHandler. We should be returning a similar error to the test below.
Because you're running add-voter from the node that is being added (in both manual + auto join case), the directory id would be correct or ZERO_UUID, but never incorrect.
| } | ||
|
|
||
| @Test | ||
| public void testAddVoterDoesNotReplaceExplicitDirectoryId() throws Exception { |
There was a problem hiding this comment.
I am kind of confused about the purpose of this test. We have an observer replica key that the leader should add to observerStates because we send a fetch from that key in prepareLeaderToReceiveAddVoter. Then we send ADD_RAFT_VOTER with the same observer id but a different directory ID, is that correct?
It still seems to me that even after this KIP, the malformed ADD_RAFT_VOTER with the wrong directory UUID delivered on L474 is not handled by the existing code.
| @ClusterTest(controllers = 3, standalone = true) | ||
| public void testAddRemoveRaftVoterByControllers(ClusterInstance clusterInstance) throws Exception { | ||
| testAddRemoveRaftVoter(clusterInstance, true); | ||
| } | ||
|
|
||
| @ClusterTest(controllers = 3, standalone = true) | ||
| public void testAddRemoveRaftVoter(ClusterInstance clusterInstance) throws Exception { | ||
| testAddRemoveRaftVoter(clusterInstance, false); | ||
| } |
There was a problem hiding this comment.
We can parametrize this test based on the boolean usingBootstrapControllers right?
| try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) { | ||
| Set<Integer> initialVoters = voterIds(admin.describeMetadataQuorum().quorumInfo().get()); | ||
| AtomicInteger voterId = new AtomicInteger(); | ||
| TestUtils.retryOnExceptionWithTimeout(30_000, () -> { | ||
| QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(); | ||
| voterId.set(quorumInfo.observers().stream() |
There was a problem hiding this comment.
How did you set up an observer controller in the integration tests? This is using the KafkaClusterTestKit under the hood right? Mostly curious, since I don't remember that code being able to set up kraft observers that were not brokers.
There was a problem hiding this comment.
Oh I see, we make it standalone, but do not have auto-join on.
Add convenience Admin APIs that require only a controller ID and extend
the voter RPCs to support deriving omitted directory IDs and endpoints.
Resolve add-voter details from observer state and registered controller
endpoints, and resolve remove-voter directory IDs from the current voter
set. Preserve explicitly supplied voter information.
Update MetadataQuorumCommand to add controllers by ID and make the
directory ID optional when removing controllers. Add Raft, command, and
integration coverage for the new behavior and validation paths.
Reviewers: Luke Chen showuon@gmail.com, Kevin Wu kwu@confluent.io