From ff6aa94fa43d6dc3bc87659103ff5e6c085a17df Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Sun, 14 Jun 2026 11:29:42 -0400 Subject: [PATCH 1/9] (WIP-PoC) KIP-1356 IQv2 TimestampedKeyWithHeadersQuery Proof-of-concept for KIP-1356, which exposes record headers through Interactive Queries (IQv2) for KIP-1271 headers-aware state stores. This PoC covers only the first of all proposed query types, TimestampedKeyWithHeadersQuery (parallel of TimestampedKeyQuery): - New @Evolving query class org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery, returning ValueTimestampHeaders instead of ValueAndTimestamp. - Handler in MeteredTimestampedKeyValueStoreWithHeaders that mirrors the existing runTimestampedKeyQuery (serialize key, forward a raw KeyQuery down the wrapper chain, deserialize the bytes) but keeps the full ValueTimestampHeaders instead of stripping the headers. - Wired RocksDBTimestampedStoreWithHeaders.query() to serve raw KeyQuery via StoreQueryUtils.handleBasicQueries. KIP-1271 had hardwired this layer to UNKNOWN_QUERY_TYPE, which makes the forwarded query fail once the record cache is flushed (a cache miss falls through to this layer). The metered store still performs the header-aware deserialization; other query types remain unsupported. - End-to-end integration test TimestampedKeyWithHeadersQueryIntegrationTest (caching disabled, so every query provably reaches the RocksDB layer) asserting value/timestamp/headers round-trip incl. empty-headers and tombstone cases, plus the unknown-query-type failure against a non-headers store. The remaining three query types (TimestampedRangeWithHeadersQuery, WindowKeyWithHeadersQuery, WindowRangeWithHeadersQuery) are out of scope for this PoC. 
--- ...pedKeyWithHeadersQueryIntegrationTest.java | 282 ++++++++++++++++++ .../query/TimestampedKeyWithHeadersQuery.java | 90 ++++++ ...edTimestampedKeyValueStoreWithHeaders.java | 36 +++ .../RocksDBTimestampedStoreWithHeaders.java | 17 ++ ...ocksDBTimestampedStoreWithHeadersTest.java | 30 +- ...edKeyValueStoreBuilderWithHeadersTest.java | 14 +- 6 files changed, 457 insertions(+), 12 deletions(-) create mode 100644 streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java new file mode 100644 index 0000000000000..920b0249992d2 --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.ValueTimestampHeaders; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; 
+import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; + +import static org.apache.kafka.streams.query.StateQueryRequest.inStore; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * End-to-end PoC test for KIP-1356 {@link TimestampedKeyWithHeadersQuery}. + * + *

