diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 8fc5719e76a93..8d10af7dd3211 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -160,7 +160,6 @@ import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString; import static org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection; import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull; -import static org.apache.kafka.coordinator.group.Utils.throwIfNotNullOrEmpty; import static org.apache.kafka.coordinator.group.Utils.throwIfNull; /** @@ -617,7 +616,6 @@ private static void throwIfStreamsGroupHeartbeatRequestIsInvalid( private static void throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures( StreamsGroupHeartbeatRequestData request ) throws InvalidRequestException { - throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not supported yet."); if (request.topology() != null) { for (StreamsGroupHeartbeatRequestData.Subtopology subtopology : request.topology().subtopologies()) { throwIfNotEmptyCollection(subtopology.sourceTopicRegex(), "Regular expressions for source topics are not supported yet."); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index c5d86de30c2f1..ad669498d5bd8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -65,6 +65,7 @@ import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Endpoint; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.KeyValue; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskIds; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskOffset; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Topology; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData.Status; @@ -2078,6 +2079,8 @@ private CoordinatorResult stream List ownedActiveTasks, List ownedStandbyTasks, List ownedWarmupTasks, + List taskOffsets, + List taskEndOffsets, String processId, Endpoint userEndpoint, List clientTags, @@ -2129,6 +2132,14 @@ private CoordinatorResult stream ); } + // Store the latest task changelog offsets/end-offsets reported by the member. These are transient telemetry + // (used by the assignor to estimate task lag) and are not persisted. Task offsets and end-offsets are reported + // independently: a null list means "unchanged since the last heartbeat", so we retain the previously reported + // value for whichever of the two is null and only update when at least one is reported. + if (taskOffsets != null || taskEndOffsets != null) { + group.updateTaskOffsets(memberId, group.taskOffsets(memberId).update(taskOffsets, taskEndOffsets)); + } + // 1. Create or update the member. StreamsGroupMember.Builder updatedMemberBuilder = new StreamsGroupMember.Builder(member) .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) @@ -4354,7 +4365,8 @@ private UpdateTargetAssignmentResult maybeUpdateStreamsTargetAssignm .withMembers(updatedMembersAndTargetAssignment.members()) .withTopology(configuredTopology) .withMetadataImage(metadataImage) - .withTargetAssignment(updatedMembersAndTargetAssignment.targetAssignment()); + .withTargetAssignment(updatedMembersAndTargetAssignment.targetAssignment()) + .withTaskOffsets(group.taskOffsets()); long startTimeMs = time.milliseconds(); org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = @@ -5405,6 +5417,8 @@ public CoordinatorResult streams request.activeTasks(), request.standbyTasks(), request.warmupTasks(), + request.taskOffsets(), + request.taskEndOffsets(), request.processId(), request.userEndpoint(), request.clientTags(), diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberTaskOffsets.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberTaskOffsets.java new file mode 100644 index 0000000000000..40c863b4b2023 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberTaskOffsets.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.group.streams.assignor.TaskId; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * The latest per-task cumulative changelog offsets and end-offsets that a member reported in its heartbeat. + *

+ * These values are transient telemetry that the assignor uses to estimate task lag (for warm-up promotion). + * Per KIP-1071 they are not persisted to the {@code __consumer_offsets} topic ("not persisted, as they are + * constantly changing"); they are held in memory on the group coordinator and re-reported by the member on the + * task-offset interval. + * + * @param taskOffsets Cumulative changelog offsets per task. + * @param taskEndOffsets Cumulative changelog end-offsets per task. + */ +public record MemberTaskOffsets(Map taskOffsets, Map taskEndOffsets) { + + public static final MemberTaskOffsets EMPTY = new MemberTaskOffsets(Map.of(), Map.of()); + + /** + * Returns a copy of these offsets updated with the values reported in a heartbeat. Task offsets and task + * end-offsets are reported independently: a {@code null} list means "unchanged since the last heartbeat", so the + * corresponding map is retained from this instance even when the other one is updated. + * + * @param reportedTaskOffsets The reported task offsets, or {@code null} if unchanged. + * @param reportedTaskEndOffsets The reported task end-offsets, or {@code null} if unchanged. + */ + public MemberTaskOffsets update( + final List reportedTaskOffsets, + final List reportedTaskEndOffsets + ) { + return new MemberTaskOffsets( + reportedTaskOffsets == null ? taskOffsets : toTaskIdMap(reportedTaskOffsets), + reportedTaskEndOffsets == null ? taskEndOffsets : toTaskIdMap(reportedTaskEndOffsets) + ); + } + + private static Map toTaskIdMap(final List taskOffsets) { + return taskOffsets.stream().collect(Collectors.toMap( + taskOffset -> new TaskId(taskOffset.subtopologyId(), taskOffset.partition()), + StreamsGroupHeartbeatRequestData.TaskOffset::offset + )); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 85c0d1d894fa1..ac5219a07931e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -194,6 +194,14 @@ public static class DeadlineAndEpoch { private final TimelineHashMap>> currentStandbyTaskToProcessIds; private final TimelineHashMap>> currentWarmupTaskToProcessIds; + /** + * The latest per-task changelog offsets and end-offsets reported by each member, keyed by member ID. + * This is transient telemetry that the assignor uses to estimate task lag for warm-up promotion. Per KIP-1071 + * it is not persisted to the {@code __consumer_offsets} topic; it is held in memory and re-reported by members + * on the task-offset interval (and is therefore lost on coordinator failover until re-reported). + */ + private final Map taskOffsets = new HashMap<>(); + /** * The Streams topology. */ @@ -522,6 +530,32 @@ public void removeMember(String memberId) { removeStaticMember(oldMember); maybeUpdateGroupState(); endpointToPartitionsCache.remove(memberId); + taskOffsets.remove(memberId); + } + + /** + * Updates the latest per-task changelog offsets reported by a member. These are transient and not persisted. + * + * @param memberId The member ID. + * @param memberOffsets The reported task offsets and end-offsets. + */ + public void updateTaskOffsets(String memberId, MemberTaskOffsets memberOffsets) { + taskOffsets.put(memberId, memberOffsets); + } + + /** + * @return The latest per-task changelog offsets reported by the given member, or + * {@link MemberTaskOffsets#EMPTY} if the member has not reported any. + */ + public MemberTaskOffsets taskOffsets(String memberId) { + return taskOffsets.getOrDefault(memberId, MemberTaskOffsets.EMPTY); + } + + /** + * @return An immutable map of the latest per-task changelog offsets reported by each member, keyed by member ID. + */ + public Map taskOffsets() { + return Collections.unmodifiableMap(taskOffsets); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java index 072dbf90e6f0f..6ce2632b6f986 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java @@ -91,6 +91,12 @@ public class TargetAssignmentBuilder { */ private ConfiguredTopology topology; + /** + * The latest per-task changelog offsets reported by each member, keyed by member ID. Transient (not persisted); + * fed to the assignor so it can estimate task lag. + */ + private Map taskOffsets = Map.of(); + /** * Constructs the object. * @@ -112,7 +118,8 @@ public TargetAssignmentBuilder( static AssignmentMemberSpec createAssignmentMemberSpec( StreamsGroupMember member, - TasksTuple targetAssignment + TasksTuple targetAssignment, + MemberTaskOffsets taskOffsets ) { return new AssignmentMemberSpec( member.instanceId(), @@ -122,8 +129,8 @@ static AssignmentMemberSpec createAssignmentMemberSpec( targetAssignment.warmupTasks(), member.processId(), member.clientTags(), - Map.of(), - Map.of() + taskOffsets.taskOffsets(), + taskOffsets.taskEndOffsets() ); } @@ -151,6 +158,19 @@ public TargetAssignmentBuilder withMembers( return this; } + /** + * Adds the latest per-task changelog offsets reported by each member. + * + * @param taskOffsets The reported task offsets/end-offsets keyed by member ID. + * @return This object. + */ + public TargetAssignmentBuilder withTaskOffsets( + Map taskOffsets + ) { + this.taskOffsets = taskOffsets; + return this; + } + /** * Adds the metadata image to use. * @@ -202,7 +222,8 @@ public TargetAssignmentResult build() throws TaskAssignorException { // Prepare the member spec for all members. members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, - targetAssignment.getOrDefault(memberId, org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY) + targetAssignment.getOrDefault(memberId, org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY), + taskOffsets.getOrDefault(memberId, MemberTaskOffsets.EMPTY) ))); // Compute the assignment. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 458d150b01fe9..dec4fc51eee60 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -601,7 +601,7 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except new StreamsGroupHeartbeatResult( new StreamsGroupHeartbeatResponseData() .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("WarmupTasks are not supported yet."), + .setErrorMessage("Regular expressions for source topics are not supported yet."), Map.of(), -1, -1, @@ -610,29 +610,55 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except service.streamsGroupHeartbeat( context, new StreamsGroupHeartbeatRequestData() - .setWarmupTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds())) + .setTopology(new StreamsGroupHeartbeatRequestData.Topology() + .setSubtopologies(List.of(new StreamsGroupHeartbeatRequestData.Subtopology() + .setSourceTopicRegex(List.of("foo.*")) + )) + ) ).get(5, TimeUnit.SECONDS) ); + } - assertEquals( + @Test + public void testStreamsGroupHeartbeatAcceptsTaskOffsetsAndWarmupTasks() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setRuntime(runtime) + .setConfig(createConfig()) + .build(true); + + StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(Uuid.randomUuid().toString()) + .setMemberEpoch(1) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds())) + .setTaskOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset())) + .setTaskEndOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset())); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("streams-group-heartbeat"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture( new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData() - .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("Regular expressions for source topics are not supported yet."), + new StreamsGroupHeartbeatResponseData(), Map.of(), -1, -1, -1 - ), - service.streamsGroupHeartbeat( - context, - new StreamsGroupHeartbeatRequestData() - .setTopology(new StreamsGroupHeartbeatRequestData.Topology() - .setSubtopologies(List.of(new StreamsGroupHeartbeatRequestData.Subtopology() - .setSourceTopicRegex(List.of("foo.*")) - )) - ) - ).get(5, TimeUnit.SECONDS) + ) + )); + + CompletableFuture future = service.streamsGroupHeartbeat( + requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), + request + ); + + assertEquals( + new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), -1, -1, -1), + future.get(5, TimeUnit.SECONDS) ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index fbe513e7fd34b..48652f4a73bc2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -18323,6 +18323,103 @@ public void testStreamsOwnedTasksValidation() { assertEquals("Task 3 for subtopology subtopology1 is invalid. Number of tasks for this subtopology: 3", e2.getMessage()); } + @Test + public void testStreamsGroupHeartbeatStoresTaskOffsetsWithoutPersisting() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .buildCoordinatorMetadataImage(); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(metadataImage) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 10, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .build()) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) + .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .withTargetAssignmentEpoch(10) + .withMetadataHash(groupMetadataHash) + .withValidatedTopologyEpoch(0) + .withLastAssignmentConfigs(Map.of("num.standby.replicas", "0")) + ) + .build(); + + CoordinatorResult result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10) + .setProcessId("process-id") + .setRebalanceTimeoutMs(1500) + .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setTaskOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset() + .setSubtopologyId(subtopology1).setPartition(0).setOffset(10L))) + .setTaskEndOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset() + .setSubtopologyId(subtopology1).setPartition(0).setOffset(20L)))); + + // An offset-only heartbeat on a stable member must not produce any records: the reported offsets are + // transient telemetry, so they trigger no member-metadata record and no group-epoch bump. + assertEquals(List.of(), result.records()); + + // The reported offsets are retained in memory for the assignor / describe path. + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + assertEquals( + new org.apache.kafka.coordinator.group.streams.MemberTaskOffsets( + Map.of(new org.apache.kafka.coordinator.group.streams.assignor.TaskId(subtopology1, 0), 10L), + Map.of(new org.apache.kafka.coordinator.group.streams.assignor.TaskId(subtopology1, 0), 20L) + ), + group.taskOffsets(memberId) + ); + + // A follow-up heartbeat reports only the task offsets (end-offsets unchanged, i.e. null). The new offsets + // are stored while the previously reported end-offsets are retained. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10) + .setProcessId("process-id") + .setRebalanceTimeoutMs(1500) + .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setTaskOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset() + .setSubtopologyId(subtopology1).setPartition(0).setOffset(12L)))); + + assertEquals(List.of(), result.records()); + assertEquals( + new org.apache.kafka.coordinator.group.streams.MemberTaskOffsets( + Map.of(new org.apache.kafka.coordinator.group.streams.assignor.TaskId(subtopology1, 0), 12L), + Map.of(new org.apache.kafka.coordinator.group.streams.assignor.TaskId(subtopology1, 0), 20L) + ), + group.taskOffsets(memberId) + ); + } + @Test public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() { String groupId = "fooup"; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MemberTaskOffsetsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MemberTaskOffsetsTest.java new file mode 100644 index 0000000000000..68ff6b0e3702d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MemberTaskOffsetsTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.coordinator.group.streams.assignor.TaskId; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MemberTaskOffsetsTest { + + private static StreamsGroupHeartbeatRequestData.TaskOffset taskOffset(final String subtopologyId, + final int partition, + final long offset) { + return new StreamsGroupHeartbeatRequestData.TaskOffset() + .setSubtopologyId(subtopologyId) + .setPartition(partition) + .setOffset(offset); + } + + @Test + public void shouldConvertHeartbeatRequestTaskOffsets() { + MemberTaskOffsets result = MemberTaskOffsets.EMPTY.update( + List.of(taskOffset("sub-1", 0, 10L), taskOffset("sub-1", 1, 20L)), + List.of(taskOffset("sub-1", 0, 15L), taskOffset("sub-1", 1, 25L)) + ); + + assertEquals( + Map.of(new TaskId("sub-1", 0), 10L, new TaskId("sub-1", 1), 20L), + result.taskOffsets() + ); + assertEquals( + Map.of(new TaskId("sub-1", 0), 15L, new TaskId("sub-1", 1), 25L), + result.taskEndOffsets() + ); + } + + @Test + public void shouldRetainBothMapsWhenBothListsAreNull() { + MemberTaskOffsets previous = new MemberTaskOffsets( + Map.of(new TaskId("sub-1", 0), 10L), + Map.of(new TaskId("sub-1", 0), 15L) + ); + + assertEquals(previous, previous.update(null, null)); + assertEquals(MemberTaskOffsets.EMPTY, MemberTaskOffsets.EMPTY.update(null, null)); + } + + @Test + public void shouldUpdateTaskOffsetsAndRetainTaskEndOffsetsWhenEndOffsetsNull() { + MemberTaskOffsets previous = new MemberTaskOffsets( + Map.of(new TaskId("sub-1", 0), 10L), + Map.of(new TaskId("sub-1", 0), 15L) + ); + + MemberTaskOffsets result = previous.update(List.of(taskOffset("sub-1", 0, 12L)), null); + + assertEquals(Map.of(new TaskId("sub-1", 0), 12L), result.taskOffsets()); + // The end-offsets were not reported, so the previously reported values are retained. + assertEquals(Map.of(new TaskId("sub-1", 0), 15L), result.taskEndOffsets()); + } + + @Test + public void shouldUpdateTaskEndOffsetsAndRetainTaskOffsetsWhenOffsetsNull() { + MemberTaskOffsets previous = new MemberTaskOffsets( + Map.of(new TaskId("sub-1", 0), 10L), + Map.of(new TaskId("sub-1", 0), 15L) + ); + + MemberTaskOffsets result = previous.update(null, List.of(taskOffset("sub-1", 0, 18L))); + + // The offsets were not reported, so the previously reported values are retained. + assertEquals(Map.of(new TaskId("sub-1", 0), 10L), result.taskOffsets()); + assertEquals(Map.of(new TaskId("sub-1", 0), 18L), result.taskEndOffsets()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 159060359ce4f..7b0e97a8bcbc5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -47,6 +47,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState; import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; +import org.apache.kafka.coordinator.group.streams.assignor.TaskId; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; @@ -110,6 +111,46 @@ public void testGetOrCreateUninitializedMember() { assertNotEquals(uninitializedMember, streamsGroup.getOrCreateUninitializedMember("member-id")); } + @Test + public void testUpdateAndRetrieveTaskOffsets() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + assertEquals(MemberTaskOffsets.EMPTY, streamsGroup.taskOffsets("member-id")); + assertEquals(Map.of(), streamsGroup.taskOffsets()); + + MemberTaskOffsets offsets = new MemberTaskOffsets( + Map.of(new TaskId("sub-1", 0), 10L), + Map.of(new TaskId("sub-1", 0), 20L) + ); + streamsGroup.updateTaskOffsets("member-id", offsets); + + assertEquals(offsets, streamsGroup.taskOffsets("member-id")); + assertEquals(Map.of("member-id", offsets), streamsGroup.taskOffsets()); + + // A new report replaces the previous one. + MemberTaskOffsets newerOffsets = new MemberTaskOffsets( + Map.of(new TaskId("sub-1", 0), 15L), + Map.of(new TaskId("sub-1", 0), 25L) + ); + streamsGroup.updateTaskOffsets("member-id", newerOffsets); + assertEquals(newerOffsets, streamsGroup.taskOffsets("member-id")); + } + + @Test + public void testRemoveMemberClearsTaskOffsets() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + streamsGroup.updateMember(new StreamsGroupMember.Builder("member-id").build()); + streamsGroup.updateTaskOffsets("member-id", new MemberTaskOffsets( + Map.of(new TaskId("sub-1", 0), 10L), + Map.of(new TaskId("sub-1", 0), 20L) + )); + + streamsGroup.removeMember("member-id"); + + assertEquals(MemberTaskOffsets.EMPTY, streamsGroup.taskOffsets("member-id")); + assertEquals(Map.of(), streamsGroup.taskOffsets()); + } + @Test public void testGetOrCreateDefaultMember() { StreamsGroup streamsGroup = createStreamsGroup("foo"); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index d4e00e51969fd..0edb840870c19 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; +import org.apache.kafka.coordinator.group.streams.assignor.TaskId; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; @@ -106,7 +107,8 @@ public void testCreateAssignmentMemberSpec(TaskRole taskRole) { AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec( member, - assignment + assignment, + MemberTaskOffsets.EMPTY ); assertEquals(new AssignmentMemberSpec( @@ -122,6 +124,30 @@ public void testCreateAssignmentMemberSpec(TaskRole taskRole) { ), assignmentMemberSpec); } + @Test + public void testCreateAssignmentMemberSpecPopulatesTaskOffsets() { + String fooSubtopologyId = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .setRackId("rackId") + .setInstanceId("instanceId") + .setProcessId("processId") + .setClientTags(Map.of()) + .build(); + + Map taskOffsets = Map.of(new TaskId(fooSubtopologyId, 0), 10L); + Map taskEndOffsets = Map.of(new TaskId(fooSubtopologyId, 0), 20L); + + AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec( + member, + TasksTuple.EMPTY, + new MemberTaskOffsets(taskOffsets, taskEndOffsets) + ); + + assertEquals(taskOffsets, assignmentMemberSpec.taskOffsets()); + assertEquals(taskEndOffsets, assignmentMemberSpec.taskEndOffsets()); + } + @Test public void testEmpty() { TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( @@ -407,7 +433,8 @@ public org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.Target members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, - targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY) + targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY), + MemberTaskOffsets.EMPTY ) ));