diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java index d01a095a66cf71..85f176f83b2bdf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.SlowTaskDetectorOptions; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; @@ -264,6 +265,11 @@ void testBalancedInput() throws Exception { ev23.setInputBytes(1024); ev23.getCurrentExecutionAttempt().markFinished(); + final long currentTimeMillis = System.currentTimeMillis(); + setStateTimestamp(ev21, ExecutionState.DEPLOYING, currentTimeMillis - 1000); + setStateTimestamp(ev22, ExecutionState.DEPLOYING, currentTimeMillis - 1000); + setStateTimestamp(ev23, ExecutionState.DEPLOYING, currentTimeMillis - 1); + setStateTimestamp(ev23, ExecutionState.FINISHED, currentTimeMillis); final Map> slowTasks = slowTaskDetector.findSlowTasks(executionGraph); @@ -502,4 +508,10 @@ private static Configuration createSlowTaskDetectorConfiguration( configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, multiplier); return configuration; } + + private static void setStateTimestamp( + ExecutionVertex executionVertex, ExecutionState state, long timestamp) { + executionVertex.getCurrentExecutionAttempt().getStateTimestamps()[state.ordinal()] = + timestamp; + } }