From f2ab230a516283626af40f6f42ec061f36aac808 Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Thu, 25 Jun 2026 11:49:16 -0400 Subject: [PATCH 1/2] KAFKA-20741: Add ReadOnlyRecord IQv2 result type (KIP-1356) Introduce a read-only view of a record (key/value/timestamp/headers); Record implements it. Prerequisite for the KIP-1356 headers-aware IQv2 query types. --- .../streams/processor/api/ReadOnlyRecord.java | 55 +++++++++++++++ .../kafka/streams/processor/api/Record.java | 2 +- .../processor/api/ReadOnlyRecordTest.java | 68 +++++++++++++++++++ 3 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java new file mode 100644 index 0000000000000..def44fda62612 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; + +/** + * A read-only view of a record's {@code key}, {@code value}, {@code timestamp}, and {@code headers}. + * + *

This is the shared read surface of a record: the processing-layer {@link Record} extends it with + * transform-and-forward builders ({@code withKey}/{@code withValue}/{@code withTimestamp}/{@code withHeaders}), + * while an Interactive Query (IQv2) result is exactly this read-only snapshot and exposes nothing more. + * It is the result type returned by the headers-aware IQv2 query types (e.g. + * {@code TimestampedKeyWithHeadersQuery}), which surface the record headers + * persisted by header-aware state stores. + * + * @param The type of the key + * @param The type of the value + */ +public interface ReadOnlyRecord { + + /** + * The key of the record. May be null. + */ + K key(); + + /** + * The value of the record. May be null. + */ + V value(); + + /** + * The timestamp of the record. Will never be negative. + */ + long timestamp(); + + /** + * The headers of the record. Never null. + */ + Headers headers(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java index 225b95fa4006d..cd103555d4124 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java @@ -35,7 +35,7 @@ * @param The type of the key * @param The type of the value */ -public class Record { +public class Record implements ReadOnlyRecord { private final K key; private final V value; private final long timestamp; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java new file mode 100644 index 0000000000000..ebeee76a00a0a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ReadOnlyRecordTest { + + private static Headers headers() { + return new RecordHeaders().add(new RecordHeader("k", "v".getBytes(StandardCharsets.UTF_8))); + } + + @Test + public void recordShouldBeAssignableToReadOnlyRecordAndExposeSameAccessors() { + final Headers headers = headers(); + final Record record = new Record<>("key", "value", 123L, headers); + + // The assignment itself proves Record is a ReadOnlyRecord (source/binary compatible change). + final ReadOnlyRecord readOnly = record; + + assertEquals(record.key(), readOnly.key()); + assertEquals(record.value(), readOnly.value()); + assertEquals(record.timestamp(), readOnly.timestamp()); + assertEquals(record.headers(), readOnly.headers()); + + assertEquals("key", readOnly.key()); + assertEquals("value", readOnly.value()); + assertEquals(123L, readOnly.timestamp()); + assertEquals(headers, readOnly.headers()); + } + + @Test + public void readOnlyHeadersShouldBeNonNullAndEmptyWhenConstructedWithoutHeaders() { + final ReadOnlyRecord readOnly = new Record<>("key", "value", 123L); + assertEquals(new RecordHeaders(), readOnly.headers()); + } + + @Test + public void readOnlyAccessorsShouldTolerateNullKeyAndValue() { + final ReadOnlyRecord readOnly = new Record<>(null, null, 0L); + assertEquals(null, readOnly.key()); + assertEquals(null, readOnly.value()); + assertEquals(0L, readOnly.timestamp()); + assertEquals(new RecordHeaders(), readOnly.headers()); + } +} From 88a4d612a1687c1fca23dad3096b1747cccb8a49 Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Fri, 26 Jun 2026 09:45:07 -0400 Subject: [PATCH 2/2] KAFKA-20741: Address review feedback on ReadOnlyRecord - Add @Override to Record's key/value/timestamp/headers accessors - Drop tautological assertions in ReadOnlyRecordTest and use assertNull for null accessors. --- .../org/apache/kafka/streams/processor/api/Record.java | 4 ++++ .../streams/processor/api/ReadOnlyRecordTest.java | 10 +++------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java index cd103555d4124..37645d526407a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java @@ -86,6 +86,7 @@ public Record(final K key, final V value, final long timestamp) { /** * The key of the record. May be null. */ + @Override public K key() { return key; } @@ -93,6 +94,7 @@ public K key() { /** * The value of the record. May be null. */ + @Override public V value() { return value; } @@ -100,6 +102,7 @@ public V value() { /** * The timestamp of the record. Will never be negative. */ + @Override public long timestamp() { return timestamp; } @@ -107,6 +110,7 @@ public long timestamp() { /** * The headers of the record. Never null. */ + @Override public Headers headers() { return headers; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java index ebeee76a00a0a..ee946119c39cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java @@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; public class ReadOnlyRecordTest { @@ -40,11 +41,6 @@ public void recordShouldBeAssignableToReadOnlyRecordAndExposeSameAccessors() { // The assignment itself proves Record is a ReadOnlyRecord (source/binary compatible change). final ReadOnlyRecord readOnly = record; - assertEquals(record.key(), readOnly.key()); - assertEquals(record.value(), readOnly.value()); - assertEquals(record.timestamp(), readOnly.timestamp()); - assertEquals(record.headers(), readOnly.headers()); - assertEquals("key", readOnly.key()); assertEquals("value", readOnly.value()); assertEquals(123L, readOnly.timestamp()); @@ -60,8 +56,8 @@ public void readOnlyHeadersShouldBeNonNullAndEmptyWhenConstructedWithoutHeaders( @Test public void readOnlyAccessorsShouldTolerateNullKeyAndValue() { final ReadOnlyRecord readOnly = new Record<>(null, null, 0L); - assertEquals(null, readOnly.key()); - assertEquals(null, readOnly.value()); + assertNull(readOnly.key()); + assertNull(readOnly.value()); assertEquals(0L, readOnly.timestamp()); assertEquals(new RecordHeaders(), readOnly.headers()); }