[FLINK-36317][runtime] Populate ArchivedExecutionGraph with CheckpointStatsSnapshot in WaitingForResources state#28519
Conversation
…pshot in WaitingForResources state When a job restarts and enters WaitingForResources with a previousExecutionGraph, the checkpoint statistics from the previous execution are now preserved in the ArchivedExecutionGraph returned by getJob(). Previously, these stats were lost because StateWithoutExecutionGraph.getJob() creates a sparse archived graph with empty checkpoint stats. This change: - Adds withCheckpointStatsSnapshot() to ArchivedExecutionGraph for creating a copy with different checkpoint stats - Overrides getJob() in WaitingForResources to attach checkpoint stats from the previousExecutionGraph when available - Adds tests verifying checkpoint stats preservation Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
|
Hi @kaustubhbutte17 |
…Test Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
|
Thanks @spuru9 for the review! Fixed both issues:
Pushed the fix in the latest commit. |
|
@flinkbot run azure |
|
Gentle ping @spuru9 , or let me know if there is some other way to contact someone for diff reviews, this is my first time contributing |
|
The approach looks sound — overriding One minor observation: the second test ( CI is green and the change is well-scoped — nice first contribution! |
What is the purpose of the change
When a Flink job fails and restarts, it transitions through
Restarting → WaitingForResources. During this state, thepreviousExecutionGraph(which contains checkpoint statistics from the prior execution) is available, but the REST API/Web UI cannot access these stats becauseStateWithoutExecutionGraph.getJob()creates a sparseArchivedExecutionGraphwith empty checkpoint data.This PR fixes that by:
withCheckpointStatsSnapshot()toArchivedExecutionGraph— creates a copy with different checkpoint statsgetJob()inWaitingForResources— attaches checkpoint stats from thepreviousExecutionGraphwhen availableBrief change log
ArchivedExecutionGraph.withCheckpointStatsSnapshot()methodWaitingForResources.getJob()to preserve checkpoint stats from the previous execution graphWaitingForResourcesTestVerifying this change
This change is verified by new unit tests:
testGetJobIncludesCheckpointStatsFromPreviousExecutionGraph— verifies checkpoint stats from a mock previous execution graph are preservedtestGetJobWithoutPreviousExecutionGraphReturnsNullCheckpointStats— verifies the default behavior when no previous execution graph existsDoes this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation