diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java new file mode 100644 index 0000000000000..def44fda62612 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; + +/** + * A read-only view of a record's {@code key}, {@code value}, {@code timestamp}, and {@code headers}. + * + *

This is the shared read surface of a record: the processing-layer {@link Record} extends it with + * transform-and-forward builders ({@code withKey}/{@code withValue}/{@code withTimestamp}/{@code withHeaders}), + * while an Interactive Query (IQv2) result is exactly this read-only snapshot and exposes nothing more. + * It is the result type returned by the headers-aware IQv2 query types (e.g. + * {@code TimestampedKeyWithHeadersQuery}), which surface the record headers + * persisted by header-aware state stores. + * + * @param The type of the key + * @param The type of the value + */ +public interface ReadOnlyRecord { + + /** + * The key of the record. May be null. + */ + K key(); + + /** + * The value of the record. May be null. + */ + V value(); + + /** + * The timestamp of the record. Will never be negative. + */ + long timestamp(); + + /** + * The headers of the record. Never null. + */ + Headers headers(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java index 225b95fa4006d..37645d526407a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java @@ -35,7 +35,7 @@ * @param The type of the key * @param The type of the value */ -public class Record { +public class Record implements ReadOnlyRecord { private final K key; private final V value; private final long timestamp; @@ -86,6 +86,7 @@ public Record(final K key, final V value, final long timestamp) { /** * The key of the record. May be null. */ + @Override public K key() { return key; } @@ -93,6 +94,7 @@ public K key() { /** * The value of the record. May be null. */ + @Override public V value() { return value; } @@ -100,6 +102,7 @@ public V value() { /** * The timestamp of the record. Will never be negative. */ + @Override public long timestamp() { return timestamp; } @@ -107,6 +110,7 @@ public long timestamp() { /** * The headers of the record. Never null. */ + @Override public Headers headers() { return headers; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java new file mode 100644 index 0000000000000..ee946119c39cc --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ReadOnlyRecordTest { + + private static Headers headers() { + return new RecordHeaders().add(new RecordHeader("k", "v".getBytes(StandardCharsets.UTF_8))); + } + + @Test + public void recordShouldBeAssignableToReadOnlyRecordAndExposeSameAccessors() { + final Headers headers = headers(); + final Record record = new Record<>("key", "value", 123L, headers); + + // The assignment itself proves Record is a ReadOnlyRecord (source/binary compatible change). + final ReadOnlyRecord readOnly = record; + + assertEquals("key", readOnly.key()); + assertEquals("value", readOnly.value()); + assertEquals(123L, readOnly.timestamp()); + assertEquals(headers, readOnly.headers()); + } + + @Test + public void readOnlyHeadersShouldBeNonNullAndEmptyWhenConstructedWithoutHeaders() { + final ReadOnlyRecord readOnly = new Record<>("key", "value", 123L); + assertEquals(new RecordHeaders(), readOnly.headers()); + } + + @Test + public void readOnlyAccessorsShouldTolerateNullKeyAndValue() { + final ReadOnlyRecord readOnly = new Record<>(null, null, 0L); + assertNull(readOnly.key()); + assertNull(readOnly.value()); + assertEquals(0L, readOnly.timestamp()); + assertEquals(new RecordHeaders(), readOnly.headers()); + } +}