Builds a KIP-1271 {@code WithHeaders} store, writes records (with headers) into it through a + * processor, then queries the store through IQv2 and asserts that the returned + * {@link ValueTimestampHeaders} carries value, timestamp, and the exact headers written. + */ +@Tag("integration") +public class TimestampedKeyWithHeadersQueryIntegrationTest { + + private static final String STORE_NAME = "headers-store"; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static final Headers HEADERS = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "1.0".getBytes()); + + private String inputStream; + private String outputStream; + private long baseTimestamp; + private KafkaStreams kafkaStreams; + private TestInfo testInfo; + + @BeforeAll + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void after() { + CLUSTER.stop(); + } + + @BeforeEach + public void beforeTest(final TestInfo testInfo) throws InterruptedException { + this.testInfo = testInfo; + final String uniqueTestName = safeUniqueTestName(testInfo); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @AfterEach + public void afterTest() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldHandleTimestampedKeyWithHeadersQuery() throws Exception { + startStreams(); + + // key 1 has headers, key 2 has empty headers, key 3 is tombstoned (null value) + produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS, + KeyValue.pair(1, "a0")); + produceDataToTopicWithHeaders(inputStream, baseTimestamp + 1, new RecordHeaders(), + KeyValue.pair(2, "b0")); + produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2, HEADERS, + 
KeyValue.pair(3, "c0"), KeyValue.pair(3, null)); + + // wait until all 4 input records have been processed (one output per input record) + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), + outputStream, + 4); + + // key 1: value + timestamp + headers round-trip + final ValueTimestampHeaders result1 = query(1); + assertEquals("a0", result1.value()); + assertEquals(baseTimestamp, result1.timestamp()); + assertEquals(HEADERS, result1.headers()); + + // key 2: written with no headers -> empty (never null) headers + final ValueTimestampHeaders result2 = query(2); + assertEquals("b0", result2.value()); + assertEquals(baseTimestamp + 1, result2.timestamp()); + assertEquals(new RecordHeaders(), result2.headers()); + + // key 3: tombstoned -> null result, never a partially-populated wrapper + assertNull(query(3)); + + // never-written key -> null result + assertNull(query(999)); + } + + @Test + public void shouldFailWithUnknownQueryTypeAgainstNonHeadersStore() throws Exception { + // store built WITHOUT a WithHeaders supplier -> the query type is unsupported + final StreamsBuilder builder = new StreamsBuilder(); + builder + .addStateStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore(STORE_NAME), + Serdes.Integer(), + Serdes.String())) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new PlainStoreWriterProcessor(), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.String())); + + kafkaStreams = new KafkaStreams(builder.build(), props()); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams); + + produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS, KeyValue.pair(1, "a0")); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, 
StringDeserializer.class), + outputStream, + 1); + + final StateQueryResult> result = + kafkaStreams.query(inStore(STORE_NAME).withQuery(TimestampedKeyWithHeadersQuery.withKey(1))); + + assertTrue(result.getOnlyPartitionResult().isFailure()); + assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, result.getOnlyPartitionResult().getFailureReason()); + } + + private void startStreams() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + builder + .addStateStore( + Stores.timestampedKeyValueStoreWithHeadersBuilder( + Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), + Serdes.Integer(), + Serdes.String()) + // Disable caching so every IQv2 query is forced down to the persistent + // RocksDBTimestampedStoreWithHeaders layer, exercising its KeyQuery handling + // (rather than being short-circuited by a cache hit). + .withCachingDisabled()) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new HeadersStoreWriterProcessor(), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.String())); + + kafkaStreams = new KafkaStreams(builder.build(), props()); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams); + } + + private ValueTimestampHeaders query(final int key) { + final StateQueryRequest> request = + inStore(STORE_NAME).withQuery(TimestampedKeyWithHeadersQuery.withKey(key)); + final StateQueryResult> result = kafkaStreams.query(request); + // getOnlyPartitionResult() returns null when the single partition result is a successful + // null (tombstoned / absent key), which we surface to the caller as a null lookup. + final QueryResult> onlyResult = result.getOnlyPartitionResult(); + return onlyResult == null ? 
null : onlyResult.getResult(); + } + + private Properties props() { + final String safeTestName = safeUniqueTestName(testInfo); + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + @SuppressWarnings("varargs") + @SafeVarargs + private final void produceDataToTopicWithHeaders(final String topic, + final long timestamp, + final Headers headers, + final KeyValue... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + topic, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class), + headers, + timestamp, + false); + } + + private static class HeadersStoreWriterProcessor implements Processor { + private ProcessorContext context; + private TimestampedKeyValueStoreWithHeaders store; + + @Override + public void init(final ProcessorContext context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record record) { + store.put( + record.key(), + ValueTimestampHeaders.make(record.value(), record.timestamp(), record.headers())); + context.forward(record); + } + } + + private static class PlainStoreWriterProcessor implements Processor { + private ProcessorContext context; + private TimestampedKeyValueStore store; + + @Override + public void init(final ProcessorContext context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record record) { + store.put( + 
record.key(), + ValueAndTimestamp.make(record.value(), record.timestamp())); + context.forward(record); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java new file mode 100644 index 0000000000000..5810030439fe5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; +import org.apache.kafka.streams.state.ValueTimestampHeaders; + +import java.util.Objects; + +/** + * Interactive query for retrieving a single record, including its record headers, based on its key + * from a {@link TimestampedKeyValueStoreWithHeaders}. + * + *

<p>This is the headers-aware parallel of {@link TimestampedKeyQuery}: it returns a + * {@link ValueTimestampHeaders} (value, timestamp, and headers) instead of a + * {@link org.apache.kafka.streams.state.ValueAndTimestamp} (value and timestamp only). + * + *

Headers are only returned when the queried store was built with a KIP-1271 + * {@code WithHeaders} supplier. Against a plain (non-headers) store, this query type is + * unsupported and fails with {@link FailureReason#UNKNOWN_QUERY_TYPE}. + * + * @param Type of keys + * @param Type of values + */ +@Evolving +public final class TimestampedKeyWithHeadersQuery implements Query> { + + private final K key; + private final boolean skipCache; + + private TimestampedKeyWithHeadersQuery(final K key, final boolean skipCache) { + this.key = key; + this.skipCache = skipCache; + } + + /** + * Creates a query that will retrieve the record (value, timestamp, and headers) identified by + * {@code key} if it exists (or {@code null} otherwise). + * @param key The key to retrieve + * @param The type of the key + * @param The type of the value that will be retrieved + */ + public static TimestampedKeyWithHeadersQuery withKey(final K key) { + Objects.requireNonNull(key, "the key should not be null"); + return new TimestampedKeyWithHeadersQuery<>(key, false); + } + + /** + * Specifies that the cache should be skipped during query evaluation. This means, that the query will always + * get forwarded to the underlying store. + * + *

PoC limitation: this flag is currently a no-op. The header-aware store handler does not yet + * propagate it to the underlying cache, so queries are evaluated against the cache regardless. Wiring + * this through is out of scope for the proof-of-concept. + */ + public TimestampedKeyWithHeadersQuery skipCache() { + return new TimestampedKeyWithHeadersQuery<>(key, true); + } + + /** + * Return the key that was specified for this query. + * + * @return The key that was specified for this query. + */ + public K key() { + return key; + } + + /** + * The flag whether to skip the cache or not during query evaluation. + */ + public boolean isSkipCache() { + return skipCache; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java index 8d10c6ccd25e3..98ff29b856455 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.query.RangeQuery; import org.apache.kafka.streams.query.ResultOrder; import org.apache.kafka.streams.query.TimestampedKeyQuery; +import org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery; import org.apache.kafka.streams.query.TimestampedRangeQuery; import org.apache.kafka.streams.query.internals.InternalQueryResultUtil; import org.apache.kafka.streams.state.KeyValueIterator; @@ -91,6 +92,10 @@ public class MeteredTimestampedKeyValueStoreWithHeaders TimestampedKeyQuery.class, (query, positionBound, config, store) -> runTimestampedKeyQuery(query, positionBound, config) ), + mkEntry( + TimestampedKeyWithHeadersQuery.class, + (query, positionBound, config, store) -> runTimestampedKeyWithHeadersQuery(query, positionBound, config) + ), 
mkEntry( RangeQuery.class, (query, positionBound, config, store) -> runRangeQuery(query, positionBound, config) @@ -381,6 +386,37 @@ private QueryResult runTimestampedKeyQuery( return result; } + @SuppressWarnings("unchecked") + private QueryResult runTimestampedKeyWithHeadersQuery( + final Query query, + final PositionBound positionBound, + final QueryConfig config + ) { + final QueryResult result; + final TimestampedKeyWithHeadersQuery typedKeyQuery = (TimestampedKeyWithHeadersQuery) query; + + // PoC limitation: typedKeyQuery.isSkipCache() is intentionally NOT propagated to the raw + // KeyQuery below. CachingKeyValueStore only honors skipCache when it is set on the forwarded + // KeyQuery, so for now skipCache() is a no-op for this query type (the existing + // runTimestampedKeyQuery has the same gap). Out of scope for this PoC; would be wired by + // calling rawKeyQuery.skipCache() when typedKeyQuery.isSkipCache() is true. + final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); + final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + // value will be `rawValueTimestampHeader`; no need to pass headers explicitly + final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); + final ValueTimestampHeaders valueTimestampHeaders = deserializer.apply(rawResult.getResult()); + // Unlike runTimestampedKeyQuery, the headers are kept rather than discarded. + final QueryResult> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueTimestampHeaders); + result = (QueryResult) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult) rawResult; + } + return result; + } + @SuppressWarnings("unchecked") private QueryResult runRangeQuery( final Query query, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java index 8f96b18291652..ecc14bbfdd6c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryConfig; @@ -199,6 +200,22 @@ private void openFromTimestampedStore(final DBOptions dbOptions, public QueryResult query(final Query query, final PositionBound positionBound, final QueryConfig config) { + // PoC (KIP-1356): serve raw KeyQuery from the persistent store so that the header-aware + // IQv2 query forwarded by MeteredTimestampedKeyValueStoreWithHeaders keeps working after the + // record cache has been flushed (a cache miss falls through to this layer). The returned bytes + // are the serialized ValueTimestampHeaders; the metered store performs the header-aware + // deserialization. Other query types remain unsupported for now. + if (query instanceof KeyQuery) { + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + config, + this, + position, + context + ); + } + final long start = config.isCollectExecutionInfo() ? 
System.nanoTime() : -1L; final QueryResult result; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java index d098696ded7e9..2189eb99d1aa6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.QueryConfig; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.junit.jupiter.api.Test; @@ -981,17 +982,17 @@ private byte[] wrapTimestampedValue(final byte[] value) { } @Test - public void shouldReturnUnknownQueryTypeForQuery() { + public void shouldReturnUnknownQueryTypeForUnsupportedQuery() { // Initialize the store rocksDBStore.init(context, rocksDBStore); - // Create a query - final KeyQuery query = KeyQuery.withKey(new Bytes("test-key".getBytes())); + // RangeQuery is not (yet) supported by this store; only KeyQuery is wired (KIP-1356 PoC). 
+ final RangeQuery query = RangeQuery.withNoBounds(); final PositionBound positionBound = PositionBound.unbounded(); final QueryConfig config = new QueryConfig(false); // Execute query - final QueryResult result = rocksDBStore.query(query, positionBound, config); + final QueryResult> result = rocksDBStore.query(query, positionBound, config); // Verify result indicates unknown query type assertFalse(result.isSuccess(), "Expected query to fail with unknown query type"); @@ -1005,6 +1006,27 @@ public void shouldReturnUnknownQueryTypeForQuery() { assertNotNull(result.getPosition(), "Expected position to be set"); } + @Test + public void shouldHandleKeyQuery() { + // Initialize the store + rocksDBStore.init(context, rocksDBStore); + + // Store raw (serialized ValueTimestampHeaders) bytes for a key. + final Bytes key = new Bytes("test-key".getBytes()); + final byte[] storedBytes = "headers+timestamp+value".getBytes(); + rocksDBStore.put(key, storedBytes); + + // KIP-1356 PoC: KeyQuery is now served by this store (rather than UNKNOWN_QUERY_TYPE), + // returning the raw stored bytes (the metered store performs header-aware deserialization). 
+ final KeyQuery query = KeyQuery.withKey(key); + final QueryResult result = + rocksDBStore.query(query, PositionBound.unbounded(), new QueryConfig(false)); + + assertTrue(result.isSuccess(), "Expected KeyQuery to succeed"); + assertArrayEquals(storedBytes, result.getResult(), "Expected the raw stored bytes to be returned"); + assertNotNull(result.getPosition(), "Expected position to be set"); + } + @Test public void shouldCollectExecutionInfoWhenRequested() { // Initialize the store diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java index 3c150da4f3d36..9e6ec3bb0f3d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java @@ -60,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -560,7 +561,7 @@ private TimestampedKeyValueStoreWithHeaders headersStoreMaybeWit @ParameterizedTest @ValueSource(booleans = {true, false}) - public void shouldReturnUnknownQueryTypeForKeyQueryOnHeadersStore(final boolean cachingEnabled) { + public void shouldHandleKeyQueryOnHeadersStore(final boolean cachingEnabled) { final TimestampedKeyValueStoreWithHeaders store = headersStoreMaybeWithCache(cachingEnabled); try { @@ -571,13 +572,10 @@ public void shouldReturnUnknownQueryTypeForKeyQueryOnHeadersStore(final boolean final StateStore 
wrapped = ((WrappedStateStore) store).wrapped(); final QueryResult result = wrapped.query(query, positionBound, config); - // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE - assertFalse(result.isSuccess(), "Expected query to fail with unknown query type"); - assertEquals( - FailureReason.UNKNOWN_QUERY_TYPE, - result.getFailureReason(), - "Expected UNKNOWN_QUERY_TYPE failure reason" - ); + // KIP-1356 PoC: the headers store now serves KeyQuery (rather than UNKNOWN_QUERY_TYPE). + // No data was written for "test-key", so the result is a successful null lookup. + assertTrue(result.isSuccess(), "Expected KeyQuery to be handled"); + assertNull(result.getResult(), "Expected null result for an absent key"); assertNotNull(result.getPosition(), "Expected position to be set"); } finally { store.close(); From 3e8f1f220bf9e9cb4f35ac4f13ebf8d5b3f1d86f Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Wed, 24 Jun 2026 10:36:15 -0400 Subject: [PATCH 2/9] Add ReadOnlyRecord IQv2 result type Introduce org.apache.kafka.streams.processor.api.ReadOnlyRecord, a read-only view of a record's key/value/timestamp/headers, and make the existing PAPI Record implement it. This is the result type returned by the headers-aware IQv2 query types added by KIP-1356. --- .../streams/processor/api/ReadOnlyRecord.java | 55 +++++++++++++++++++ .../kafka/streams/processor/api/Record.java | 2 +- 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java new file mode 100644 index 0000000000000..990d6ac16b0fb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; + +/** + * A read-only view of a record's {@code key}, {@code value}, {@code timestamp}, and {@code headers}. + * + *

This is the shared read surface of a record: the processing-layer {@link Record} extends it with + * transform-and-forward builders ({@code withKey}/{@code withValue}/{@code withTimestamp}/{@code withHeaders}), + * while an Interactive Query (IQv2) result is exactly this read-only snapshot and exposes nothing more. + * It is the result type returned by the headers-aware IQv2 query types (e.g. + * {@link org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery}), which surface the record headers + * persisted by header-aware state stores. + * + * @param The type of the key + * @param The type of the value + */ +public interface ReadOnlyRecord { + + /** + * The key of the record. May be null. + */ + K key(); + + /** + * The value of the record. May be null. + */ + V value(); + + /** + * The timestamp of the record. Will never be negative. + */ + long timestamp(); + + /** + * The headers of the record. Never null. + */ + Headers headers(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java index 225b95fa4006d..cd103555d4124 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java @@ -35,7 +35,7 @@ * @param The type of the key * @param The type of the value */ -public class Record { +public class Record implements ReadOnlyRecord { private final K key; private final V value; private final long timestamp; From 25168a542dd415eca37598db065dc26c79720f77 Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Wed, 24 Jun 2026 10:47:41 -0400 Subject: [PATCH 3/9] Add TimestampedKeyWithHeadersQuery IQv2 query - Add the headers-aware point query TimestampedKeyWithHeadersQuery, the parallel of TimestampedKeyQuery, returning a ReadOnlyRecord carrying key/value/timestamp/headers (rather than value-and-timestamp only). 
- MeteredTimestampedKeyValueStoreWithHeaders handles it by forwarding a raw byte-level KeyQuery to the wrapped store and deserializing the stored ValueTimestampHeaders into a Record (a ReadOnlyRecord); an absent or tombstoned key yields a null result. - Remove the query(...) override on RocksDBTimestampedStoreWithHeaders, which returned UNKNOWN_QUERY_TYPE for everything. The store now inherits RocksDBStore.query(), dispatching basic queries via StoreQueryUtils.handleBasicQueries. This lets the new query reach the persistent layer and restores build-path parity for the existing query types (KeyQuery, RangeQuery, ...), which previously succeeded against adapter-built header stores but failed against natively-built ones. --- .../query/TimestampedKeyWithHeadersQuery.java | 9 ++-- ...edTimestampedKeyValueStoreWithHeaders.java | 23 ++++++---- .../RocksDBTimestampedStoreWithHeaders.java | 44 +------------------ 3 files changed, 21 insertions(+), 55 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java index 5810030439fe5..8e591ee6c59e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.query; import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.processor.api.ReadOnlyRecord; import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; -import org.apache.kafka.streams.state.ValueTimestampHeaders; import java.util.Objects; @@ -27,8 +27,9 @@ * from a {@link TimestampedKeyValueStoreWithHeaders}. * *

This is the headers-aware parallel of {@link TimestampedKeyQuery}: it returns a - * {@link ValueTimestampHeaders} (value, timestamp, and headers) instead of a - * {@link org.apache.kafka.streams.state.ValueAndTimestamp} (value and timestamp only). + * {@link ReadOnlyRecord} carrying the key, value, timestamp, and headers, whereas + * {@link TimestampedKeyQuery} returns a {@link org.apache.kafka.streams.state.ValueAndTimestamp} + * (value and timestamp only, no key or headers). * *

Headers are only returned when the queried store was built with a KIP-1271 * {@code WithHeaders} supplier. Against a plain (non-headers) store, this query type is @@ -38,7 +39,7 @@ * @param Type of values */ @Evolving -public final class TimestampedKeyWithHeadersQuery implements Query> { +public final class TimestampedKeyWithHeadersQuery implements Query> { private final K key; private final boolean skipCache; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java index 98ff29b856455..84a0840ee3f82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.api.ReadOnlyRecord; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.query.KeyQuery; @@ -291,10 +293,6 @@ public ValueTimestampHeaders delete(final K key) { /** * Executes a query against this store. * - *

Note: Query results do NOT include headers, even though headers are - * preserved in the underlying store. This behavior provides compatibility - * with existing IQv2 APIs that operate on timestamped stores. - * * @param query the query to execute * @param positionBound the position bound * @param config the query configuration @@ -395,6 +393,8 @@ private QueryResult runTimestampedKeyWithHeadersQuery( final QueryResult result; final TimestampedKeyWithHeadersQuery typedKeyQuery = (TimestampedKeyWithHeadersQuery) query; + // Forward a raw byte-level KeyQuery to the wrapped store; the result bytes are the serialized + // ValueTimestampHeaders, which we deserialize below to recover value, timestamp, and headers. // PoC limitation: typedKeyQuery.isSkipCache() is intentionally NOT propagated to the raw // KeyQuery below. CachingKeyValueStore only honors skipCache when it is set on the forwarded // KeyQuery, so for now skipCache() is a no-op for this query type (the existing @@ -403,12 +403,19 @@ private QueryResult runTimestampedKeyWithHeadersQuery( final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { - // value will be `rawValueTimestampHeader`; no need to pass headers explicitly final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); final ValueTimestampHeaders valueTimestampHeaders = deserializer.apply(rawResult.getResult()); - // Unlike runTimestampedKeyQuery, the headers are kept rather than discarded. - final QueryResult> typedQueryResult = - InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueTimestampHeaders); + // Surface the result as a ReadOnlyRecord (implemented by Record), keeping the headers. + // A null wrapper means the key is absent or tombstoned, which we surface as a null result. 
+ final ReadOnlyRecord record = valueTimestampHeaders == null + ? null + : new Record<>( + typedKeyQuery.key(), + valueTimestampHeaders.value(), + valueTimestampHeaders.timestamp(), + valueTimestampHeaders.headers()); + final QueryResult> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, record); result = (QueryResult) typedQueryResult; } else { // the generic type doesn't matter, since failed queries have no result set. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java index ecc14bbfdd6c8..931889c58d205 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java @@ -18,11 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.query.KeyQuery; -import org.apache.kafka.streams.query.PositionBound; -import org.apache.kafka.streams.query.Query; -import org.apache.kafka.streams.query.QueryConfig; -import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.HeadersBytesStore; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; @@ -195,41 +190,4 @@ private void openFromTimestampedStore(final DBOptions dbOptions, } } - @SuppressWarnings("SynchronizeOnNonFinalField") - @Override - public QueryResult query(final Query query, - final PositionBound positionBound, - final QueryConfig config) { - // PoC (KIP-1356): serve raw KeyQuery from the persistent store so that the header-aware - // IQv2 query forwarded by MeteredTimestampedKeyValueStoreWithHeaders keeps working after the - // record cache has been flushed (a cache miss falls through to this 
layer). The returned bytes - // are the serialized ValueTimestampHeaders; the metered store performs the header-aware - // deserialization. Other query types remain unsupported for now. - if (query instanceof KeyQuery) { - return StoreQueryUtils.handleBasicQueries( - query, - positionBound, - config, - this, - position, - context - ); - } - - final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; - final QueryResult result; - - synchronized (position) { - result = QueryResult.forUnknownQueryType(query, this); - - if (config.isCollectExecutionInfo()) { - result.addExecutionInfo( - "Handled in " + this.getClass() + " in " + (System.nanoTime() - start) + "ns" - ); - } - result.setPosition(position.copy()); - } - return result; - } - -} \ No newline at end of file +} From 09d4865a1f41c9e74b77e7e78fcdc8ba974f459b Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Wed, 24 Jun 2026 12:59:52 -0400 Subject: [PATCH 4/9] Honor skipCache for point IQv2 queries skipCache() on KeyQuery/TimestampedKeyQuery (and the new TimestampedKeyWithHeadersQuery) was a no-op: the metered query handlers rebuilt a fresh raw KeyQuery via KeyQuery.withKey(...) and dropped the flag, so it never reached CachingKeyValueStore -- the only layer that reads isSkipCache(). Propagate isSkipCache() onto the forwarded raw KeyQuery in MeteredKeyValueStore, MeteredTimestampedKeyValueStore, and MeteredTimestampedKeyValueStoreWithHeaders. For the header store, also remove the CachingKeyValueStoreWithHeaders.query() override, which forwarded every query straight to the wrapped store and never consulted the cache. Inheriting CachingKeyValueStore.query() makes point queries consult the record cache (fixing read-your-writes for not-yet-flushed writes) and lets the propagated skipCache flag be honored; the cached value bytes are the serialized ValueTimestampHeaders, which the metered layer deserializes. 
--- .../query/TimestampedKeyWithHeadersQuery.java | 4 --- .../CachingKeyValueStoreWithHeaders.java | 20 ++++++--------- .../state/internals/MeteredKeyValueStore.java | 5 +++- .../MeteredTimestampedKeyValueStore.java | 10 ++++++-- ...edTimestampedKeyValueStoreWithHeaders.java | 25 +++++++++++-------- 5 files changed, 34 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java index 8e591ee6c59e2..99b21453990f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyWithHeadersQuery.java @@ -64,10 +64,6 @@ public static TimestampedKeyWithHeadersQuery withKey(final K key) { /** * Specifies that the cache should be skipped during query evaluation. This means, that the query will always * get forwarded to the underlying store. - * - *

PoC limitation: this flag is currently a no-op. The header-aware store handler does not yet - * propagate it to the underlying cache, so queries are evaluated against the cache regardless. Wiring - * this through is out of scope for the proof-of-concept. */ public TimestampedKeyWithHeadersQuery skipCache() { return new TimestampedKeyWithHeadersQuery<>(key, true); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java index 47c9c10c45c9b..f2609803ed78e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java @@ -18,26 +18,20 @@ import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.query.PositionBound; -import org.apache.kafka.streams.query.Query; -import org.apache.kafka.streams.query.QueryConfig; -import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueStore; /** - * A caching key-value store with headers is a caching key-value store that only forwards the query to the - * wrapped store. + * A caching key-value store with headers. + * + *

It inherits {@link CachingKeyValueStore}'s IQv2 {@code query(...)} handling: point queries + * ({@code KeyQuery}) consult the record cache (honoring {@code skipCache}) and fall back to the + * wrapped store on a miss, while other query types are forwarded to the wrapped store. The cached + * value bytes are the serialized {@code ValueTimestampHeaders}; the metered layer performs the + * header-aware deserialization, so no override is needed here. */ public class CachingKeyValueStoreWithHeaders extends CachingKeyValueStore { CachingKeyValueStoreWithHeaders(final KeyValueStore underlying) { super(underlying, CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS); } - - @Override - public QueryResult query(final Query query, - final PositionBound positionBound, - final QueryConfig config) { - return wrapped().query(query, positionBound, config); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index ca8f12ac74363..ec987b98ec1e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -323,8 +323,11 @@ private QueryResult runKeyQuery(final Query query, final QueryConfig config) { final QueryResult result; final KeyQuery typedKeyQuery = (KeyQuery) query; - final KeyQuery rawKeyQuery = + KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey())); + if (typedKeyQuery.isSkipCache()) { + rawKeyQuery = rawKeyQuery.skipCache(); + } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 
72caa56d88e35..74704ecb4d7d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -170,7 +170,10 @@ private QueryResult runTimestampedKeyQuery( ) { final QueryResult result; final TimestampedKeyQuery typedKeyQuery = (TimestampedKeyQuery) query; - final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key())); + KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key())); + if (typedKeyQuery.isSkipCache()) { + rawKeyQuery = rawKeyQuery.skipCache(); + } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); @@ -237,7 +240,10 @@ private QueryResult runKeyQuery( ) { final QueryResult result; final KeyQuery typedKeyQuery = (KeyQuery) query; - final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey())); + KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey())); + if (typedKeyQuery.isSkipCache()) { + rawKeyQuery = rawKeyQuery.skipCache(); + } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java index 84a0840ee3f82..b12f05fda1417 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java @@ -337,7 +337,10 @@ private QueryResult runKeyQuery( final QueryResult result; final KeyQuery typedKeyQuery = (KeyQuery) query; - final 
KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), internalContext.headers())); + KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), internalContext.headers())); + if (typedKeyQuery.isSkipCache()) { + rawKeyQuery = rawKeyQuery.skipCache(); + } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { // value will be `rawValueTimestampHeader`; no need to pass headers explicitly @@ -363,7 +366,10 @@ private QueryResult runTimestampedKeyQuery( final QueryResult result; final TimestampedKeyQuery typedKeyQuery = (TimestampedKeyQuery) query; - final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); + KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); + if (typedKeyQuery.isSkipCache()) { + rawKeyQuery = rawKeyQuery.skipCache(); + } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { // value will be `rawValueTimestampHeader`; no need to pass headers explicitly @@ -393,14 +399,13 @@ private QueryResult runTimestampedKeyWithHeadersQuery( final QueryResult result; final TimestampedKeyWithHeadersQuery typedKeyQuery = (TimestampedKeyWithHeadersQuery) query; - // Forward a raw byte-level KeyQuery to the wrapped store; the result bytes are the serialized - // ValueTimestampHeaders, which we deserialize below to recover value, timestamp, and headers. - // PoC limitation: typedKeyQuery.isSkipCache() is intentionally NOT propagated to the raw - // KeyQuery below. CachingKeyValueStore only honors skipCache when it is set on the forwarded - // KeyQuery, so for now skipCache() is a no-op for this query type (the existing - // runTimestampedKeyQuery has the same gap). Out of scope for this PoC; would be wired by - // calling rawKeyQuery.skipCache() when typedKeyQuery.isSkipCache() is true. 
- final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); + // Forward a raw byte-level KeyQuery to the wrapped store, propagating skipCache so the caching + // layer can honor it; the result bytes are the serialized ValueTimestampHeaders, which we + // deserialize below to recover value, timestamp, and headers. + KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); + if (typedKeyQuery.isSkipCache()) { + rawKeyQuery = rawKeyQuery.skipCache(); + } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); From da3949ea57aadcfef49d11c6bc4e222c0230db15 Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Wed, 24 Jun 2026 15:19:10 -0400 Subject: [PATCH 5/9] Add tests for IQv2 header-store query handling - ReadOnlyRecordTest: Record is assignable to ReadOnlyRecord and exposes the same key/value/timestamp/headers (non-null empty headers; null key/value tolerated). - skipCache propagation: MeteredKeyValueStore, MeteredTimestampedKeyValueStore and MeteredTimestampedKeyValueStoreWithHeaders forward isSkipCache() onto the raw KeyQuery for KeyQuery/TimestampedKeyQuery/TimestampedKeyWithHeadersQuery. - RocksDBTimestampedStoreWithHeadersTest: the native store now serves KeyQuery and RangeQuery (previously UNKNOWN_QUERY_TYPE); a genuinely unsupported query still returns UNKNOWN_QUERY_TYPE. - TimestampedKeyValueStoreBuilderWithHeadersTest: consolidated IQv2 query tests, parameterized over build path (native/adapter/in-memory) x caching, plus native-vs-adapter result parity; dropped the duplicated per-store setup and the mock-cache helper. 
- TimestampedKeyWithHeadersQueryIntegrationTest: fix the stale ValueTimestampHeaders result type to ReadOnlyRecord (it no longer compiled against the query type) and add a caching-enabled cache-hit (read-your-writes) case. --- ...pedKeyWithHeadersQueryIntegrationTest.java | 72 +- .../processor/api/ReadOnlyRecordTest.java | 68 ++ .../internals/MeteredKeyValueStoreTest.java | 29 + .../MeteredTimestampedKeyValueStoreTest.java | 46 ++ ...mestampedKeyValueStoreWithHeadersTest.java | 63 ++ ...ocksDBTimestampedStoreWithHeadersTest.java | 40 +- ...edKeyValueStoreBuilderWithHeadersTest.java | 685 ++++-------------- 7 files changed, 436 insertions(+), 567 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java index 920b0249992d2..d6ea29c939744 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyWithHeadersQueryIntegrationTest.java @@ -34,12 +34,14 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ReadOnlyRecord; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.StateQueryRequest; import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery; +import org.apache.kafka.streams.state.StoreBuilder; 
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; @@ -71,7 +73,7 @@ * *

Builds a KIP-1271 {@code WithHeaders} store, writes records (with headers) into it through a * processor, then queries the store through IQv2 and asserts that the returned - * {@link ValueTimestampHeaders} carries value, timestamp, and the exact headers written. + * {@link ReadOnlyRecord} carries the key, value, timestamp, and the exact headers written. */ @Tag("integration") public class TimestampedKeyWithHeadersQueryIntegrationTest { @@ -87,6 +89,7 @@ public class TimestampedKeyWithHeadersQueryIntegrationTest { private String inputStream; private String outputStream; private long baseTimestamp; + private long commitIntervalMs = 1000L; private KafkaStreams kafkaStreams; private TestInfo testInfo; @@ -137,14 +140,16 @@ public void shouldHandleTimestampedKeyWithHeadersQuery() throws Exception { outputStream, 4); - // key 1: value + timestamp + headers round-trip - final ValueTimestampHeaders result1 = query(1); + // key 1: key + value + timestamp + headers round-trip + final ReadOnlyRecord result1 = query(1); + assertEquals(Integer.valueOf(1), result1.key()); assertEquals("a0", result1.value()); assertEquals(baseTimestamp, result1.timestamp()); assertEquals(HEADERS, result1.headers()); // key 2: written with no headers -> empty (never null) headers - final ValueTimestampHeaders result2 = query(2); + final ReadOnlyRecord result2 = query(2); + assertEquals(Integer.valueOf(2), result2.key()); assertEquals("b0", result2.value()); assertEquals(baseTimestamp + 1, result2.timestamp()); assertEquals(new RecordHeaders(), result2.headers()); @@ -156,6 +161,29 @@ public void shouldHandleTimestampedKeyWithHeadersQuery() throws Exception { assertNull(query(999)); } + @Test + public void shouldServeCacheHitWhenCachingEnabledAndRecordNotYetFlushed() throws Exception { + // Use a very large commit interval so the processed record is never committed/flushed during + // the test: it lives only in the record cache (the persistent RocksDB layer stays empty). 
A + // successful query therefore proves the result was served from the cache via + // CachingKeyValueStoreWithHeaders -> the metered store's cache-hit path, end-to-end. + commitIntervalMs = Duration.ofMinutes(10).toMillis(); + startStreams(true); + + produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS, KeyValue.pair(1, "a0")); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), + outputStream, + 1); + + // Read-your-writes: the not-yet-flushed record is visible (served from the cache), with headers. + final ReadOnlyRecord result = query(1); + assertEquals(Integer.valueOf(1), result.key()); + assertEquals("a0", result.value()); + assertEquals(baseTimestamp, result.timestamp()); + assertEquals(HEADERS, result.headers()); + } + @Test public void shouldFailWithUnknownQueryTypeAgainstNonHeadersStore() throws Exception { // store built WITHOUT a WithHeaders supplier -> the query type is unsupported @@ -179,25 +207,29 @@ public void shouldFailWithUnknownQueryTypeAgainstNonHeadersStore() throws Except outputStream, 1); - final StateQueryResult> result = - kafkaStreams.query(inStore(STORE_NAME).withQuery(TimestampedKeyWithHeadersQuery.withKey(1))); + final StateQueryResult> result = + kafkaStreams.query(inStore(STORE_NAME).withQuery(TimestampedKeyWithHeadersQuery.withKey(1))); assertTrue(result.getOnlyPartitionResult().isFailure()); assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, result.getOnlyPartitionResult().getFailureReason()); } private void startStreams() throws Exception { + // Caching disabled: every IQv2 query is forced down to the persistent + // RocksDBTimestampedStoreWithHeaders layer, exercising its KeyQuery handling + // (rather than being short-circuited by a cache hit). 
+ startStreams(false); + } + + private void startStreams(final boolean cachingEnabled) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); + final StoreBuilder> storeBuilder = + Stores.timestampedKeyValueStoreWithHeadersBuilder( + Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), + Serdes.Integer(), + Serdes.String()); builder - .addStateStore( - Stores.timestampedKeyValueStoreWithHeadersBuilder( - Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), - Serdes.Integer(), - Serdes.String()) - // Disable caching so every IQv2 query is forced down to the persistent - // RocksDBTimestampedStoreWithHeaders layer, exercising its KeyQuery handling - // (rather than being short-circuited by a cache hit). - .withCachingDisabled()) + .addStateStore(cachingEnabled ? storeBuilder.withCachingEnabled() : storeBuilder.withCachingDisabled()) .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) .process(() -> new HeadersStoreWriterProcessor(), STORE_NAME) .to(outputStream, Produced.with(Serdes.Integer(), Serdes.String())); @@ -206,13 +238,13 @@ private void startStreams() throws Exception { IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams); } - private ValueTimestampHeaders query(final int key) { - final StateQueryRequest> request = + private ReadOnlyRecord query(final int key) { + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(TimestampedKeyWithHeadersQuery.withKey(key)); - final StateQueryResult> result = kafkaStreams.query(request); + final StateQueryResult> result = kafkaStreams.query(request); // getOnlyPartitionResult() returns null when the single partition result is a successful // null (tombstoned / absent key), which we surface to the caller as a null lookup. - final QueryResult> onlyResult = result.getOnlyPartitionResult(); + final QueryResult> onlyResult = result.getOnlyPartitionResult(); return onlyResult == null ? 
null : onlyResult.getResult(); } @@ -222,7 +254,7 @@ private Properties props() { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java new file mode 100644 index 0000000000000..ebeee76a00a0a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/api/ReadOnlyRecordTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ReadOnlyRecordTest { + + private static Headers headers() { + return new RecordHeaders().add(new RecordHeader("k", "v".getBytes(StandardCharsets.UTF_8))); + } + + @Test + public void recordShouldBeAssignableToReadOnlyRecordAndExposeSameAccessors() { + final Headers headers = headers(); + final Record record = new Record<>("key", "value", 123L, headers); + + // The assignment itself proves Record is a ReadOnlyRecord (source/binary compatible change). + final ReadOnlyRecord readOnly = record; + + assertEquals(record.key(), readOnly.key()); + assertEquals(record.value(), readOnly.value()); + assertEquals(record.timestamp(), readOnly.timestamp()); + assertEquals(record.headers(), readOnly.headers()); + + assertEquals("key", readOnly.key()); + assertEquals("value", readOnly.value()); + assertEquals(123L, readOnly.timestamp()); + assertEquals(headers, readOnly.headers()); + } + + @Test + public void readOnlyHeadersShouldBeNonNullAndEmptyWhenConstructedWithoutHeaders() { + final ReadOnlyRecord readOnly = new Record<>("key", "value", 123L); + assertEquals(new RecordHeaders(), readOnly.headers()); + } + + @Test + public void readOnlyAccessorsShouldTolerateNullKeyAndValue() { + final ReadOnlyRecord readOnly = new Record<>(null, null, 0L); + assertEquals(null, readOnly.key()); + assertEquals(null, readOnly.value()); + assertEquals(0L, readOnly.timestamp()); + assertEquals(new RecordHeaders(), readOnly.headers()); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index d5d65e56be5b0..cb2606e1b3f1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -43,6 +43,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -151,6 +156,30 @@ private void init() { metered.init(context, metered); } + @Test + public void shouldPropagateSkipCacheForKeyQuery() { + setUp(); + init(); + assertTrue(forwardedRawKeyQuery(KeyQuery.withKey(KEY).skipCache()).isSkipCache()); + } + + @Test + public void shouldNotSkipCacheForKeyQueryByDefault() { + setUp(); + init(); + assertFalse(forwardedRawKeyQuery(KeyQuery.withKey(KEY)).isSkipCache()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private KeyQuery forwardedRawKeyQuery(final Query query) { + when(inner.query(any(), any(PositionBound.class), any(QueryConfig.class))) + .thenReturn((QueryResult) QueryResult.forResult(null)); + metered.query(query, PositionBound.unbounded(), new QueryConfig(false)); + final ArgumentCaptor captor = ArgumentCaptor.forClass(KeyQuery.class); + verify(inner).query(captor.capture(), any(PositionBound.class), any(QueryConfig.class)); + return captor.getValue(); + } + @Test public void shouldDelegateInit() { setUp(); diff 
--git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 6e05513ec9c7c..6313886d163af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -37,6 +37,12 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.TimestampedKeyQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -44,6 +50,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -67,6 +74,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -143,6 +151,44 @@ private void init() { metered.init(context, metered); } + @Test + public void shouldPropagateSkipCacheForKeyQuery() { + setUp(); + init(); + assertTrue(forwardedRawKeyQuery(KeyQuery.withKey(KEY).skipCache()).isSkipCache()); + } + + 
@Test + public void shouldNotSkipCacheForKeyQueryByDefault() { + setUp(); + init(); + assertFalse(forwardedRawKeyQuery(KeyQuery.withKey(KEY)).isSkipCache()); + } + + @Test + public void shouldPropagateSkipCacheForTimestampedKeyQuery() { + setUp(); + init(); + assertTrue(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY).skipCache()).isSkipCache()); + } + + @Test + public void shouldNotSkipCacheForTimestampedKeyQueryByDefault() { + setUp(); + init(); + assertFalse(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY)).isSkipCache()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private KeyQuery forwardedRawKeyQuery(final Query query) { + when(inner.query(any(), any(PositionBound.class), any(QueryConfig.class))) + .thenReturn((QueryResult) QueryResult.forResult(null)); + metered.query(query, PositionBound.unbounded(), new QueryConfig(false)); + final ArgumentCaptor captor = ArgumentCaptor.forClass(KeyQuery.class); + verify(inner).query(captor.capture(), any(PositionBound.class), any(QueryConfig.class)); + return captor.getValue(); + } + @Test public void shouldDelegateInit() { setUp(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java index f6355e7bd6af5..36b20e4d4e327 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java @@ -36,6 +36,13 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; 
+import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.TimestampedKeyQuery; +import org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueTimestampHeaders; @@ -43,6 +50,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -143,6 +151,61 @@ private void init() { metered.init(context, metered); } + // --- skipCache propagation: the metered handlers must forward isSkipCache() onto the raw + // KeyQuery they build, otherwise the caching layer never sees it (it was a no-op before). --- + + @Test + public void shouldPropagateSkipCacheForKeyQuery() { + setUp(); + init(); + assertTrue(forwardedRawKeyQuery(KeyQuery.withKey(KEY).skipCache()).isSkipCache()); + } + + @Test + public void shouldNotSkipCacheForKeyQueryByDefault() { + setUp(); + init(); + assertFalse(forwardedRawKeyQuery(KeyQuery.withKey(KEY)).isSkipCache()); + } + + @Test + public void shouldPropagateSkipCacheForTimestampedKeyQuery() { + setUp(); + init(); + assertTrue(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY).skipCache()).isSkipCache()); + } + + @Test + public void shouldNotSkipCacheForTimestampedKeyQueryByDefault() { + setUp(); + init(); + assertFalse(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY)).isSkipCache()); + } + + @Test + public void shouldPropagateSkipCacheForTimestampedKeyWithHeadersQuery() { + setUp(); + init(); + assertTrue(forwardedRawKeyQuery(TimestampedKeyWithHeadersQuery.withKey(KEY).skipCache()).isSkipCache()); + } + + @Test + public void 
shouldNotSkipCacheForTimestampedKeyWithHeadersQueryByDefault() { + setUp(); + init(); + assertFalse(forwardedRawKeyQuery(TimestampedKeyWithHeadersQuery.withKey(KEY)).isSkipCache()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private KeyQuery forwardedRawKeyQuery(final Query query) { + when(inner.query(any(), any(PositionBound.class), any(QueryConfig.class))) + .thenReturn((QueryResult) QueryResult.forResult(null)); + metered.query(query, PositionBound.unbounded(), new QueryConfig(false)); + final ArgumentCaptor captor = ArgumentCaptor.forClass(KeyQuery.class); + verify(inner).query(captor.capture(), any(PositionBound.class), any(QueryConfig.class)); + return captor.getValue(); + } + @Test public void shouldDelegateInit() { setUp(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java index 2189eb99d1aa6..057e8c2f03ffe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryConfig; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.RangeQuery; @@ -986,13 +987,13 @@ public void shouldReturnUnknownQueryTypeForUnsupportedQuery() { // Initialize the store rocksDBStore.init(context, rocksDBStore); - // RangeQuery is not (yet) supported by this store; only KeyQuery is wired (KIP-1356 PoC). - final RangeQuery query = RangeQuery.withNoBounds(); + // A query type the store has no handler for still yields UNKNOWN_QUERY_TYPE. 
+ final Query unsupportedQuery = new Query<>() { }; final PositionBound positionBound = PositionBound.unbounded(); final QueryConfig config = new QueryConfig(false); // Execute query - final QueryResult> result = rocksDBStore.query(query, positionBound, config); + final QueryResult result = rocksDBStore.query(unsupportedQuery, positionBound, config); // Verify result indicates unknown query type assertFalse(result.isSuccess(), "Expected query to fail with unknown query type"); @@ -1016,8 +1017,9 @@ public void shouldHandleKeyQuery() { final byte[] storedBytes = "headers+timestamp+value".getBytes(); rocksDBStore.put(key, storedBytes); - // KIP-1356 PoC: KeyQuery is now served by this store (rather than UNKNOWN_QUERY_TYPE), - // returning the raw stored bytes (the metered store performs header-aware deserialization). + // KIP-1356: removing the query() override lets the native store serve KeyQuery (previously + // UNKNOWN_QUERY_TYPE), matching the adapter build path. The raw stored bytes are returned; + // the metered store performs the header-aware deserialization. final KeyQuery query = KeyQuery.withKey(key); final QueryResult result = rocksDBStore.query(query, PositionBound.unbounded(), new QueryConfig(false)); @@ -1027,6 +1029,34 @@ public void shouldHandleKeyQuery() { assertNotNull(result.getPosition(), "Expected position to be set"); } + @Test + public void shouldHandleRangeQuery() { + // Initialize the store + rocksDBStore.init(context, rocksDBStore); + + // Store raw (serialized ValueTimestampHeaders) bytes for a key. + final Bytes key = new Bytes("test-key".getBytes()); + final byte[] storedBytes = "headers+timestamp+value".getBytes(); + rocksDBStore.put(key, storedBytes); + + // KIP-1356: removing the query() override lets the native store serve RangeQuery (previously + // UNKNOWN_QUERY_TYPE), matching the adapter build path. The raw stored bytes are returned; + // the metered store performs the header-aware deserialization. 
+ final RangeQuery query = RangeQuery.withNoBounds(); + final QueryResult> result = + rocksDBStore.query(query, PositionBound.unbounded(), new QueryConfig(false)); + + assertTrue(result.isSuccess(), "Expected RangeQuery to succeed"); + try (KeyValueIterator iterator = result.getResult()) { + assertTrue(iterator.hasNext(), "Expected the stored key in the range result"); + final KeyValue keyValue = iterator.next(); + assertEquals(key, keyValue.key); + assertArrayEquals(storedBytes, keyValue.value, "Expected the raw stored bytes to be returned"); + assertFalse(iterator.hasNext()); + } + assertNotNull(result.getPosition(), "Expected position to be set"); + } + @Test public void shouldCollectExecutionInfoWhenRequested() { // Initialize the store diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java index 9e6ec3bb0f3d0..e18e659777912 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java @@ -19,12 +19,16 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.common.utils.internals.LogContext; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.processor.api.ReadOnlyRecord; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; 
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; @@ -32,40 +36,36 @@ import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.RangeQuery; import org.apache.kafka.streams.query.TimestampedKeyQuery; -import org.apache.kafka.streams.state.HeadersBytesStore; +import org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueTimestampHeaders; import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.io.File; import java.nio.charset.StandardCharsets; import java.util.Collections; -import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static 
org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -229,353 +229,89 @@ public void shouldWrapPlainKeyValueStoreAsHeadersStore() { assertInstanceOf(PlainToHeadersStoreAdapter.class, ((WrappedStateStore) store).wrapped()); } - @Test - public void shouldHandleKeyQueryOnInMemoryStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("in-memory"); - when(supplier.get()).thenReturn(new InMemoryKeyValueStore("test-store")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - - try { - // Put data into the store - final Headers headers = new RecordHeaders(); - headers.add("key1", "value1".getBytes()); - store.put("test-key", ValueTimestampHeaders.make("test-value", 12345L, headers)); - - // Verify wrapper type for InMemoryKeyValueStore - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertInstanceOf(HeadersBytesStore.class, wrapped, - "Expected wrapper to implement HeadersBytesStore for InMemoryKeyValueStore"); - - // Query at typed level - KeyQuery should return just the value - final KeyQuery query = KeyQuery.withKey("test-key"); - final QueryResult result = store.query(query, PositionBound.unbounded(), new QueryConfig(false)); - - // Verify IQv2 query result - assertTrue(result.isSuccess(), 
"Expected query to succeed on InMemoryKeyValueStore"); - assertNotNull(result.getPosition(), "Expected position to be set"); - assertInstanceOf(String.class, result.getResult()); - assertEquals("test-value", result.getResult(), "KeyQuery should return just the value"); - } finally { - store.close(); - } - } - - @Test - public void shouldHandleTimestampedKeyQueryOnInMemoryStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("in-memory"); - when(supplier.get()).thenReturn(new InMemoryKeyValueStore("test-store")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - - try { - // Put data into the store - final Headers headers = new RecordHeaders(); - headers.add("key1", "value1".getBytes()); - store.put("test-key", ValueTimestampHeaders.make("test-value", 12345L, headers)); - - // Verify wrapper type for InMemoryKeyValueStore - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertInstanceOf(HeadersBytesStore.class, wrapped, - "Expected wrapper to implement HeadersBytesStore for InMemoryKeyValueStore"); - - // Query at typed level - TimestampedKeyQuery should return value + timestamp - final TimestampedKeyQuery query = TimestampedKeyQuery.withKey("test-key"); - final QueryResult> result = store.query(query, PositionBound.unbounded(), new QueryConfig(false)); - - // Verify IQv2 query result - assertTrue(result.isSuccess(), "Expected query to succeed on InMemoryKeyValueStore"); - 
assertNotNull(result.getPosition(), "Expected position to be set"); - assertNotNull(result.getResult(), "Expected non-null result"); - assertInstanceOf(ValueAndTimestamp.class, result.getResult()); - assertEquals("test-value", result.getResult().value(), "TimestampedKeyQuery should return the value"); - assertEquals(12345L, result.getResult().timestamp(), "TimestampedKeyQuery should return the timestamp"); - } finally { - store.close(); + // ---------------------------------------------------------------------------------------------- + // IQv2 query handling for the built header store. + // + // The header store can be built three ways; each is exercised with caching on and off through a + // single helper that builds the real store chain (Metered -> [Caching] -> inner) with a real + // record cache, then writes and reads real data at the typed (metered) level: + // NATIVE -> RocksDBTimestampedStoreWithHeaders (persists headers) + // ADAPTER -> RocksDBTimestampedStore via TimestampedToHeadersStoreAdapter (drops headers) + // IN_MEMORY -> InMemoryKeyValueStore via the in-memory headers marker + // ---------------------------------------------------------------------------------------------- + + private enum StoreType { NATIVE, ADAPTER, IN_MEMORY } + + private KeyValueStore innerStore(final StoreType storeType) { + switch (storeType) { + case NATIVE: return new RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"); + case ADAPTER: return new RocksDBTimestampedStore("test-store", "metrics-scope"); + case IN_MEMORY: return new InMemoryKeyValueStore("test-store"); + default: throw new IllegalArgumentException("unknown store type: " + storeType); } } - @Test - public void shouldReturnPositionFromHeadersStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope")); + private 
TimestampedKeyValueStoreWithHeaders buildAndInitStore( + final StoreType storeType, + final boolean cachingEnabled) { + lenient().when(supplier.name()).thenReturn("test-store"); + lenient().when(supplier.metricsScope()).thenReturn("metricScope"); + lenient().when(supplier.get()).thenReturn(innerStore(storeType)); builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - - try { - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - final Position position = wrapped.getPosition(); - - // Verify: Position is returned (should be non-null) - assertNotNull(position, "Expected non-null position"); - assertTrue(position.isEmpty(), "Expected position to be empty initially"); - } finally { - store.close(); - } - } - - @Test - public void shouldReturnPositionFromAdaptedTimestampedStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStore("test-store", "metrics-scope")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); + supplier, Serdes.String(), Serdes.String(), new MockTime()); + final TimestampedKeyValueStoreWithHeaders store = + (cachingEnabled + ? 
builder.withLoggingDisabled().withCachingEnabled() + : builder.withLoggingDisabled().withCachingDisabled()) + .build(); - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); + final ThreadCache cache = new ThreadCache( + new LogContext("test "), + cachingEnabled ? 10 * 1024 * 1024 : 0, + new MockStreamsMetrics(new Metrics())); final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); + TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), null, cache); + context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders())); store.init(context, store); - - try { - // Verify adapter is used - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped); - - // Get position from adapter (should delegate to underlying store) - final Position position = wrapped.getPosition(); - - assertNotNull(position, "Expected non-null position from adapter"); - assertTrue(position.isEmpty(), "Expected position to be empty initially"); - } finally { - store.close(); - } + return store; } - @Test - public void shouldReturnPositionFromInMemoryStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("in-memory"); - when(supplier.get()).thenReturn(new InMemoryKeyValueStore("test-store")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new 
StreamsConfig(props) - ); - store.init(context, store); - - try { - // Verify marker wrapper is used - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertInstanceOf(HeadersBytesStore.class, wrapped); - - // Get position from marker (should delegate to InMemoryKeyValueStore) - final Position position = wrapped.getPosition(); - - assertNotNull(position, "Expected non-null position from in-memory store"); - assertTrue(position.isEmpty(), "Expected position to be empty initially"); - } finally { - store.close(); - } + private static Headers headersWith(final String key, final String value) { + return new RecordHeaders().add(key, value.getBytes(StandardCharsets.UTF_8)); } - @Test - public void shouldMaintainPositionAcrossOperationsOnHeadersStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - + @ParameterizedTest + @CsvSource({"NATIVE, true", "NATIVE, false", "ADAPTER, true", "ADAPTER, false", "IN_MEMORY, true", "IN_MEMORY, false"}) + public void shouldHandleKeyQuery(final StoreType storeType, final boolean cachingEnabled) { + final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(storeType, cachingEnabled); try { - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - - // Get initial position - 
final Position initialPosition = wrapped.getPosition(); - assertNotNull(initialPosition, "Expected non-null initial position"); + store.put("k", ValueTimestampHeaders.make("v", 123L, headersWith("h", "x"))); - // Put some data - store.put("key1", ValueTimestampHeaders.make("value1", 100L, new RecordHeaders())); - store.put("key2", ValueTimestampHeaders.make("value2", 200L, new RecordHeaders())); + final QueryResult result = + store.query(KeyQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)); - // Get position after puts - final Position afterPutPosition = wrapped.getPosition(); - assertNotNull(afterPutPosition, "Expected non-null position after puts"); - - // Position object should be the same instance (stores maintain a single position) - // The position content might be updated internally by the context + assertTrue(result.isSuccess(), "Expected KeyQuery to succeed"); + assertEquals("v", result.getResult(), "KeyQuery returns the value only"); + assertNotNull(result.getPosition(), "Expected position to be set"); } finally { store.close(); } } - private static ThreadCache mockCacheHit() { - final ThreadCache cache = mock(ThreadCache.class); - final LRUCacheEntry entry = mock(LRUCacheEntry.class); - final byte[] entryValue = "mockEntryValue".getBytes(StandardCharsets.UTF_8); - lenient().when(entry.value()).thenReturn(entryValue); - lenient().when(cache.get(any(String.class), any(Bytes.class))).thenReturn(entry); - return cache; - } - - private TimestampedKeyValueStoreWithHeaders headersStoreMaybeWithCache(final boolean cachingEnabled) { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope")); - - final File dir = TestUtils.tempDirectory(); - final ThreadCache cache = mockCacheHit(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - 
Serdes.String(), - null, - cache - ); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store; - if (cachingEnabled) { - store = builder.withLoggingDisabled() - .withCachingEnabled() - .build(); - } else { - store = builder.withLoggingDisabled() - .withCachingDisabled() - .build(); - } - - store.init(context, store); - return store; - } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldHandleKeyQueryOnHeadersStore(final boolean cachingEnabled) { - final TimestampedKeyValueStoreWithHeaders store = headersStoreMaybeWithCache(cachingEnabled); - + @CsvSource({"NATIVE, true", "NATIVE, false", "ADAPTER, true", "ADAPTER, false", "IN_MEMORY, true", "IN_MEMORY, false"}) + public void shouldHandleTimestampedKeyQuery(final StoreType storeType, final boolean cachingEnabled) { + final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(storeType, cachingEnabled); try { - final KeyQuery query = KeyQuery.withKey(new Bytes("test-key".getBytes())); - final PositionBound positionBound = PositionBound.unbounded(); - final QueryConfig config = new QueryConfig(false); + store.put("k", ValueTimestampHeaders.make("v", 123L, headersWith("h", "x"))); - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - final QueryResult result = wrapped.query(query, positionBound, config); + final QueryResult> result = + store.query(TimestampedKeyQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)); - // KIP-1356 PoC: the headers store now serves KeyQuery (rather than UNKNOWN_QUERY_TYPE). - // No data was written for "test-key", so the result is a successful null lookup. 
- assertTrue(result.isSuccess(), "Expected KeyQuery to be handled"); - assertNull(result.getResult(), "Expected null result for an absent key"); + assertTrue(result.isSuccess(), "Expected TimestampedKeyQuery to succeed"); + assertEquals("v", result.getResult().value()); + assertEquals(123L, result.getResult().timestamp()); assertNotNull(result.getPosition(), "Expected position to be set"); } finally { store.close(); @@ -584,247 +320,112 @@ public void shouldHandleKeyQueryOnHeadersStore(final boolean cachingEnabled) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void shouldReturnUnknownQueryTypeForRangeQueryOnHeadersStore(final boolean cachingEnabled) { - final TimestampedKeyValueStoreWithHeaders store = headersStoreMaybeWithCache(cachingEnabled); - + public void shouldHandleTimestampedKeyWithHeadersQueryOnNativeStore(final boolean cachingEnabled) { + // Only the native store persists headers; the adapter and in-memory builds drop them on write. + final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(StoreType.NATIVE, cachingEnabled); try { - final RangeQuery query = RangeQuery.withRange( - new Bytes("a".getBytes()), - new Bytes("z".getBytes()) - ); - final PositionBound positionBound = PositionBound.unbounded(); - final QueryConfig config = new QueryConfig(false); - - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - final QueryResult> result = wrapped.query(query, positionBound, config); - - // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE - assertFalse(result.isSuccess(), "Expected query to fail with unknown query type"); - assertEquals( - FailureReason.UNKNOWN_QUERY_TYPE, - result.getFailureReason(), - "Expected UNKNOWN_QUERY_TYPE failure reason" - ); + final Headers headers = headersWith("h", "x"); + store.put("k", ValueTimestampHeaders.make("v", 123L, headers)); + + final QueryResult> result = + store.query(TimestampedKeyWithHeadersQuery.withKey("k"), PositionBound.unbounded(), new 
QueryConfig(false)); + + assertTrue(result.isSuccess(), "Expected TimestampedKeyWithHeadersQuery to succeed"); + final ReadOnlyRecord record = result.getResult(); + assertEquals("k", record.key()); + assertEquals("v", record.value()); + assertEquals(123L, record.timestamp()); + // Headers must round-trip on both the persistent path and the cache path. + assertEquals(headers, record.headers()); assertNotNull(result.getPosition(), "Expected position to be set"); } finally { store.close(); } } - @Test - public void shouldCollectExecutionInfoForQueryOnHeadersStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - + @ParameterizedTest + @CsvSource({"NATIVE", "ADAPTER", "IN_MEMORY"}) + public void shouldHandleRangeQuery(final StoreType storeType) { + // Caching disabled: an IQv2 RangeQuery forwards straight to the store (it does not merge the + // record cache), so the write is served from the persistent store. 
+ final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(storeType, false); try { - final KeyQuery query = KeyQuery.withKey(new Bytes("test-key".getBytes())); - final PositionBound positionBound = PositionBound.unbounded(); - final QueryConfig config = new QueryConfig(true); // Enable execution info - - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - final QueryResult result = wrapped.query(query, positionBound, config); - - // Verify: Execution info was collected - assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution info to be collected"); - assertTrue( - result.getExecutionInfo().get(0).contains("Handled in"), - "Expected execution info to contain handling information" - ); - assertTrue( - result.getExecutionInfo().get(0).contains(RocksDBTimestampedStoreWithHeaders.class.getName()), - "Expected execution info to mention the class name" - ); + store.put("k", ValueTimestampHeaders.make("v", 123L, headersWith("h", "x"))); + + final QueryResult> result = + store.query(RangeQuery.withNoBounds(), PositionBound.unbounded(), new QueryConfig(false)); + + assertTrue(result.isSuccess(), "Expected RangeQuery to succeed"); + try (KeyValueIterator iterator = result.getResult()) { + assertTrue(iterator.hasNext(), "Expected the stored key in the range result"); + final KeyValue keyValue = iterator.next(); + assertEquals("k", keyValue.key); + assertEquals("v", keyValue.value); + assertFalse(iterator.hasNext()); + } + assertNotNull(result.getPosition(), "Expected position to be set"); } finally { store.close(); } } - @Test - public void shouldHandleKeyQueryOnAdaptedTimestampedStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStore("test-store", "metrics-scope")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final 
TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - + @ParameterizedTest + @CsvSource({"NATIVE", "ADAPTER", "IN_MEMORY"}) + public void shouldReturnEmptyPositionInitially(final StoreType storeType) { + final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(storeType, false); try { - // Put data into the store (headers will be discarded when adapted to timestamped store) - final Headers headers = new RecordHeaders(); - headers.add("adapter", "test".getBytes()); - store.put("test-key", ValueTimestampHeaders.make("adapter-value", 55555L, headers)); - - // Verify adapter is used for legacy timestamped store - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped, - "Expected TimestampedToHeadersStoreAdapter for legacy timestamped store"); - - // Query at typed level - KeyQuery should return just the value - final KeyQuery query = KeyQuery.withKey("test-key"); - final QueryResult result = store.query(query, PositionBound.unbounded(), new QueryConfig(false)); - - // Verify IQv2 query result - // Adapter delegates to RocksDBTimestampedStore which supports IQv2 through RocksDBStore - assertTrue(result.isSuccess(), - "Expected query to succeed since RocksDBTimestampedStore supports IQv2"); - assertNotNull(result.getPosition(), "Expected position to be set"); - assertInstanceOf(String.class, result.getResult()); - assertEquals("adapter-value", result.getResult(), "KeyQuery should return just the value"); + final Position position = ((WrappedStateStore) store).wrapped().getPosition(); + assertNotNull(position, "Expected 
non-null position"); + assertTrue(position.isEmpty(), "Expected position to be empty initially"); } finally { store.close(); } } - @Test - public void shouldHandleTimestampedKeyQueryOnAdaptedTimestampedStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStore("test-store", "metrics-scope")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); - + @ParameterizedTest + @CsvSource({"NATIVE", "ADAPTER"}) + public void shouldCollectExecutionInfoWhenRequested(final StoreType storeType) { + final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(storeType, false); try { - // Put data into the store (headers will be discarded when adapted to timestamped store) - final Headers headers = new RecordHeaders(); - headers.add("adapter", "test".getBytes()); - store.put("test-key", ValueTimestampHeaders.make("adapter-value", 55555L, headers)); - - // Verify adapter is used for legacy timestamped store final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped, - "Expected TimestampedToHeadersStoreAdapter for legacy timestamped store"); - - // Query at typed level - TimestampedKeyQuery should return value + timestamp - final TimestampedKeyQuery query = TimestampedKeyQuery.withKey("test-key"); - final QueryResult> result = store.query(query, PositionBound.unbounded(), new 
QueryConfig(false)); + final QueryResult result = wrapped.query( + KeyQuery.withKey(new Bytes("k".getBytes())), PositionBound.unbounded(), new QueryConfig(true)); - // Verify IQv2 query result - // Adapter delegates to RocksDBTimestampedStore which supports IQv2 through RocksDBStore - assertTrue(result.isSuccess(), - "Expected query to succeed since RocksDBTimestampedStore supports IQv2"); - assertNotNull(result.getPosition(), "Expected position to be set"); - assertNotNull(result.getResult(), "Expected non-null result"); - assertInstanceOf(ValueAndTimestamp.class, result.getResult()); - assertEquals("adapter-value", result.getResult().value(), "TimestampedKeyQuery should return the value"); - assertEquals(55555L, result.getResult().timestamp(), "TimestampedKeyQuery should return the timestamp"); + final String executionInfo = String.join("\n", result.getExecutionInfo()); + assertFalse(executionInfo.isEmpty(), "Expected execution info to be collected"); + assertTrue(executionInfo.contains("Handled in"), "Expected execution info to contain handling information"); + final String expectedClass = storeType == StoreType.NATIVE + ? 
RocksDBTimestampedStoreWithHeaders.class.getName() + : TimestampedToHeadersStoreAdapter.class.getName(); + assertTrue(executionInfo.contains(expectedClass), "Expected execution info to mention " + expectedClass); } finally { store.close(); } } - @Test - public void shouldCollectExecutionInfoForQueryOnAdaptedTimestampedStore() { - when(supplier.name()).thenReturn("test-store"); - when(supplier.metricsScope()).thenReturn("metricScope"); - when(supplier.get()).thenReturn(new RocksDBTimestampedStore("test-store", "metrics-scope")); - - builder = new TimestampedKeyValueStoreBuilderWithHeaders<>( - supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); - - final TimestampedKeyValueStoreWithHeaders store = builder - .withLoggingDisabled() - .withCachingDisabled() - .build(); - - final File dir = TestUtils.tempDirectory(); - final Properties props = StreamsTestUtils.getStreamsConfig(); - final InternalMockProcessorContext context = new InternalMockProcessorContext<>( - dir, - Serdes.String(), - Serdes.String(), - new StreamsConfig(props) - ); - store.init(context, store); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldReturnIdenticalResultsForNativeAndAdapterBuiltStores(final boolean cachingEnabled) { + // The existing (header-stripped) query types must return identical results on both build paths. 
+ final ValueTimestampHeaders value = ValueTimestampHeaders.make("v", 123L, headersWith("h", "x")); + final TimestampedKeyValueStoreWithHeaders nativeStore = buildAndInitStore(StoreType.NATIVE, cachingEnabled); + final TimestampedKeyValueStoreWithHeaders adapterStore = buildAndInitStore(StoreType.ADAPTER, cachingEnabled); try { - final KeyQuery query = KeyQuery.withKey(new Bytes("test-key".getBytes())); - final PositionBound positionBound = PositionBound.unbounded(); - final QueryConfig config = new QueryConfig(true); // Enable execution info - - final StateStore wrapped = ((WrappedStateStore) store).wrapped(); - final QueryResult result = wrapped.query(query, positionBound, config); - - // Verify: Execution info includes both adapter and underlying store - assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution info to be collected"); + nativeStore.put("k", value); + adapterStore.put("k", value); - final String executionInfo = String.join("\n", result.getExecutionInfo()); - assertTrue( - executionInfo.contains("Handled in"), - "Expected execution info to contain handling information" - ); - // Should mention the adapter class - assertTrue( - executionInfo.contains(TimestampedToHeadersStoreAdapter.class.getName()), - "Expected execution info to mention the adapter class" - ); + assertEquals( + nativeStore.query(KeyQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)).getResult(), + adapterStore.query(KeyQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)).getResult(), + "KeyQuery results should be identical across native and adapter build paths"); + assertEquals( + nativeStore.query(TimestampedKeyQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)).getResult(), + adapterStore.query(TimestampedKeyQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)).getResult(), + "TimestampedKeyQuery results should be identical across native and adapter build paths"); } finally { - store.close(); + 
nativeStore.close(); + adapterStore.close(); } } } From 9a12de8386322c78cb35c057ab390ca1127be722 Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Thu, 25 Jun 2026 13:14:18 -0400 Subject: [PATCH 6/9] Scope skipCache propagation to TimestampedKeyWithHeadersQuery Per review: propagating skipCache for the existing KeyQuery/TimestampedKeyQuery on the plain metered stores (MeteredKeyValueStore/MeteredTimestampedKeyValueStore) is a pre-existing, cross-cutting bug fix outside KIP-1356's scope. Revert those changes (and their tests), and drop skipCache from the header store's existing-query handlers (runKeyQuery/runTimestampedKeyQuery) too. This PR now propagates skipCache only in the new runTimestampedKeyWithHeadersQuery handler. The general KeyQuery/TimestampedKeyQuery fix, with the repeated propagation collapsed into one shared helper, will follow in a separate JIRA after this merges. --- .../state/internals/MeteredKeyValueStore.java | 5 +- .../MeteredTimestampedKeyValueStore.java | 10 +--- ...edTimestampedKeyValueStoreWithHeaders.java | 10 +--- .../internals/MeteredKeyValueStoreTest.java | 29 ------------ .../MeteredTimestampedKeyValueStoreTest.java | 46 ------------------- ...mestampedKeyValueStoreWithHeadersTest.java | 33 +------------ 6 files changed, 7 insertions(+), 126 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index ec987b98ec1e6..ca8f12ac74363 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -323,11 +323,8 @@ private QueryResult runKeyQuery(final Query query, final QueryConfig config) { final QueryResult result; final KeyQuery typedKeyQuery = (KeyQuery) query; - KeyQuery rawKeyQuery = + final KeyQuery rawKeyQuery = 
KeyQuery.withKey(serializeKey(typedKeyQuery.getKey())); - if (typedKeyQuery.isSkipCache()) { - rawKeyQuery = rawKeyQuery.skipCache(); - } final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 74704ecb4d7d6..72caa56d88e35 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -170,10 +170,7 @@ private QueryResult runTimestampedKeyQuery( ) { final QueryResult result; final TimestampedKeyQuery typedKeyQuery = (TimestampedKeyQuery) query; - KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key())); - if (typedKeyQuery.isSkipCache()) { - rawKeyQuery = rawKeyQuery.skipCache(); - } + final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key())); final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); @@ -240,10 +237,7 @@ private QueryResult runKeyQuery( ) { final QueryResult result; final KeyQuery typedKeyQuery = (KeyQuery) query; - KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey())); - if (typedKeyQuery.isSkipCache()) { - rawKeyQuery = rawKeyQuery.skipCache(); - } + final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey())); final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java index b12f05fda1417..09db4e61d88e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java @@ -337,10 +337,7 @@ private QueryResult runKeyQuery( final QueryResult result; final KeyQuery typedKeyQuery = (KeyQuery) query; - KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), internalContext.headers())); - if (typedKeyQuery.isSkipCache()) { - rawKeyQuery = rawKeyQuery.skipCache(); - } + final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(), internalContext.headers())); final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { // value will be `rawValueTimestampHeader`; no need to pass headers explicitly @@ -366,10 +363,7 @@ private QueryResult runTimestampedKeyQuery( final QueryResult result; final TimestampedKeyQuery typedKeyQuery = (TimestampedKeyQuery) query; - KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); - if (typedKeyQuery.isSkipCache()) { - rawKeyQuery = rawKeyQuery.skipCache(); - } + final KeyQuery rawKeyQuery = KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers())); final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { // value will be `rawValueTimestampHeader`; no need to pass headers explicitly diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index cb2606e1b3f1a..d5d65e56be5b0 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -43,11 +43,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.query.KeyQuery; -import org.apache.kafka.streams.query.PositionBound; -import org.apache.kafka.streams.query.Query; -import org.apache.kafka.streams.query.QueryConfig; -import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -156,30 +151,6 @@ private void init() { metered.init(context, metered); } - @Test - public void shouldPropagateSkipCacheForKeyQuery() { - setUp(); - init(); - assertTrue(forwardedRawKeyQuery(KeyQuery.withKey(KEY).skipCache()).isSkipCache()); - } - - @Test - public void shouldNotSkipCacheForKeyQueryByDefault() { - setUp(); - init(); - assertFalse(forwardedRawKeyQuery(KeyQuery.withKey(KEY)).isSkipCache()); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - private KeyQuery forwardedRawKeyQuery(final Query query) { - when(inner.query(any(), any(PositionBound.class), any(QueryConfig.class))) - .thenReturn((QueryResult) QueryResult.forResult(null)); - metered.query(query, PositionBound.unbounded(), new QueryConfig(false)); - final ArgumentCaptor captor = ArgumentCaptor.forClass(KeyQuery.class); - verify(inner).query(captor.capture(), any(PositionBound.class), any(QueryConfig.class)); - return captor.getValue(); - } - @Test public void shouldDelegateInit() { setUp(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 6313886d163af..6e05513ec9c7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -37,12 +37,6 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.query.KeyQuery; -import org.apache.kafka.streams.query.PositionBound; -import org.apache.kafka.streams.query.Query; -import org.apache.kafka.streams.query.QueryConfig; -import org.apache.kafka.streams.query.QueryResult; -import org.apache.kafka.streams.query.TimestampedKeyQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -50,7 +44,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -74,7 +67,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -151,44 +143,6 @@ private void init() { metered.init(context, metered); } - @Test - public void shouldPropagateSkipCacheForKeyQuery() { - setUp(); - init(); - assertTrue(forwardedRawKeyQuery(KeyQuery.withKey(KEY).skipCache()).isSkipCache()); - } - - @Test - public void shouldNotSkipCacheForKeyQueryByDefault() { - setUp(); - init(); - 
assertFalse(forwardedRawKeyQuery(KeyQuery.withKey(KEY)).isSkipCache()); - } - - @Test - public void shouldPropagateSkipCacheForTimestampedKeyQuery() { - setUp(); - init(); - assertTrue(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY).skipCache()).isSkipCache()); - } - - @Test - public void shouldNotSkipCacheForTimestampedKeyQueryByDefault() { - setUp(); - init(); - assertFalse(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY)).isSkipCache()); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - private KeyQuery forwardedRawKeyQuery(final Query query) { - when(inner.query(any(), any(PositionBound.class), any(QueryConfig.class))) - .thenReturn((QueryResult) QueryResult.forResult(null)); - metered.query(query, PositionBound.unbounded(), new QueryConfig(false)); - final ArgumentCaptor captor = ArgumentCaptor.forClass(KeyQuery.class); - verify(inner).query(captor.capture(), any(PositionBound.class), any(QueryConfig.class)); - return captor.getValue(); - } - @Test public void shouldDelegateInit() { setUp(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java index 36b20e4d4e327..568a11a7226c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java @@ -41,7 +41,6 @@ import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryConfig; import org.apache.kafka.streams.query.QueryResult; -import org.apache.kafka.streams.query.TimestampedKeyQuery; import org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -151,36 +150,8 @@ private void 
init() { metered.init(context, metered); } - // --- skipCache propagation: the metered handlers must forward isSkipCache() onto the raw - // KeyQuery they build, otherwise the caching layer never sees it (it was a no-op before). --- - - @Test - public void shouldPropagateSkipCacheForKeyQuery() { - setUp(); - init(); - assertTrue(forwardedRawKeyQuery(KeyQuery.withKey(KEY).skipCache()).isSkipCache()); - } - - @Test - public void shouldNotSkipCacheForKeyQueryByDefault() { - setUp(); - init(); - assertFalse(forwardedRawKeyQuery(KeyQuery.withKey(KEY)).isSkipCache()); - } - - @Test - public void shouldPropagateSkipCacheForTimestampedKeyQuery() { - setUp(); - init(); - assertTrue(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY).skipCache()).isSkipCache()); - } - - @Test - public void shouldNotSkipCacheForTimestampedKeyQueryByDefault() { - setUp(); - init(); - assertFalse(forwardedRawKeyQuery(TimestampedKeyQuery.withKey(KEY)).isSkipCache()); - } + // --- skipCache propagation: the TimestampedKeyWithHeadersQuery handler must forward isSkipCache() + // onto the raw KeyQuery it builds, otherwise the caching layer never sees it. --- @Test public void shouldPropagateSkipCacheForTimestampedKeyWithHeadersQuery() { From fd13dde563d47336b9356283edcf6e61867d462f Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Thu, 25 Jun 2026 13:38:06 -0400 Subject: [PATCH 7/9] Avoid dangling javadoc reference in ReadOnlyRecord Reference TimestampedKeyWithHeadersQuery with {@code} instead of {@link} so ReadOnlyRecord's javadoc does not depend on the query class being present (it can then stand alone, e.g. in a separate ReadOnlyRecord PR, without breaking :streams:javadoc). 
--- .../org/apache/kafka/streams/processor/api/ReadOnlyRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java index 990d6ac16b0fb..def44fda62612 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ReadOnlyRecord.java @@ -25,7 +25,7 @@ * transform-and-forward builders ({@code withKey}/{@code withValue}/{@code withTimestamp}/{@code withHeaders}), * while an Interactive Query (IQv2) result is exactly this read-only snapshot and exposes nothing more. * It is the result type returned by the headers-aware IQv2 query types (e.g. - * {@link org.apache.kafka.streams.query.TimestampedKeyWithHeadersQuery}), which surface the record headers + * {@code TimestampedKeyWithHeadersQuery}), which surface the record headers * persisted by header-aware state stores. * * @param The type of the key From 7787b6e925e952b2c7a64eaca7933995397b8afe Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Thu, 25 Jun 2026 15:33:26 -0400 Subject: [PATCH 8/9] Fail TimestampedKeyWithHeadersQuery on a negative stored timestamp A negative stored timestamp made new Record<>(...) throw out of query() instead of returning a failed result. It can't arise from the normal record-driven flow but is storable directly via ValueTimestampHeaders.make, so map it to QueryResult.forFailure(STORE_EXCEPTION, ...), with a regression test covering caching on/off. 
--- ...edTimestampedKeyValueStoreWithHeaders.java | 39 +++++++++++++------ ...edKeyValueStoreBuilderWithHeadersTest.java | 22 +++++++++++ 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java index 09db4e61d88e9..acfded3cec7f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -404,18 +405,32 @@ private QueryResult runTimestampedKeyWithHeadersQuery( if (rawResult.isSuccess()) { final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); final ValueTimestampHeaders valueTimestampHeaders = deserializer.apply(rawResult.getResult()); - // Surface the result as a ReadOnlyRecord (implemented by Record), keeping the headers. - // A null wrapper means the key is absent or tombstoned, which we surface as a null result. - final ReadOnlyRecord record = valueTimestampHeaders == null - ? 
null - : new Record<>( - typedKeyQuery.key(), - valueTimestampHeaders.value(), - valueTimestampHeaders.timestamp(), - valueTimestampHeaders.headers()); - final QueryResult> typedQueryResult = - InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, record); - result = (QueryResult) typedQueryResult; + if (valueTimestampHeaders != null && valueTimestampHeaders.timestamp() < 0) { + // The result is modeled as a Record, whose constructor rejects negative timestamps. A + // negative stored timestamp cannot arise from the normal record-driven flow (the PAPI + // Record a processor stores already forbids it), so it indicates corrupted/unexpected + // store state; surface it as a failed result rather than letting `new Record<>` throw + // out of query(). + final QueryResult> failure = QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + "Stored record for the queried key has a negative timestamp (" + + valueTimestampHeaders.timestamp() + "); cannot construct a ReadOnlyRecord."); + failure.setPosition(rawResult.getPosition()); + result = (QueryResult) failure; + } else { + // Surface the result as a ReadOnlyRecord (implemented by Record), keeping the headers. + // A null wrapper means the key is absent or tombstoned, which we surface as a null result. + final ReadOnlyRecord record = valueTimestampHeaders == null + ? null + : new Record<>( + typedKeyQuery.key(), + valueTimestampHeaders.value(), + valueTimestampHeaders.timestamp(), + valueTimestampHeaders.headers()); + final QueryResult> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, record); + result = (QueryResult) typedQueryResult; + } } else { // the generic type doesn't matter, since failed queries have no result set. 
result = (QueryResult) rawResult; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java index e18e659777912..d4dd33fc0969b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.api.ReadOnlyRecord; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; @@ -343,6 +344,27 @@ public void shouldHandleTimestampedKeyWithHeadersQueryOnNativeStore(final boolea } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldFailTimestampedKeyWithHeadersQueryForNegativeStoredTimestamp(final boolean cachingEnabled) { + // A negative stored timestamp can't arise from the normal record-driven flow, but a caller can + // store one directly. Because the result is modeled as a Record (whose constructor rejects + // negative timestamps), the query must surface a failed result rather than throw out of query(). 
+ final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(StoreType.NATIVE, cachingEnabled); + try { + store.put("k", ValueTimestampHeaders.make("v", -1L, headersWith("h", "x"))); + + final QueryResult> result = + store.query(TimestampedKeyWithHeadersQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)); + + assertFalse(result.isSuccess(), "A negative stored timestamp should yield a failed result, not throw"); + assertEquals(FailureReason.STORE_EXCEPTION, result.getFailureReason()); + assertNotNull(result.getPosition(), "Expected the failure to preserve the queried partition's position"); + } finally { + store.close(); + } + } + @ParameterizedTest @CsvSource({"NATIVE", "ADAPTER", "IN_MEMORY"}) public void shouldHandleRangeQuery(final StoreType storeType) { From 267a0b6790303919f16f4428a48ad4c20a77bc56 Mon Sep 17 00:00:00 2001 From: Jess Jin Date: Fri, 26 Jun 2026 10:14:59 -0400 Subject: [PATCH 9/9] Add adapter-build-path test: TimestampedKeyWithHeadersQuery returns empty headers When the WithHeaders builder is given a non-header supplier it produces an adapter-built store that cannot persist headers. Assert the new query still succeeds and round-trips value/timestamp, but returns empty (never null) headers for a record written with headers (caching disabled, i.e. the persisted path). 
--- ...edKeyValueStoreBuilderWithHeadersTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java index d4dd33fc0969b..666d9037c9de8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java @@ -344,6 +344,34 @@ public void shouldHandleTimestampedKeyWithHeadersQueryOnNativeStore(final boolea } } + @Test + public void shouldReturnEmptyHeadersForTimestampedKeyWithHeadersQueryOnAdapterStore() { + // Feeding a non-header supplier into the WithHeaders builder yields an adapter-built store + // (TimestampedToHeadersStoreAdapter over a plain timestamped store), which cannot persist + // headers. Caching is disabled so the query reads through the adapter to the underlying store: + // the query still succeeds and the value/timestamp round-trip, but headers come back empty + // (never null) even though the record was written with headers. + // (With caching enabled the not-yet-flushed value is still served from the record cache with + // its headers intact, so that path is covered by the native/round-trip tests instead.) 
+ final TimestampedKeyValueStoreWithHeaders store = buildAndInitStore(StoreType.ADAPTER, false); + try { + store.put("k", ValueTimestampHeaders.make("v", 123L, headersWith("h", "x"))); + + final QueryResult> result = + store.query(TimestampedKeyWithHeadersQuery.withKey("k"), PositionBound.unbounded(), new QueryConfig(false)); + + assertTrue(result.isSuccess(), "Expected TimestampedKeyWithHeadersQuery to succeed on an adapter-built store"); + final ReadOnlyRecord record = result.getResult(); + assertEquals("k", record.key()); + assertEquals("v", record.value()); + assertEquals(123L, record.timestamp()); + assertEquals(new RecordHeaders(), record.headers()); + assertNotNull(result.getPosition(), "Expected position to be set"); + } finally { + store.close(); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void shouldFailTimestampedKeyWithHeadersQueryForNegativeStoredTimestamp(final boolean cachingEnabled) {