-
Notifications
You must be signed in to change notification settings - Fork 15.3k
MINOR: Follow-up KAFKA-20696 #22676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
MINOR: Follow-up KAFKA-20696 #22676
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -862,7 +862,7 @@ CompletableFuture<?> runOneStreamsTopologyCleanupCycle() { | |
| // follow-up writes. Skip the conditional clears so we do not | ||
| // schedule writes against a runtime that is being closed. | ||
| if (!isActive.get()) return CompletableFuture.completedFuture(null); | ||
| List<CompletableFuture<Void>> clearFutures = new ArrayList<>(eligible.size()); | ||
| Map<String, Integer> toClear = new HashMap<>(eligible.size()); | ||
| eligible.forEach((groupId, expectedStoredEpoch) -> { | ||
| if (failures.containsKey(groupId)) { | ||
| // Plugin failed: leave both stored epoch and the push-path | ||
|
|
@@ -879,9 +879,13 @@ CompletableFuture<?> runOneStreamsTopologyCleanupCycle() { | |
| // this groupId. A member that re-creates the same id afterwards | ||
| // is a fresh lifecycle and will arm a fresh back-off chain. | ||
| streamsGroupTopologyDescriptionManager.clearBackoffGroup(groupId); | ||
| clearFutures.add(clearStoredDescriptionTopologyEpochAsync(groupId, expectedStoredEpoch)); | ||
| toClear.put(groupId, expectedStoredEpoch); | ||
| }); | ||
| return CompletableFuture.allOf(clearFutures.toArray(new CompletableFuture<?>[0])); | ||
| if (toClear.isEmpty()) return CompletableFuture.completedFuture(null); | ||
| // All groups in `eligible` came from the same partition's read so they | ||
| // hash to the same __consumer_offsets partition; one batched write covers | ||
| // every clear on this shard. | ||
| return clearStoredDescriptionTopologyEpochBatchAsync(toClear); | ||
| })); | ||
| return null; | ||
| })); | ||
|
|
@@ -911,22 +915,27 @@ private void recordPluginDeleteOutcome(int attempted, int errors) { | |
| } | ||
|
|
||
| /** | ||
| * Conditional metadata write that clears {@code StoredDescriptionTopologyEpoch} for | ||
| * {@code groupId} only when the persisted value still equals {@code expectedStoredEpoch}. | ||
| * Mismatches and missing groups are silently ignored by the shard-side method. Runtime | ||
| * write failures (NOT_COORDINATOR etc.) are logged here and swallowed so a single failed | ||
| * write does not poison the cycle's allOf — the next cycle will retry naturally because | ||
| * the persisted storedEpoch is still non-default. | ||
| * Batched conditional metadata write that clears {@code StoredDescriptionTopologyEpoch} | ||
| * for every entry in {@code expectedStoredEpochByGroupId}, but only for the entries whose | ||
| * persisted value still equals the supplied epoch. Mismatches and missing groups are | ||
| * silently ignored by the shard-side method. All groups in the batch must hash to the same | ||
| * __consumer_offsets partition (the caller guarantees this — the eligibility scan is per | ||
| * partition). Runtime write failures (NOT_COORDINATOR etc.) are logged here and swallowed | ||
| * so a single failed write does not poison the cycle's allOf — the next cycle will retry | ||
| * naturally because the persisted storedEpoch is still non-default. | ||
| */ | ||
| private CompletableFuture<Void> clearStoredDescriptionTopologyEpochAsync(String groupId, int expectedStoredEpoch) { | ||
| return runtime.<Void>scheduleWriteOperation( | ||
| private CompletableFuture<Void> clearStoredDescriptionTopologyEpochBatchAsync( | ||
| Map<String, Integer> expectedStoredEpochByGroupId | ||
| ) { | ||
| TopicPartition tp = topicPartitionFor(expectedStoredEpochByGroupId.keySet().iterator().next()); | ||
| return runtime.scheduleWriteOperation( | ||
| "clear-stored-topology-epoch", | ||
| topicPartitionFor(groupId), | ||
| coordinator -> coordinator.clearStoredDescriptionTopologyEpoch(groupId, expectedStoredEpoch) | ||
| tp, | ||
| coordinator -> coordinator.clearStoredDescriptionTopologyEpochBatch(expectedStoredEpochByGroupId) | ||
| ).handle((__, throwable) -> { | ||
| if (throwable != null) { | ||
| log.warn("Failed to clear StoredDescriptionTopologyEpoch for group {}; the next cleanup cycle will retry.", | ||
| groupId, throwable); | ||
| log.warn("Failed to clear StoredDescriptionTopologyEpoch for {} group(s) on partition {}; the next cleanup cycle will retry.", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The batched failure log now reports |
||
| expectedStoredEpochByGroupId.size(), tp, throwable); | ||
| } | ||
| return null; | ||
| }); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The batch's target partition is derived from
keySet().iterator().next(), which throwsNoSuchElementExceptionon an empty map. It's safe today because the only caller guards withif (toClear.isEmpty()), but that coupling is implicit. Consider an earlyif (expectedStoredEpochByGroupId.isEmpty()) return CompletableFuture.completedFuture(null);here so the method is self-protecting and doesn't depend on the caller's guard staying in place.