Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ CompletableFuture<?> runOneStreamsTopologyCleanupCycle() {
// follow-up writes. Skip the conditional clears so we do not
// schedule writes against a runtime that is being closed.
if (!isActive.get()) return CompletableFuture.completedFuture(null);
List<CompletableFuture<Void>> clearFutures = new ArrayList<>(eligible.size());
Map<String, Integer> toClear = new HashMap<>(eligible.size());
eligible.forEach((groupId, expectedStoredEpoch) -> {
if (failures.containsKey(groupId)) {
// Plugin failed: leave both stored epoch and the push-path
Expand All @@ -879,9 +879,13 @@ CompletableFuture<?> runOneStreamsTopologyCleanupCycle() {
// this groupId. A member that re-creates the same id afterwards
// is a fresh lifecycle and will arm a fresh back-off chain.
streamsGroupTopologyDescriptionManager.clearBackoffGroup(groupId);
clearFutures.add(clearStoredDescriptionTopologyEpochAsync(groupId, expectedStoredEpoch));
toClear.put(groupId, expectedStoredEpoch);
});
return CompletableFuture.allOf(clearFutures.toArray(new CompletableFuture<?>[0]));
if (toClear.isEmpty()) return CompletableFuture.completedFuture(null);
// All groups in `eligible` came from the same partition's read so they
// hash to the same __consumer_offsets partition; one batched write covers
// every clear on this shard.
return clearStoredDescriptionTopologyEpochBatchAsync(toClear);
}));
return null;
}));
Expand Down Expand Up @@ -911,22 +915,27 @@ private void recordPluginDeleteOutcome(int attempted, int errors) {
}

/**
* Conditional metadata write that clears {@code StoredDescriptionTopologyEpoch} for
* {@code groupId} only when the persisted value still equals {@code expectedStoredEpoch}.
* Mismatches and missing groups are silently ignored by the shard-side method. Runtime
* write failures (NOT_COORDINATOR etc.) are logged here and swallowed so a single failed
* write does not poison the cycle's allOf — the next cycle will retry naturally because
* the persisted storedEpoch is still non-default.
* Batched conditional metadata write that clears {@code StoredDescriptionTopologyEpoch}
* for every entry in {@code expectedStoredEpochByGroupId}, but only for the entries whose
* persisted value still equals the supplied epoch. Mismatches and missing groups are
* silently ignored by the shard-side method. All groups in the batch must hash to the same
* __consumer_offsets partition (the caller guarantees this — the eligibility scan is per
* partition). Runtime write failures (NOT_COORDINATOR etc.) are logged here and swallowed
* so a single failed write does not poison the cycle's allOf — the next cycle will retry
* naturally because the persisted storedEpoch is still non-default.
*/
private CompletableFuture<Void> clearStoredDescriptionTopologyEpochAsync(String groupId, int expectedStoredEpoch) {
return runtime.<Void>scheduleWriteOperation(
private CompletableFuture<Void> clearStoredDescriptionTopologyEpochBatchAsync(
Map<String, Integer> expectedStoredEpochByGroupId
) {
TopicPartition tp = topicPartitionFor(expectedStoredEpochByGroupId.keySet().iterator().next());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch's target partition is derived from keySet().iterator().next(), which throws NoSuchElementException on an empty map. It's safe today because the only caller guards with if (toClear.isEmpty()), but that coupling is implicit. Consider an early if (expectedStoredEpochByGroupId.isEmpty()) return CompletableFuture.completedFuture(null); here so the method is self-protecting and doesn't depend on the caller's guard staying in place.

return runtime.scheduleWriteOperation(
"clear-stored-topology-epoch",
topicPartitionFor(groupId),
coordinator -> coordinator.clearStoredDescriptionTopologyEpoch(groupId, expectedStoredEpoch)
tp,
coordinator -> coordinator.clearStoredDescriptionTopologyEpochBatch(expectedStoredEpochByGroupId)
).handle((__, throwable) -> {
if (throwable != null) {
log.warn("Failed to clear StoredDescriptionTopologyEpoch for group {}; the next cleanup cycle will retry.",
groupId, throwable);
log.warn("Failed to clear StoredDescriptionTopologyEpoch for {} group(s) on partition {}; the next cleanup cycle will retry.",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batched failure log now reports size() + partition instead of the individual groupId, so a failed write no longer tells you which groups didn't get cleared. Given the next-cycle retry guarantee this is acceptable, but logging the group ids (here or at debug) would make a stuck group easier to diagnose.

expectedStoredEpochByGroupId.size(), tp, throwable);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,26 +985,27 @@ public Set<String> streamsGroupsWithStoredTopologyDescription(

/**
* Return the streams groups on this shard eligible for plugin-side topology cleanup: empty
* (no live members), every committed offset already past {@code offsets.retention.ms}, and a
* (no live members), no committed offsets and no pending transactional offsets, and a
* {@code StoredDescriptionTopologyEpoch != -1}. Keyed by group id, valued by the
* {@code StoredDescriptionTopologyEpoch} observed at {@code committedOffset} — the cleanup
* cycle echoes that epoch back to {@link #clearStoredDescriptionTopologyEpoch} so a
* concurrent {@code setTopology} that has since advanced the field cannot be silently undone
* by a stale plugin delete.
*
* <p>This sweep relies on the regular offset-expiration cycle to have already tombstoned
* expired offsets, so the eligibility check itself only asks "does this group still have any
* offsets at all?" — see {@link OffsetMetadataManager#groupHasNoOffsets}. Three independent
* cycles thus compose cleanly: offset expiration, topology cleanup (here), and group
* tombstoning on the next sweep once the stored epoch is cleared.
*
* <p>Every state lookup goes through the snapshot at {@code committedOffset}: the iterated
* group-id set, the per-group resolution, the {@code EMPTY}-state check, the stored
* topology epoch, and the per-offset retention check inside
* {@link OffsetMetadataManager#allOffsetsExpired}. The only non-snapshot input is
* {@code now} from the wall clock, which is unavoidable for "offset has aged past
* retention" and is captured once at the top of the scan so every group is compared
* against the same instant.
* topology epoch, and the no-offsets check.
*
* <p>Non-streams and missing groups are silently skipped. Per-group errors are logged and
* the scan continues so one bad group cannot stall the cycle.
*/
public Map<String, Integer> listStreamsGroupsNeedingTopologyCleanup(long committedOffset) {
long now = time.milliseconds();
Map<String, Integer> eligible = new HashMap<>();
for (String groupId : groupMetadataManager.groupIds(committedOffset)) {
try {
Expand All @@ -1014,7 +1015,7 @@ public Map<String, Integer> listStreamsGroupsNeedingTopologyCleanup(long committ
if (!streamsGroup.isEmpty(committedOffset)) continue;
int storedEpoch = streamsGroup.storedDescriptionTopologyEpoch(committedOffset);
if (storedEpoch == -1) continue;
if (!offsetMetadataManager.allOffsetsExpired(groupId, now, committedOffset)) continue;
if (!offsetMetadataManager.groupHasNoOffsets(groupId, committedOffset)) continue;
eligible.put(groupId, storedEpoch);
} catch (Throwable t) {
// One bad group must not abort the whole scan; the next cycle retries.
Expand All @@ -1037,6 +1038,23 @@ public CoordinatorResult<Void, CoordinatorRecord> clearStoredDescriptionTopology
return groupMetadataManager.clearStoredDescriptionTopologyEpoch(groupId, expectedStoredEpoch);
}

/**
 * Clears {@code StoredDescriptionTopologyEpoch} for several groups in one
 * {@link CoordinatorResult}.
 *
 * <p>Each map entry is handed to the single-group
 * {@link #clearStoredDescriptionTopologyEpoch} logic on the group metadata manager, and the
 * records it returns are appended to one combined list. This lets the cleanup cycle issue a
 * single {@code scheduleWriteOperation} per shard rather than one per group. A group whose
 * stored epoch no longer matches contributes no record (the manager-level method returns an
 * empty list for it), which mirrors the silent skip of the single-group variant.
 *
 * @param expectedStoredEpochByGroupId group id mapped to the stored epoch the caller observed
 * @return a result carrying the concatenated records of every per-group conditional clear
 */
public CoordinatorResult<Void, CoordinatorRecord> clearStoredDescriptionTopologyEpochBatch(
    Map<String, Integer> expectedStoredEpochByGroupId
) {
    List<CoordinatorRecord> batchedRecords = new ArrayList<>(expectedStoredEpochByGroupId.size());
    for (Map.Entry<String, Integer> expectation : expectedStoredEpochByGroupId.entrySet()) {
        // Reuse the single-group conditional clear and keep only the records it produced.
        CoordinatorResult<Void, CoordinatorRecord> singleClear =
            groupMetadataManager.clearStoredDescriptionTopologyEpoch(expectation.getKey(), expectation.getValue());
        batchedRecords.addAll(singleClear.records());
    }
    return new CoordinatorResult<>(batchedRecords);
}

/**
* Handles a ShareGroupDescribe request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -282,21 +281,6 @@ private boolean contains(String groupId, String topic, int partition) {
return openTransactions != null;
}

/**
 * Snapshot-aware variant of {@link #contains(String, String, int)}.
 *
 * <p>Walks the group -> topic -> partition index of open transactions, reading every level
 * at {@code committedOffset}.
 *
 * @return {@code true} if the given group had any pending transactional offsets for the
 *         given topic and partition at {@code committedOffset}; {@code false} if the group,
 *         the topic, or the partition entry is absent at that snapshot
 */
private boolean contains(String groupId, String topic, int partition, long committedOffset) {
    TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> byTopic =
        openTransactionsByGroup.get(groupId, committedOffset);
    if (byTopic != null) {
        TimelineHashMap<Integer, TimelineHashSet<Long>> byPartition = byTopic.get(topic, committedOffset);
        if (byPartition != null) {
            return byPartition.containsKey(partition, committedOffset);
        }
    }
    // Either the group or the topic has no entry at this snapshot.
    return false;
}

/**
* Performs the given action for each partition with a pending transactional offset for the given group.
*
Expand Down Expand Up @@ -1061,55 +1045,21 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
}

/**
* Read-only counterpart to {@link #cleanupExpiredOffsets(String, List)}: returns whether
* every committed offset for the group is currently eligible for expiration and no pending
* transactional offsets remain. Used by the topology-description plugin cleanup cycle on
* the eligibility read side, where the sweep must not mutate any record but still needs to
* decide whether the group is fully expirable before driving a {@code plugin.deleteTopology}.
* Whether {@code groupId} currently has no committed offsets and no pending transactional
* offsets at the snapshot {@code committedOffset}. Used by the topology-description plugin
* cleanup cycle on the eligibility read side — the regular offset-expiration cycle is
* already running periodically and tombstoning expired offsets, so by the time this is
* called the group's {@code offsetsByGroup} entry is gone iff every committed offset has
* been expired and no transactional commits are in flight.
*
* <p>{@code committedOffset} is the snapshot point the runtime hands to the read operation
* that calls this method. Every timeline-backed lookup in here uses that snapshot — the
* runtime contract is that read operations only observe committed state, so a concurrent
* uncommitted offset commit or pending-transaction record must not flip the eligibility
* outcome on us.
* that calls this method. Both lookups go through that snapshot — the runtime contract is
* that read operations only observe committed state, so a concurrent uncommitted offset
* commit or pending-transaction record must not flip the eligibility outcome on us.
*/
public boolean allOffsetsExpired(String groupId, long currentTimestampMs, long committedOffset) {
// Read-only eligibility check: true only when nothing visible at the snapshot
// `committedOffset` would keep this group's offsets alive.
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
offsets.offsetsByGroup.get(groupId, committedOffset);
if (offsetsByTopic == null) {
// No committed offsets at the snapshot: the answer depends solely on whether the
// group still has open transactions at that snapshot.
return !openTransactions.contains(groupId, committedOffset);
}
Group group;
try {
group = groupMetadataManager.group(groupId, committedOffset);
} catch (GroupIdNotFoundException e) {
// The group disappeared between the caller's existence check and this lookup at
// the same snapshot — it is not eligible for plugin cleanup, the next sweep will
// pick this up naturally.
return false;
}
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
if (offsetExpirationCondition.isEmpty()) {
// Without an expiration condition no offset can be judged expired, so report "not expired".
return false;
}
OffsetExpirationCondition condition = offsetExpirationCondition.get();
for (Map.Entry<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicEntry
: offsetsByTopic.entrySet(committedOffset)) {
String topic = topicEntry.getKey();
// Offsets for a topic the group is still subscribed to are treated as not expired.
if (group.isSubscribedToTopic(topic)) {
return false;
}
for (Map.Entry<Integer, OffsetAndMetadata> partitionEntry
: topicEntry.getValue().entrySet(committedOffset)) {
int partition = partitionEntry.getKey();
OffsetAndMetadata offsetAndMetadata = partitionEntry.getValue();
// A single offset that is either not yet past retention or still has an open
// transaction on its partition makes the whole group ineligible.
if (!condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs())
|| openTransactions.contains(groupId, topic, partition, committedOffset)) {
return false;
}
}
}
// Every committed offset passed the checks above; finish with the group-level
// open-transaction check at the same snapshot.
return !openTransactions.contains(groupId, committedOffset);
public boolean groupHasNoOffsets(String groupId, long committedOffset) {
    // Any committed-offset entry visible at the snapshot disqualifies the group outright;
    // the open-transaction lookup is skipped in that case.
    if (offsets.offsetsByGroup.get(groupId, committedOffset) != null) {
        return false;
    }
    // No committed offsets: the group qualifies only if it also has no open
    // transactional offsets at the same snapshot.
    return !openTransactions.contains(groupId, committedOffset);
}

/**
Expand Down Expand Up @@ -1143,6 +1093,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
offsetsByTopic.forEach((topic, partitions) -> {
if (!group.isSubscribedToTopic(topic)) {
partitions.forEach((partition, offsetAndMetadata) -> {
// We don't expire the offset yet if there is a pending transactional offset for the partition.
if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs())
&& !hasPendingTransactionalOffsets(groupId, topic, partition)) {
appendOffsetCommitTombstone(groupId, topic, partition, records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -951,6 +952,32 @@ public void testCleanupCycleClearsStoredEpochOnPluginSuccess() {
verify(runtime, times(1)).scheduleWriteOperation(eq("clear-stored-topology-epoch"), eq(GROUP_TP), any());
}

@Test
public void testCleanupCycleBatchesClearWritesPerPartition() {
    // Scenario: a single partition's eligibility read reports two groups. The cycle must
    // delete each group's topology through the plugin and then fold both conditional
    // clears into exactly one scheduleWriteOperation (per-shard batching), rather than
    // issuing one write per group.
    Map<String, Integer> eligibleGroups = new LinkedHashMap<>();
    eligibleGroups.put("foo", 4);
    eligibleGroups.put("bar", 9);

    StreamsGroupTopologyDescriptionPlugin topologyPlugin = mock(StreamsGroupTopologyDescriptionPlugin.class);
    for (String groupId : eligibleGroups.keySet()) {
        when(topologyPlugin.deleteTopology(groupId)).thenReturn(CompletableFuture.completedFuture(null));
    }

    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    when(runtime.scheduleReadAllOperation(eq("list-streams-groups-needing-topology-cleanup"), any()))
        .thenReturn(List.of(CompletableFuture.completedFuture(eligibleGroups)));
    when(runtime.scheduleWriteOperation(eq("clear-stored-topology-epoch"), any(), any()))
        .thenReturn(CompletableFuture.completedFuture(null));

    GroupCoordinatorService coordinatorService = buildService(runtime, Optional.of(topologyPlugin), true);
    coordinatorService.runOneStreamsTopologyCleanupCycle();

    // Each eligible group gets exactly one plugin delete...
    for (String groupId : eligibleGroups.keySet()) {
        verify(topologyPlugin, times(1)).deleteTopology(groupId);
    }
    // ...but the stored-epoch clears share a single write.
    verify(runtime, times(1)).scheduleWriteOperation(eq("clear-stored-topology-epoch"), any(), any());
}

@Test
public void testCleanupCycleSkipsClearOnPluginFailure() {
// Plugin fails -> the cycle must NOT clear stored epoch; the group stays gated on
Expand Down
Loading
Loading