MINOR: Follow-up KAFKA-20696#22676
Conversation
|
Deal withs the new comments from #22622 (comment) after merge. |
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @frankvicky. I left some minor comments.
| private CompletableFuture<Void> clearStoredDescriptionTopologyEpochBatchAsync( | ||
| Map<String, Integer> expectedStoredEpochByGroupId | ||
| ) { | ||
| TopicPartition tp = topicPartitionFor(expectedStoredEpochByGroupId.keySet().iterator().next()); |
There was a problem hiding this comment.
The batch's target partition is derived from keySet().iterator().next(), which throws NoSuchElementException on an empty map. It's safe today because the only caller guards with if (toClear.isEmpty()), but that coupling is implicit. Consider an early if (expectedStoredEpochByGroupId.isEmpty()) return CompletableFuture.completedFuture(null); here so the method is self-protecting and doesn't depend on the caller's guard staying in place.
| if (throwable != null) { | ||
| log.warn("Failed to clear StoredDescriptionTopologyEpoch for group {}; the next cleanup cycle will retry.", | ||
| groupId, throwable); | ||
| log.warn("Failed to clear StoredDescriptionTopologyEpoch for {} group(s) on partition {}; the next cleanup cycle will retry.", |
There was a problem hiding this comment.
The batched failure log now reports size() + partition instead of the individual groupId, so a failed write no longer tells you which groups didn't get cleared. Given the next-cycle retry guarantee this is acceptable, but logging the group ids (here or at debug) would make a stuck group easier to diagnose.
Deal the comments of #22622
OffsetMetadataManager.allOffsetsExpired→groupHasNoOffsets.Per-topic / per-partition expiration check was O(topics × partitions)
per group. Since the regular offset-expiration cycle already tombstones
expired offsets, this stage only needs the post-state — a single
null-check on
offsetsByGroupplus anopenTransactions.contains(O(1)). Three cycles compose cleanly: offset expiration → topology
cleanup → group tombstone. The now-unused snapshot overload
OpenTransactions.contains(groupId, topic, partition, committedOffset)is removed.
Per-shard batching of the conditional clear write. All groups in
one partition's eligibility read hash to the same
__consumer_offsetspartition, so the cycle now issues one
scheduleWriteOperationpershard carrying every conditional clear for that shard — not one write
per group. Added
GroupCoordinatorShard.clearStoredDescriptionTopologyEpochBatch(Map<String, Integer>)that folds per-group GMM results into oneCoordinatorResult.Restored inline comment on the pending-transactional-offset guard
in
cleanupExpiredOffsets— accidentally dropped during thehelper-extract / inline round-trip.
Reviewers: Alieh Saeedi asaeedi@confluent.io