diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2104ff42b7bee..b0358baebfdfc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1659,9 +1659,11 @@ public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupPro } } + // NOTE: the CONSUMER protocol path is tested separately in + // FetchRequestManagerTest.testFetchResponseWithUnexpectedPartitionIsIgnored. @ParameterizedTest - @EnumSource(GroupProtocol.class) - public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) { + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") + public void testFetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 8182eee6f1617..4a5dafaaeb8f8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -1760,6 +1760,37 @@ private void fetchRecordsInto(List> allFetchedRec fetchedRecords.values().forEach(allFetchedRecords::addAll); } + @Test + public void testFetchResponseWithUnexpectedPartitionIsIgnored() { + buildFetcher(); + + // Only tp0 is assigned and seeked; tp1 is not part of this fetch session. + // When the response includes an unexpected partition (tp1), FetchSessionHandler + // rejects the entire response, so tp0 records are also not returned. + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + assertEquals(1, sendFetches()); + + Map partitions = new LinkedHashMap<>(); + partitions.put(tidp0, new FetchResponseData.PartitionData() + .setPartitionIndex(tp0.partition()) + .setHighWatermark(100L) + .setLogStartOffset(0) + .setRecords(records)); + partitions.put(tidp1, new FetchResponseData.PartitionData() + .setPartitionIndex(tp1.partition()) + .setHighWatermark(100L) + .setLogStartOffset(0) + .setRecords(records)); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); + networkClientDelegate.poll(time.timer(0)); + + assertEquals(0, client.inFlightRequestCount()); + assertFalse(fetcher.hasCompletedFetches()); + assertTrue(fetchRecords().isEmpty()); + } + @Test public void testCompletedFetchRemoval() { // Ensure the removal of completed fetches that cause an Exception if and only if they contain empty records.