From b0ecf90ab84f91ae9d3f18edcbf5d6fc965e011a Mon Sep 17 00:00:00 2001 From: LiZi Date: Fri, 26 Jun 2026 14:29:06 +0800 Subject: [PATCH] KAFKA-20734: Return noNode for known partitions without endpoints --- .../kafka/metadata/KRaftMetadataCache.java | 11 ++-- .../apache/kafka/metadata/MetadataCache.java | 7 +-- .../kafka/metadata/MetadataCacheTest.java | 59 +++++++++++++++++++ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java index f05e34e408d0d..600aeaaf3e314 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java @@ -417,17 +417,18 @@ public Map topicNamesToIds() { } /** - * If the leader is not known, return None; - * If the leader is known and corresponding node is available, return Some(node) - * If the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + * If the topic or partition is not known, return None. + * If the leader endpoint is available, return Some(node). + * Otherwise, return Some(NO_NODE). */ @Override public Optional getPartitionLeaderEndpoint(String topicName, int partitionId, ListenerName listenerName) { MetadataImage image = currentImage; return Optional.ofNullable(image.topics().getTopic(topicName)) .flatMap(topic -> Optional.ofNullable(topic.partitions().get(partitionId))) - .flatMap(partition -> Optional.ofNullable(image.cluster().broker(partition.leader)) - .map(broker -> broker.node(listenerName.value()).orElse(Node.noNode()))); + .map(partition -> Optional.ofNullable(image.cluster().broker(partition.leader)) + .flatMap(broker -> broker.node(listenerName.value())) + .orElse(Node.noNode())); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java index 555f1866bd83a..5eec3bd4a7ccb 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java @@ -101,9 +101,8 @@ List getTopicMetadata( /** * Get a partition leader's endpoint * - * @return If the leader is known, and the listener name is available, return Some(node). If the leader is known, - * but the listener is unavailable, return Some(Node.NO_NODE). Otherwise, if the leader is not known, - * return None + * @return If the topic or partition is not known, return None. If the leader endpoint is available, return + * Some(node). Otherwise, return Some(Node.NO_NODE). */ Optional getPartitionLeaderEndpoint(String topic, int partitionId, ListenerName listenerName); @@ -223,4 +222,4 @@ private static Optional getRandomAliveBroker(MetadataImage image) { if (aliveBrokers.isEmpty()) return Optional.empty(); return Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size()))); } -} \ No newline at end of file +} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java index 7df67f236705f..c30120f4a2b52 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java @@ -504,6 +504,65 @@ public void getAliveBrokersShouldNotBeMutatedByUpdateCache() { } } + @Test + public void testGetPartitionLeaderEndpointForKnownPartitionWithoutEndpoint() { + MetadataCache cache = createCache(); + ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT); + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + + List records = List.of( + new RegisterBrokerRecord() + .setBrokerId(0) + .setBrokerEpoch(BROKER_EPOCH) + .setFenced(false) + .setEndPoints(new BrokerEndpointCollection(List.of( + new BrokerEndpoint() + .setHost("foo0") + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setName(listenerName.value()) + ))), + new TopicRecord().setName(topic).setTopicId(topicId), + new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setLeader(0) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .setIsr(List.of(0)) + .setReplicas(List.of(0)), + new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(1) + .setLeader(1) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .setIsr(List.of(1)) + .setReplicas(List.of(1)), + new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(2) + .setLeader(LeaderConstants.NO_LEADER) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .setIsr(List.of()) + .setReplicas(List.of(0)) + ); + updateCache(cache, records); + + assertEquals(Optional.of(new Node(0, "foo0", 9092)), cache.getPartitionLeaderEndpoint(topic, 0, listenerName)); + assertEquals(Optional.of(Node.noNode()), cache.getPartitionLeaderEndpoint( + topic, + 0, + ListenerName.forSecurityProtocol(SecurityProtocol.SSL) + )); + assertEquals(Optional.of(Node.noNode()), cache.getPartitionLeaderEndpoint(topic, 1, listenerName)); + assertEquals(Optional.of(Node.noNode()), cache.getPartitionLeaderEndpoint(topic, 2, listenerName)); + assertEquals(Optional.empty(), cache.getPartitionLeaderEndpoint(topic, 3, listenerName)); + assertEquals(Optional.empty(), cache.getPartitionLeaderEndpoint("missing-topic", 0, listenerName)); + } + private void updateCacheWithBrokers(MetadataCache cache, List brokerIds, Uuid topicId, List topicRecords) { SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;