From b8386e54448fd27f57df1d58e7c22559d8a5c683 Mon Sep 17 00:00:00 2001 From: Yanjun Qiu <153984347+qiuyanjun888@users.noreply.github.com> Date: Thu, 25 Jun 2026 10:10:26 +0800 Subject: [PATCH] [FLINK-39620][table-runtime] Add restored cumulative window regression --- .../window/SlicingWindowAggOperatorTest.java | 109 ++++++++++++++++++ .../window/WindowAggOperatorTestBase.java | 25 ++++ 2 files changed, 134 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index aaece6302f1a40..75b42b270986f6 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -25,6 +25,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigners; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; @@ -41,11 +45,19 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** Tests for slicing window aggregate operators created by {@link WindowAggOperatorBuilder}. */ @ExtendWith(ParameterizedTestExtension.class) class SlicingWindowAggOperatorTest extends WindowAggOperatorTestBase { + private static final RowDataSerializer LOCAL_ACC_INPUT_ROW_SER = + new RowDataSerializer( + new VarCharType(Integer.MAX_VALUE), + new BigIntType(), + new BigIntType(), + new TimestampType()); + public SlicingWindowAggOperatorTest(ZoneId shiftTimeZone, boolean enableAsyncState) { super(shiftTimeZone, enableAsyncState); } @@ -494,6 +506,77 @@ void testEventTimeCumulativeWindows() throws Exception { testHarness.close(); } + @TestTemplate + void testGlobalEventTimeCumulativeWindowsDoNotRefireExpiredWindowAfterRestore() + throws Exception { + final SliceAssigner assigner = + SliceAssigners.cumulative( + 3, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1)); + final SlicingSumAndCountAggsFunction globalAggsFunction = + new SlicingSumAndCountAggsFunction(assigner); + final SlicingSumAndCountAggsFunction stateAggsFunction = + new SlicingSumAndCountAggsFunction(assigner); + OneInputStreamOperator operator = + buildGlobalWindowOperator( + assigner, + LOCAL_ACC_INPUT_ROW_SER, + new LocalAccumulatorRowsAggsFunction(assigner), + globalAggsFunction, + stateAggsFunction, + null); + + OneInputStreamOperatorTestHarness testHarness = + createTestHarness(operator); + + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(20L))); + testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(0L))); + testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(999L))); + + testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1998L))); + testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1999L))); + testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1000L))); + + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(1000L))); + expectedOutput.add(new Watermark(999)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(2000L))); + expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(0L), localMills(2000L))); + expectedOutput.add(new Watermark(1999)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + expectedOutput.clear(); + testHarness = createTestHarness(operator); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1000L))); + testHarness.prepareSnapshotPreBarrier(1L); + testHarness.processWatermark(new Watermark(1999)); + + expectedOutput.add(new Watermark(1999)); + ASSERTER.assertOutputEqualsSorted( + "Expired cumulative windows should not be emitted again after recovery.", + expectedOutput, + testHarness.getOutput()); + + testHarness.close(); + } + @TestTemplate void testProcessingTimeCumulativeWindows() throws Exception { final SliceAssigner assigner = @@ -809,6 +892,32 @@ void testInvalidWindows() { "Hopping window requires a COUNT(*) in the aggregate functions."); } + /** A test agg function to merge local accumulator rows of global window aggregates. */ + protected static final class LocalAccumulatorRowsAggsFunction + extends SlicingSumAndCountAggsFunction { + + public LocalAccumulatorRowsAggsFunction(SliceAssigner assigner) { + super(assigner); + } + + @Override + public void merge(Long window, RowData otherAcc) throws Exception { + if (!openCalled) { + fail("Open was not called"); + } + boolean sumIsNull2 = otherAcc.isNullAt(1); + if (!sumIsNull2) { + sum += otherAcc.getLong(1); + sumIsNull = false; + } + boolean countIsNull2 = otherAcc.isNullAt(2); + if (!countIsNull2) { + count += otherAcc.getLong(2); + countIsNull = false; + } + } + } + /** A test agg function for {@link SlicingWindowAggOperatorTest}. */ protected static class SlicingSumAndCountAggsFunction extends SumAndCountAggsFunctionBase { diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java index 941a0fe486d544..c62fcd4fd29fc6 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java @@ -36,6 +36,7 @@ import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner; +import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; @@ -170,6 +171,30 @@ protected OneInputStreamOperator buildWindowOperator( return operator; } + protected OneInputStreamOperator buildGlobalWindowOperator( + WindowAssigner assigner, + AbstractRowDataSerializer inputSerializer, + NamespaceAggsHandleFunction localAggsFunction, + NamespaceAggsHandleFunction globalAggsFunction, + NamespaceAggsHandleFunction stateAggsFunction, + @Nullable Integer countStarIndex) { + WindowAggOperatorBuilder builder = + WindowAggOperatorBuilder.builder() + .inputSerializer(inputSerializer) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .globalAggregate( + createGeneratedAggsHandle(localAggsFunction), + createGeneratedAggsHandle(globalAggsFunction), + createGeneratedAggsHandle(stateAggsFunction), + ACC_SER); + if (countStarIndex != null) { + builder.countStarIndex(countStarIndex); + } + return builder.build(); + } + protected static OneInputStreamOperatorTestHarness createTestHarness( OneInputStreamOperator operator) throws Exception { if (isAsyncStateOperator(operator)) {