Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigners;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;

Expand All @@ -41,11 +45,19 @@
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

/** Tests for slicing window aggregate operators created by {@link WindowAggOperatorBuilder}. */
@ExtendWith(ParameterizedTestExtension.class)
class SlicingWindowAggOperatorTest extends WindowAggOperatorTestBase {

private static final RowDataSerializer LOCAL_ACC_INPUT_ROW_SER =
new RowDataSerializer(
new VarCharType(Integer.MAX_VALUE),
new BigIntType(),
new BigIntType(),
new TimestampType());

public SlicingWindowAggOperatorTest(ZoneId shiftTimeZone, boolean enableAsyncState) {
super(shiftTimeZone, enableAsyncState);
}
Expand Down Expand Up @@ -494,6 +506,77 @@ void testEventTimeCumulativeWindows() throws Exception {
testHarness.close();
}

@TestTemplate
void testGlobalEventTimeCumulativeWindowsDoNotRefireExpiredWindowAfterRestore()
throws Exception {
final SliceAssigner assigner =
SliceAssigners.cumulative(
3, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1));
final SlicingSumAndCountAggsFunction globalAggsFunction =
new SlicingSumAndCountAggsFunction(assigner);
final SlicingSumAndCountAggsFunction stateAggsFunction =
new SlicingSumAndCountAggsFunction(assigner);
OneInputStreamOperator<RowData, RowData> operator =
buildGlobalWindowOperator(
assigner,
LOCAL_ACC_INPUT_ROW_SER,
new LocalAccumulatorRowsAggsFunction(assigner),
globalAggsFunction,
stateAggsFunction,
null);

OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
createTestHarness(operator);

testHarness.setup(OUT_SERIALIZER);
testHarness.open();

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(20L)));
testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(0L)));
testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(999L)));

testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1998L)));
testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1999L)));
testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1000L)));

testHarness.processWatermark(new Watermark(999));
expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(1000L)));
expectedOutput.add(new Watermark(999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());

testHarness.processWatermark(new Watermark(1999));
expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(2000L)));
expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(0L), localMills(2000L)));
expectedOutput.add(new Watermark(1999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());

testHarness.prepareSnapshotPreBarrier(0L);
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
testHarness.close();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (Optional) Unlike all other restore/close tests in this file this one never asserts globalAggsFunction.closeCalled / stateAggsFunction.closeCalled after close().


expectedOutput.clear();
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();

testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1000L)));
testHarness.prepareSnapshotPreBarrier(1L);
testHarness.processWatermark(new Watermark(1999));

expectedOutput.add(new Watermark(1999));
ASSERTER.assertOutputEqualsSorted(
"Expired cumulative windows should not be emitted again after recovery.",
expectedOutput,
testHarness.getOutput());

testHarness.close();
}

@TestTemplate
void testProcessingTimeCumulativeWindows() throws Exception {
final SliceAssigner assigner =
Expand Down Expand Up @@ -809,6 +892,32 @@ void testInvalidWindows() {
"Hopping window requires a COUNT(*) in the aggregate functions.");
}

/** A test agg function to merge local accumulator rows of global window aggregates. */
protected static final class LocalAccumulatorRowsAggsFunction
extends SlicingSumAndCountAggsFunction {

public LocalAccumulatorRowsAggsFunction(SliceAssigner assigner) {
super(assigner);
}

@Override
public void merge(Long window, RowData otherAcc) throws Exception {
if (!openCalled) {
fail("Open was not called");
}
boolean sumIsNull2 = otherAcc.isNullAt(1);
if (!sumIsNull2) {
sum += otherAcc.getLong(1);
sumIsNull = false;
}
boolean countIsNull2 = otherAcc.isNullAt(2);
if (!countIsNull2) {
count += otherAcc.getLong(2);
countIsNull = false;
}
}
}

/** A test agg function for {@link SlicingWindowAggOperatorTest}. */
protected static class SlicingSumAndCountAggsFunction
extends SumAndCountAggsFunctionBase<Long> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
Expand Down Expand Up @@ -170,6 +171,30 @@ protected OneInputStreamOperator<RowData, RowData> buildWindowOperator(
return operator;
}

protected OneInputStreamOperator<RowData, RowData> buildGlobalWindowOperator(
WindowAssigner assigner,
AbstractRowDataSerializer<RowData> inputSerializer,
NamespaceAggsHandleFunction<Long> localAggsFunction,
NamespaceAggsHandleFunction<Long> globalAggsFunction,
NamespaceAggsHandleFunction<Long> stateAggsFunction,
@Nullable Integer countStarIndex) {
WindowAggOperatorBuilder builder =
WindowAggOperatorBuilder.builder()
.inputSerializer(inputSerializer)
.shiftTimeZone(shiftTimeZone)
.keySerializer(KEY_SER)
.assigner(assigner)
.globalAggregate(
createGeneratedAggsHandle(localAggsFunction),
createGeneratedAggsHandle(globalAggsFunction),
createGeneratedAggsHandle(stateAggsFunction),
ACC_SER);
if (countStarIndex != null) {
builder.countStarIndex(countStarIndex);
}
return builder.build();
}

protected static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
OneInputStreamOperator<RowData, RowData> operator) throws Exception {
if (isAsyncStateOperator(operator)) {
Expand Down