From c2a3baf557087d80a4768b4cc70a61763816f14b Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Wed, 24 Jun 2026 16:23:21 -0400 Subject: [PATCH] KAFKA-19738: Add unit tests for OffsetsRequestManager ListOffsets request building Adds coverage for ListOffsets request construction and retry routing in OffsetsRequestManager: - isolation level is carried on the request (parameterized over READ_COMMITTED and READ_UNCOMMITTED) - caller target timestamp is carried on the request (parameterized over EARLIEST_TIMESTAMP and LATEST_TIMESTAMP) - a retry after a NOT_LEADER_OR_FOLLOWER error is routed to the new leader following a metadata update --- .../internals/OffsetsRequestManagerTest.java | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index 17a492e487ac8..344a7f4c1bbf7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -47,7 +47,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import java.util.ArrayList; @@ -469,6 +471,52 @@ public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws E verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets); } + @Test + public void testListOffsetsRetriedToNewLeaderAfterMetadataUpdate() throws ExecutionException, + InterruptedException { + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); + + mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); + CompletableFuture> fetchOffsetsFuture = + requestManager.fetchOffsets(timestampsToSearch, false); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); + verifySuccessfulPollAwaitingResponse(res); + NetworkClientDelegate.UnsentRequest firstRequest = res.unsentRequests.get(0); + assertEquals(Optional.of(LEADER_1), firstRequest.node()); + + // Original leader returns a retriable error, so the partition queued for retry and metadata update + ClientResponse errorResponse = buildClientResponseWithErrors(firstRequest, + Collections.singletonMap(TEST_PARTITION_1, Errors.NOT_LEADER_OR_FOLLOWER)); + errorResponse.onComplete(); + assertFalse(fetchOffsetsFuture.isDone()); + + assertEquals(0, requestManager.requestsToSend()); + assertEquals(1, requestManager.requestsToRetry()); + verify(metadata).requestUpdate(false); + + // Metadata update moves leadership of the partition to LEADER_2 + mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_2)); + requestManager.onUpdate(new ClusterResource("")); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + // The retry must be routed to the new leader, not the stale one + NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds()); + verifySuccessfulPollAwaitingResponse(retriedPoll); + NetworkClientDelegate.UnsentRequest retryRequest = retriedPoll.unsentRequests.get(0); + assertEquals(Optional.of(LEADER_2), retryRequest.node()); + + Map expectedOffsets = Collections.singletonMap( + TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty())); + ClientResponse successResponse = buildClientResponse(retryRequest, expectedOffsets); + successResponse.onComplete(); + verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets); + } + @Test public void testRequestFailedResponse_NonRetriableAuthError() { Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, @@ -889,6 +937,63 @@ public void testRemoteListOffsetsRequestTimeoutMs() { assertEquals(requestTimeoutMs, offsetFetchRequest.timeoutMs()); } + @ParameterizedTest + @EnumSource(IsolationLevel.class) + public void testListOffsetsRequestSendsIsolationLevel(IsolationLevel isolationLevel) { + requestManager = new OffsetsRequestManager( + subscriptionState, + metadata, + isolationLevel, + time, + RETRY_BACKOFF_MS, + REQUEST_TIMEOUT_MS, + DEFAULT_API_TIMEOUT_MS, + apiVersions, + mock(NetworkClientDelegate.class), + commitRequestManager, + new PositionsValidator(new LogContext(), time, subscriptionState, metadata), + new LogContext() + ); + + Map timestampsToSearch = + Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP); + mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); + requestManager.fetchOffsets(timestampsToSearch, false); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); + + AbstractRequest abstractRequest = unsentRequest.requestBuilder().build(); + assertInstanceOf(ListOffsetsRequest.class, abstractRequest); + ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) abstractRequest; + + assertEquals(isolationLevel, listOffsetsRequest.isolationLevel()); + } + + @ParameterizedTest + @ValueSource(longs = {ListOffsetsRequest.EARLIEST_TIMESTAMP, ListOffsetsRequest.LATEST_TIMESTAMP}) + public void testListOffsetsRequestSendsTargetTimestamp(long timestamp) { + Map timestampsToSearch = + Collections.singletonMap(TEST_PARTITION_1, timestamp); + mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); + requestManager.fetchOffsets(timestampsToSearch, false); + assertEquals(1, requestManager.requestsToSend()); + + NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); + + AbstractRequest abstractRequest = unsentRequest.requestBuilder().build(); + assertInstanceOf(ListOffsetsRequest.class, abstractRequest); + ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) abstractRequest; + + // The target timestamp requested by the caller must be carried on the wire request + assertEquals(TEST_PARTITION_1.topic(), listOffsetsRequest.topics().get(0).name()); + assertEquals(timestamp, + listOffsetsRequest.topics().get(0).partitions().get(0).timestamp()); + } + private void mockAssignedPartitionsMissingPositions(Set assignedPartitions, Set initializingPartitions, Metadata.LeaderAndEpoch leaderAndEpoch) {