Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,17 +417,18 @@ public Map<String, Uuid> topicNamesToIds() {
}

/**
* If the leader is not known, return None;
* If the leader is known and corresponding node is available, return Some(node)
* If the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
* If the topic or partition is not known, return None.
* If the leader endpoint is available, return Some(node).
* Otherwise, return Some(NO_NODE).
*/
@Override
public Optional<Node> getPartitionLeaderEndpoint(String topicName, int partitionId, ListenerName listenerName) {
MetadataImage image = currentImage;
return Optional.ofNullable(image.topics().getTopic(topicName))
.flatMap(topic -> Optional.ofNullable(topic.partitions().get(partitionId)))
.flatMap(partition -> Optional.ofNullable(image.cluster().broker(partition.leader))
.map(broker -> broker.node(listenerName.value()).orElse(Node.noNode())));
.map(partition -> Optional.ofNullable(image.cluster().broker(partition.leader))
.flatMap(broker -> broker.node(listenerName.value()))
.orElse(Node.noNode()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
/**
* Get a partition leader's endpoint
*
* @return If the leader is known, and the listener name is available, return Some(node). If the leader is known,
* but the listener is unavailable, return Some(Node.NO_NODE). Otherwise, if the leader is not known,
* return None
* @return If the topic or partition is not known, return None. If the leader endpoint is available, return
* Some(node). Otherwise, return Some(Node.NO_NODE).
*/
Optional<Node> getPartitionLeaderEndpoint(String topic, int partitionId, ListenerName listenerName);

Expand Down Expand Up @@ -223,4 +222,4 @@ private static Optional<Integer> getRandomAliveBroker(MetadataImage image) {
if (aliveBrokers.isEmpty()) return Optional.empty();
return Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,65 @@ public void getAliveBrokersShouldNotBeMutatedByUpdateCache() {
}
}

@Test
public void testGetPartitionLeaderEndpointForKnownPartitionWithoutEndpoint() {
MetadataCache cache = createCache();
ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
String topic = "topic";
Uuid topicId = Uuid.randomUuid();

List<ApiMessage> records = List.of(
new RegisterBrokerRecord()
.setBrokerId(0)
.setBrokerEpoch(BROKER_EPOCH)
.setFenced(false)
.setEndPoints(new BrokerEndpointCollection(List.of(
new BrokerEndpoint()
.setHost("foo0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(listenerName.value())
))),
new TopicRecord().setName(topic).setTopicId(topicId),
new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setLeader(0)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
.setIsr(List.of(0))
.setReplicas(List.of(0)),
new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(1)
.setLeader(1)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
.setIsr(List.of(1))
.setReplicas(List.of(1)),
new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(2)
.setLeader(LeaderConstants.NO_LEADER)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
.setIsr(List.of())
.setReplicas(List.of(0))
);
updateCache(cache, records);

assertEquals(Optional.of(new Node(0, "foo0", 9092)), cache.getPartitionLeaderEndpoint(topic, 0, listenerName));
assertEquals(Optional.of(Node.noNode()), cache.getPartitionLeaderEndpoint(
topic,
0,
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
));
assertEquals(Optional.of(Node.noNode()), cache.getPartitionLeaderEndpoint(topic, 1, listenerName));
assertEquals(Optional.of(Node.noNode()), cache.getPartitionLeaderEndpoint(topic, 2, listenerName));
assertEquals(Optional.empty(), cache.getPartitionLeaderEndpoint(topic, 3, listenerName));
assertEquals(Optional.empty(), cache.getPartitionLeaderEndpoint("missing-topic", 0, listenerName));
}

private void updateCacheWithBrokers(MetadataCache cache, List<Integer> brokerIds,
Uuid topicId, List<ApiMessage> topicRecords) {
SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
Expand Down