diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index a3c0c2fb56ca1..4935575b9765f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -634,8 +634,14 @@ private void pauseTask(final Task task) { // do not need to unregister changelog partitions for paused tasks try { measureCheckpointLatency(() -> task.maybeCheckpoint()); - pausedTasks.put(taskId, task); - updatingTasks.remove(taskId); + // take the lock tasks() holds so a reader never sees the task in both maps + restoredActiveTasksLock.lock(); + try { + pausedTasks.put(taskId, task); + updatingTasks.remove(taskId); + } finally { + restoredActiveTasksLock.unlock(); + } if (task.isActive()) { transitToUpdateStandbysIfOnlyStandbysLeft(); } @@ -649,8 +655,13 @@ private void pauseTask(final Task task) { private void resumeTask(final Task task) { final TaskId taskId = task.id(); - updatingTasks.put(taskId, task); - pausedTasks.remove(taskId); + restoredActiveTasksLock.lock(); + try { + updatingTasks.put(taskId, task); + pausedTasks.remove(taskId); + } finally { + restoredActiveTasksLock.unlock(); + } if (task.isActive()) { log.info("Stateful active task " + task.id() + " was resumed to the updating tasks of the state updater"); @@ -808,7 +819,8 @@ private void recordRatio(final long now, private final Lock tasksAndActionsLock = new ReentrantLock(); private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition(); private final Queue restoredActiveTasks = new LinkedList<>(); - private final Lock restoredActiveTasksLock = new ReentrantLock(); + // visible for testing + final Lock restoredActiveTasksLock = new ReentrantLock(); private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition(); private final Lock exceptionsAndFailedTasksLock = new ReentrantLock(); private final Queue exceptionsAndFailedTasks = new LinkedList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index b6461669bd77b..0efa3fbb61b12 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -1199,6 +1200,64 @@ private void shouldResumeStatefulTask(final Task task) throws Exception { verifyUpdatingTasks(task); } + @Test + public void shouldHoldRestoredActiveTasksLockWhilePausingTask() throws Exception { + final ReentrantLock lock = (ReentrantLock) stateUpdater.restoredActiveTasksLock; + final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); + stateUpdater.start(); + stateUpdater.add(task); + verifyUpdatingTasks(task); + + // while the test holds the lock, the pause must block instead of moving the task (KAFKA-20724) + lock.lock(); + try { + when(topologyMetadata.isPaused(null)).thenReturn(true); + waitForCondition( + lock::hasQueuedThreads, + VERIFICATION_TIMEOUT, + "State updater thread did not block on the lock while pausing the task!" + ); + assertTrue(stateUpdater.updatingTasks().contains(task)); + assertTrue(stateUpdater.pausedTasks().isEmpty()); + } finally { + lock.unlock(); + } + + verifyPausedTasks(task); + verifyUpdatingTasks(); + } + + @Test + public void shouldHoldRestoredActiveTasksLockWhileResumingTask() throws Exception { + final ReentrantLock lock = (ReentrantLock) stateUpdater.restoredActiveTasksLock; + final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); + stateUpdater.start(); + stateUpdater.add(task); + verifyUpdatingTasks(task); + when(topologyMetadata.isPaused(null)).thenReturn(true); + verifyPausedTasks(task); + verifyUpdatingTasks(); + + // the move back must block on the same lock + lock.lock(); + try { + when(topologyMetadata.isPaused(null)).thenReturn(false); + stateUpdater.signalResume(); + waitForCondition( + lock::hasQueuedThreads, + VERIFICATION_TIMEOUT, + "State updater thread did not block on the lock while resuming the task!" + ); + assertTrue(stateUpdater.pausedTasks().contains(task)); + assertTrue(stateUpdater.updatingTasks().isEmpty()); + } finally { + lock.unlock(); + } + + verifyPausedTasks(); + verifyUpdatingTasks(task); + } + @Test public void shouldNotResumeNonExistingTasks() throws Exception { stateUpdater.start();