From 5e0ff2f74be7d92e950a08c47ab9c38054ec2838 Mon Sep 17 00:00:00 2001 From: david-parkk Date: Wed, 24 Jun 2026 00:18:31 +0900 Subject: [PATCH 1/4] KAFKA-20733: Restrict fetchResponseWithUnexpectedPartitionIsIgnored test to CLASSIC protocol --- .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2104ff42b7bee..0b53aa87caeee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1660,7 +1660,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupPro } @ParameterizedTest - @EnumSource(GroupProtocol.class) + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); From 19ed3f2fbf4b854a93b86b21c6e2b219bce05b02 Mon Sep 17 00:00:00 2001 From: david-parkk Date: Wed, 24 Jun 2026 00:28:06 +0900 Subject: [PATCH 2/4] KAFKA-20733: Add FetchRequestManagerTest for unexpected partition in FetchResponse --- .../internals/FetchRequestManagerTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 8182eee6f1617..4a5dafaaeb8f8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -1760,6 +1760,37 @@ private void fetchRecordsInto(List> allFetchedRec fetchedRecords.values().forEach(allFetchedRecords::addAll); } + @Test + public void testFetchResponseWithUnexpectedPartitionIsIgnored() { + buildFetcher(); + + // Only tp0 is assigned and seeked; tp1 is not part of this fetch session. + // When the response includes an unexpected partition (tp1), FetchSessionHandler + // rejects the entire response, so tp0 records are also not returned. + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + assertEquals(1, sendFetches()); + + Map partitions = new LinkedHashMap<>(); + partitions.put(tidp0, new FetchResponseData.PartitionData() + .setPartitionIndex(tp0.partition()) + .setHighWatermark(100L) + .setLogStartOffset(0) + .setRecords(records)); + partitions.put(tidp1, new FetchResponseData.PartitionData() + .setPartitionIndex(tp1.partition()) + .setHighWatermark(100L) + .setLogStartOffset(0) + .setRecords(records)); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); + networkClientDelegate.poll(time.timer(0)); + + assertEquals(0, client.inFlightRequestCount()); + assertFalse(fetcher.hasCompletedFetches()); + assertTrue(fetchRecords().isEmpty()); + } + @Test public void testCompletedFetchRemoval() { // Ensure the removal of completed fetches that cause an Exception if and only if they contain empty records. From 25fc7392ae9992ca5afd928c9a948b665bd075bf Mon Sep 17 00:00:00 2001 From: david-parkk Date: Wed, 24 Jun 2026 01:24:53 +0900 Subject: [PATCH 3/4] KAFKA-20733: Rename testFetchResponseWithUnexpectedPartitionIsIgnored --- .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 0b53aa87caeee..6e96970af85f8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1661,7 +1661,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupPro @ParameterizedTest @EnumSource(value = GroupProtocol.class, names = "CLASSIC") - public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) { + public void testFetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); From 01e8c2066411852813eb54aa4c870f478218c102 Mon Sep 17 00:00:00 2001 From: david-parkk Date: Wed, 24 Jun 2026 07:56:44 +0900 Subject: [PATCH 4/4] KAFKA-20733: Add comment clarifying CONSUMER protocol is tested separately --- .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 6e96970af85f8..b0358baebfdfc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1659,6 +1659,8 @@ public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupPro } } + // NOTE: the CONSUMER protocol path is tested separately in + // FetchRequestManagerTest.testFetchResponseWithUnexpectedPartitionIsIgnored. @ParameterizedTest @EnumSource(value = GroupProtocol.class, names = "CLASSIC") public void testFetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) {