Skip to content

KAFKA-20724: prevent duplicate task in tasks() during pause and resume#22649

Open
zoro30102000 wants to merge 1 commit into
apache:trunkfrom
zoro30102000:KAFKA-20724-pause-resume-duplicate
Open

KAFKA-20724: prevent duplicate task in tasks() during pause and resume#22649
zoro30102000 wants to merge 1 commit into
apache:trunkfrom
zoro30102000:KAFKA-20724-pause-resume-duplicate

Conversation

@zoro30102000

@zoro30102000 zoro30102000 commented Jun 23, 2026

Copy link
Copy Markdown

JIRA: https://issues.apache.org/jira/browse/KAFKA-20724

DefaultStateUpdater.tasks() can return the same TaskId twice. The snapshot is taken under
executeWithQueuesLocked, which holds tasksAndActionsLock, restoredActiveTasksLock and
exceptionsAndFailedTasksLock, but not updatingTasks or pausedTasks. pauseTask does pausedTasks.put(id)
then updatingTasks.remove(id), and resumeTask does the reverse, so for a short window the task sits in
both maps. streamOfTasks() reads both, and ReadOnlyTask has no equals/hashCode, so the returned Set keeps
both wrappers. A caller that does one remove per entry then removes the same task twice. The first remove
succeeds; the second reaches removeTask, finds the task in none of the collections, and completes the
future with null, so waitForFuture turns it into "Task X was not found in the state updater. This
indicates a bug." The same duplicate also breaks TaskManager.allTasks(), which uses Collectors.toMap, with
"Duplicate key".

Fix: take restoredActiveTasksLock (the same lock tasks() already holds) around the two map writes in
pauseTask and resumeTask, so a reader never observes the task in both maps. The changelogReader
transitions stay outside the lock.

Tests: two tests in DefaultStateUpdaterTest hold restoredActiveTasksLock and assert that the pause and
resume transitions block on it. They fail by timeout if the lock is dropped from either method.
restoredActiveTasksLock was relaxed to package private so the tests can take it.

Noticed while here, not fixed in this PR: the standalone updatingTasks() and pausedTasks() accessors
snapshot without restoredActiveTasksLock, so a caller that composes both itself could still see an
inconsistent pair. There is no such caller today (pausedTasks() is test only), so I left it out. Happy to
fold it in here or track it separately.

Reviewers: Lucas Brutschy lbrutschy@confluent.io

@github-actions github-actions Bot added triage PRs from the community streams small Small PRs labels Jun 23, 2026
@zoro30102000 zoro30102000 force-pushed the KAFKA-20724-pause-resume-duplicate branch from 9917262 to 847adf8 Compare June 23, 2026 13:18
private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
private final Lock restoredActiveTasksLock = new ReentrantLock();
// visible for testing
final Lock restoredActiveTasksLock = new ReentrantLock();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using the same lock in other places without making it visible for testing.

How are we testing the other uses of the lock. Can we make it consistent?

@lucasbru

Copy link
Copy Markdown
Member

Fix looks good to me, one Q about the testing approach

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved small Small PRs streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants