Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
Expand Down Expand Up @@ -469,6 +471,52 @@ public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws E
verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets);
}

@Test
public void testListOffsetsRetriedToNewLeaderAfterMetadataUpdate() throws ExecutionException,
InterruptedException {
Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1,
ListOffsetsRequest.EARLIEST_TIMESTAMP);

mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
requestManager.fetchOffsets(timestampsToSearch, false);
assertEquals(1, requestManager.requestsToSend());
assertEquals(0, requestManager.requestsToRetry());

NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
verifySuccessfulPollAwaitingResponse(res);
NetworkClientDelegate.UnsentRequest firstRequest = res.unsentRequests.get(0);
assertEquals(Optional.of(LEADER_1), firstRequest.node());

// Original leader returns a retriable error, so the partition queued for retry and metadata update
ClientResponse errorResponse = buildClientResponseWithErrors(firstRequest,
Collections.singletonMap(TEST_PARTITION_1, Errors.NOT_LEADER_OR_FOLLOWER));
errorResponse.onComplete();
assertFalse(fetchOffsetsFuture.isDone());

assertEquals(0, requestManager.requestsToSend());
assertEquals(1, requestManager.requestsToRetry());
verify(metadata).requestUpdate(false);

// Metadata update moves leadership of the partition to LEADER_2
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_2));
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
assertEquals(0, requestManager.requestsToRetry());

// The retry must be routed to the new leader, not the stale one
NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds());
verifySuccessfulPollAwaitingResponse(retriedPoll);
NetworkClientDelegate.UnsentRequest retryRequest = retriedPoll.unsentRequests.get(0);
assertEquals(Optional.of(LEADER_2), retryRequest.node());

Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap(
TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty()));
ClientResponse successResponse = buildClientResponse(retryRequest, expectedOffsets);
successResponse.onComplete();
verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets);
}

@Test
public void testRequestFailedResponse_NonRetriableAuthError() {
Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1,
Expand Down Expand Up @@ -889,6 +937,63 @@ public void testRemoteListOffsetsRequestTimeoutMs() {
assertEquals(requestTimeoutMs, offsetFetchRequest.timeoutMs());
}

@ParameterizedTest
@EnumSource(IsolationLevel.class)
public void testListOffsetsRequestSendsIsolationLevel(IsolationLevel isolationLevel) {
requestManager = new OffsetsRequestManager(
subscriptionState,
metadata,
isolationLevel,
time,
RETRY_BACKOFF_MS,
REQUEST_TIMEOUT_MS,
DEFAULT_API_TIMEOUT_MS,
apiVersions,
mock(NetworkClientDelegate.class),
commitRequestManager,
new PositionsValidator(new LogContext(), time, subscriptionState, metadata),
new LogContext()
);

Map<TopicPartition, Long> timestampsToSearch =
Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
requestManager.fetchOffsets(timestampsToSearch, false);
assertEquals(1, requestManager.requestsToSend());
assertEquals(0, requestManager.requestsToRetry());

NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);

AbstractRequest abstractRequest = unsentRequest.requestBuilder().build();
assertInstanceOf(ListOffsetsRequest.class, abstractRequest);
ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) abstractRequest;

assertEquals(isolationLevel, listOffsetsRequest.isolationLevel());
}

@ParameterizedTest
@ValueSource(longs = {ListOffsetsRequest.EARLIEST_TIMESTAMP, ListOffsetsRequest.LATEST_TIMESTAMP})
public void testListOffsetsRequestSendsTargetTimestamp(long timestamp) {
Map<TopicPartition, Long> timestampsToSearch =
Collections.singletonMap(TEST_PARTITION_1, timestamp);
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
requestManager.fetchOffsets(timestampsToSearch, false);
assertEquals(1, requestManager.requestsToSend());

NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);

AbstractRequest abstractRequest = unsentRequest.requestBuilder().build();
assertInstanceOf(ListOffsetsRequest.class, abstractRequest);
ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) abstractRequest;

// The target timestamp requested by the caller must be carried on the wire request
assertEquals(TEST_PARTITION_1.topic(), listOffsetsRequest.topics().get(0).name());
assertEquals(timestamp,
listOffsetsRequest.topics().get(0).partitions().get(0).timestamp());
}

private void mockAssignedPartitionsMissingPositions(Set<TopicPartition> assignedPartitions,
Set<TopicPartition> initializingPartitions,
Metadata.LeaderAndEpoch leaderAndEpoch) {
Expand Down