diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java index e840824577e07..b1d7540437dbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.utils.Bytes; /** @@ -38,4 +39,14 @@ public interface VersionedBytesStore extends KeyValueStore, Times * The analog of {@link VersionedKeyValueStore#delete(Object, long)}. */ byte[] delete(Bytes key, long timestamp); + + /** + * Return a read-only view of this store bound to the given {@link IsolationLevel}. + * Covariantly narrows {@link KeyValueStore#readOnly(IsolationLevel)} so callers retain + * access to the versioned read methods ({@link #get(Bytes, long)} and friends). + */ + @Override + default VersionedBytesStore readOnly(final IsolationLevel isolationLevel) { + return this; + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java index 40faaf003d3fe..ebd4fc7e34da7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; @@ -134,4 +135,16 @@ public interface VersionedKeyValueStore extends StateStore { * @throws InvalidStateStoreException if the store is not initialized */ VersionedRecord get(K key, long asOfTimestamp); + + /** + * Return a read-only view of this store bound to the given {@link IsolationLevel}. + * See {@link ReadOnlyKeyValueStore#readOnly(IsolationLevel)} for semantics. + *

+ * Unlike the other store families, versioned stores have no dedicated {@code ReadOnly*} + * parent interface — the view is just another {@code VersionedKeyValueStore} whose mutating + * operations should not be invoked from interactive-query threads. + */ + default VersionedKeyValueStore readOnly(final IsolationLevel isolationLevel) { + return this; + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java index bd35648210d1c..a391dcee8c044 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; @@ -57,6 +58,11 @@ public byte[] delete(final Bytes key, final long timestamp) { return oldValue; } + @Override + public VersionedBytesStore readOnly(final IsolationLevel isolationLevel) { + return inner.readOnly(isolationLevel); + } + @Override public void log(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { internalContext.logChange( name(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java index ba6215ecca024..55461ece58a00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; @@ -60,19 +61,45 @@ public class LogicalKeyValueSegment implements Segment, VersionedStoreSegment { private final String name; private final RocksDBStore physicalStore; private final PrefixKeyFormatter prefixKeyFormatter; + // Non-null for read-only views produced by {@link #readOnly(IsolationLevel)}: all reads go + // through this accessor (bypassing any transaction buffer for READ_COMMITTED); writes are + // disallowed. Null for regular segments, which use the physicalStore's current accessor. + private final RocksDBStore.DBAccessor readAccessor; final Set> openIterators = Collections.synchronizedSet(new HashSet<>()); LogicalKeyValueSegment(final long id, final String name, final RocksDBStore physicalStore) { + this(id, name, physicalStore, null); + } + + private LogicalKeyValueSegment(final long id, + final String name, + final RocksDBStore physicalStore, + final RocksDBStore.DBAccessor readAccessor) { this.id = id; this.name = name; this.physicalStore = Objects.requireNonNull(physicalStore); - + this.readAccessor = readAccessor; this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id)); } + /** + * Returns a read-only view of this segment bound to the given isolation level. Reads go + * through the accessor appropriate for {@code level}; mutating calls throw. + */ + @Override + public LogicalKeyValueSegment readOnly(final IsolationLevel level) { + return new LogicalKeyValueSegment(id, name, physicalStore, physicalStore.viewAccessor(level)); + } + + private void rejectIfReadOnly() { + if (readAccessor != null) { + throw new UnsupportedOperationException("Write operations are not supported on a read-only segment view"); + } + } + @Override public long id() { return id; @@ -80,6 +107,7 @@ public long id() { @Override public synchronized void destroy() { + rejectIfReadOnly(); if (id < 0) { throw new IllegalStateException("Negative segment ID indicates a reserved segment, " + "which should not be destroyed. Reserved segments are cleaned up only when " @@ -95,6 +123,7 @@ public synchronized void destroy() { @Override public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) { + rejectIfReadOnly(); physicalStore.deleteRange( prefixKeyFormatter.addPrefix(keyFrom), prefixKeyFormatter.addPrefix(keyTo)); @@ -102,6 +131,7 @@ public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) { @Override public synchronized void put(final Bytes key, final byte[] value) { + rejectIfReadOnly(); physicalStore.put( prefixKeyFormatter.addPrefix(key), value); @@ -109,6 +139,7 @@ public synchronized void put(final Bytes key, final byte[] value) { @Override public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { + rejectIfReadOnly(); return physicalStore.putIfAbsent( prefixKeyFormatter.addPrefix(key), value); @@ -116,6 +147,7 @@ public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { @Override public synchronized void putAll(final List> entries) { + rejectIfReadOnly(); physicalStore.putAll(entries.stream() .map(kv -> new KeyValue<>( prefixKeyFormatter.addPrefix(kv.key), @@ -125,6 +157,7 @@ public synchronized void putAll(final List> entries) { @Override public synchronized byte[] delete(final Bytes key) { + rejectIfReadOnly(); return physicalStore.delete(prefixKeyFormatter.addPrefix(key)); } @@ -184,13 +217,18 @@ public synchronized byte[] get(final Bytes key, final Snapshot snapshot) { } private synchronized byte[] get(final Bytes key, final Optional snapshot) { + final Bytes prefixed = prefixKeyFormatter.addPrefix(key); if (snapshot.isPresent()) { try (ReadOptions readOptions = new ReadOptions()) { readOptions.setSnapshot(snapshot.get()); - return physicalStore.get(prefixKeyFormatter.addPrefix(key), readOptions); + return readAccessor == null + ? physicalStore.get(prefixed, readOptions) + : physicalStore.get(prefixed, readOptions, readAccessor); } } else { - return physicalStore.get(prefixKeyFormatter.addPrefix(key)); + return readAccessor == null + ? physicalStore.get(prefixed) + : physicalStore.get(prefixed, readAccessor); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index 9cd8b9e18619e..2f6f5380c1afa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -403,4 +404,93 @@ public QueryResult query(final Query query, final PositionBound positi public Position getPosition() { return internal.getPosition(); } + + @Override + public VersionedKeyValueStore readOnly(final IsolationLevel isolationLevel) { + return new ReadOnlyView(wrapped().readOnly(isolationLevel)); + } + + /** + * Read-only view that re-applies this store's serdes and get sensor on top of a configurable + * underlying {@link VersionedBytesStore}. Used so that IQ reads at a given isolation level + * preserve the Metered layer's metrics and (de)serialisation behaviour. Mutating operations + * throw; lifecycle methods that only report state delegate to the enclosing store. + */ + private final class ReadOnlyView implements VersionedKeyValueStore { + + private final VersionedBytesStore underlying; + + ReadOnlyView(final VersionedBytesStore underlying) { + this.underlying = underlying; + } + + @Override + public VersionedRecord get(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + try { + final ValueAndTimestamp valueAndTimestamp = maybeMeasureLatency( + () -> internal.deserializeValue(underlying.get(internal.serializeKey(key))), + internal.time, + internal.getSensor + ); + return valueAndTimestamp == null + ? null + : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp()); + } catch (final ProcessorStateException e) { + throw new ProcessorStateException(String.format(e.getMessage(), key), e); + } + } + + @Override + public VersionedRecord get(final K key, final long asOfTimestamp) { + Objects.requireNonNull(key, "key cannot be null"); + try { + final ValueAndTimestamp valueAndTimestamp = maybeMeasureLatency( + () -> internal.deserializeValue(underlying.get(internal.serializeKey(key), asOfTimestamp)), + internal.time, + internal.getSensor + ); + return valueAndTimestamp == null + ? null + : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp()); + } catch (final ProcessorStateException e) { + throw new ProcessorStateException(String.format(e.getMessage(), key), e); + } + } + + @Override + public long put(final K key, final V value, final long timestamp) { + throw new UnsupportedOperationException("put is not supported on a read-only view"); + } + + @Override + public VersionedRecord delete(final K key, final long timestamp) { + throw new UnsupportedOperationException("delete is not supported on a read-only view"); + } + + @Override + public String name() { + return MeteredVersionedKeyValueStore.this.name(); + } + + @Override + public void init(final StateStoreContext stateStoreContext, final StateStore root) { + throw new UnsupportedOperationException("init is not supported on a read-only view"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("close is not supported on a read-only view"); + } + + @Override + public boolean persistent() { + return MeteredVersionedKeyValueStore.this.persistent(); + } + + @Override + public boolean isOpen() { + return MeteredVersionedKeyValueStore.this.isOpen(); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 7ed94b0f35a4d..faa12e437b775 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -750,14 +750,39 @@ public long approximateNumUncommittedBytes() { @Override public ReadOnlyKeyValueStore readOnly(final IsolationLevel isolationLevel) { + return new ReadOnlyView(viewAccessor(isolationLevel)); + } + + /** + * Returns the {@link DBAccessor} that should be used for reads at the given isolation level. + * For READ_COMMITTED on a transactional store this is the underlying {@link DirectDBAccessor}; + * otherwise it is the active {@code dbAccessor} (which may consult the transaction buffer). + */ + DBAccessor viewAccessor(final IsolationLevel isolationLevel) { Objects.requireNonNull(isolationLevel, "isolationLevel cannot be null"); - final DBAccessor viewAccessor; if (isolationLevel == IsolationLevel.READ_COMMITTED && dbAccessor instanceof TransactionalDBAccessor) { - viewAccessor = ((TransactionalDBAccessor) dbAccessor).underlying; - } else { - viewAccessor = dbAccessor; + return ((TransactionalDBAccessor) dbAccessor).underlying; + } + return dbAccessor; + } + + // Read helpers for isolation-level views that sit above this store (e.g. LogicalKeyValueSegment.readOnly). + byte[] get(final Bytes key, final DBAccessor accessor) { + validateStoreOpen(); + try { + return cfAccessor.get(accessor, key.get()); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error while getting value for key from store " + name, e); + } + } + + byte[] get(final Bytes key, final ReadOptions readOptions, final DBAccessor accessor) { + validateStoreOpen(); + try { + return cfAccessor.get(accessor, key.get(), readOptions); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error while getting value for key from store " + name, e); } - return new ReadOnlyView(viewAccessor); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index c4c26d047f57f..3bd18a5fbf08e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; @@ -183,11 +184,15 @@ public VersionedRecord delete(final Bytes key, final long timestamp) { @Override public VersionedRecord get(final Bytes key) { + return get(key, IsolationLevel.READ_UNCOMMITTED); + } + + private VersionedRecord get(final Bytes key, final IsolationLevel level) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); // latest value (if present) is guaranteed to be in the latest value store - final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + final byte[] rawLatestValueAndTimestamp = latestValueStore(level).get(key); if (rawLatestValueAndTimestamp != null) { return new VersionedRecord<>( LatestValueFormatter.value(rawLatestValueAndTimestamp), @@ -200,14 +205,20 @@ public VersionedRecord get(final Bytes key) { @Override public VersionedRecord get(final Bytes key, final long asOfTimestamp) { + return get(key, asOfTimestamp, IsolationLevel.READ_UNCOMMITTED); + } + + private VersionedRecord get(final Bytes key, final long asOfTimestamp, final IsolationLevel level) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); + final LogicalKeyValueSegment latestView = latestValueStore(level); + if (asOfTimestamp < observedStreamTime - historyRetention) { // history retention exceeded. we still check the latest value store in case the // latest record version satisfies the timestamp bound, in which case it should // still be returned (i.e., the latest record version per key never expires). - final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + final byte[] rawLatestValueAndTimestamp = latestView.get(key); if (rawLatestValueAndTimestamp != null) { final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp); if (latestTimestamp <= asOfTimestamp) { @@ -227,7 +238,7 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { } // first check the latest value store - final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + final byte[] rawLatestValueAndTimestamp = latestView.get(key); if (rawLatestValueAndTimestamp != null) { final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp); if (latestTimestamp <= asOfTimestamp) { @@ -236,8 +247,7 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { } // check segment stores - final List segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false); - for (final LogicalKeyValueSegment segment : segments) { + for (final LogicalKeyValueSegment segment : viewSegments(segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false), level)) { final byte[] rawSegmentValue = segment.get(key); if (rawSegmentValue != null) { final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue); @@ -272,31 +282,123 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { return null; } - @SuppressWarnings("unchecked") VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) { + return get(key, fromTimestamp, toTimestamp, order, IsolationLevel.READ_UNCOMMITTED); + } + + VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, + final ResultOrder order, final IsolationLevel level) { validateStoreOpen(); + final LogicalKeyValueSegment latestView = latestValueStore(level); + if (toTimestamp < observedStreamTime - historyRetention) { // history retention exceeded. we still check the latest value store in case the // latest record version satisfies the timestamp bound, in which case it should // still be returned (i.e., the latest record version per key never expires). - return new LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(), key, fromTimestamp, toTimestamp, order); + return new LogicalSegmentIterator(Collections.singletonList(latestView).listIterator(), key, fromTimestamp, toTimestamp, order); } else { final List segments = new ArrayList<>(); // add segment stores // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp} // but is still valid in query specified time interval. if (order.equals(ResultOrder.ASCENDING)) { - segments.addAll(segmentStores.segments(Long.MIN_VALUE, toTimestamp, true)); - segments.add(latestValueStore); + segments.addAll(viewSegments(segmentStores.segments(Long.MIN_VALUE, toTimestamp, true), level)); + segments.add(latestView); } else { - segments.add(latestValueStore); - segments.addAll(segmentStores.segments(Long.MIN_VALUE, toTimestamp, false)); + segments.add(latestView); + segments.addAll(viewSegments(segmentStores.segments(Long.MIN_VALUE, toTimestamp, false), level)); } return new LogicalSegmentIterator(segments.listIterator(), key, fromTimestamp, toTimestamp, order); } } + private LogicalKeyValueSegment latestValueStore(final IsolationLevel level) { + return level == IsolationLevel.READ_UNCOMMITTED ? latestValueStore : latestValueStore.readOnly(level); + } + + private static List viewSegments(final List segments, + final IsolationLevel level) { + if (level == IsolationLevel.READ_UNCOMMITTED) { + return segments; + } + final List views = new ArrayList<>(segments.size()); + for (final LogicalKeyValueSegment segment : segments) { + views.add(segment.readOnly(level)); + } + return views; + } + + @Override + public VersionedKeyValueStore readOnly(final IsolationLevel isolationLevel) { + Objects.requireNonNull(isolationLevel, "isolationLevel cannot be null"); + if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { + return this; + } + return new ReadOnlyView(isolationLevel); + } + + /** + * Read-only view of this store bound to an isolation level. Read methods delegate to + * the private {@code get(...)} helpers passing the level; write methods and other + * lifecycle operations throw or are no-ops, since the view is only used by IQ. + */ + private final class ReadOnlyView implements VersionedKeyValueStore { + + private final IsolationLevel level; + + ReadOnlyView(final IsolationLevel level) { + this.level = level; + } + + @Override + public VersionedRecord get(final Bytes key) { + return RocksDBVersionedStore.this.get(key, level); + } + + @Override + public VersionedRecord get(final Bytes key, final long asOfTimestamp) { + return RocksDBVersionedStore.this.get(key, asOfTimestamp, level); + } + + @Override + public long put(final Bytes key, final byte[] value, final long timestamp) { + throw new UnsupportedOperationException("put not supported on a read-only view"); + } + + @Override + public VersionedRecord delete(final Bytes key, final long timestamp) { + throw new UnsupportedOperationException("delete not supported on a read-only view"); + } + + @Override + public String name() { + return RocksDBVersionedStore.this.name(); + } + + @Override + public void init(final StateStoreContext stateStoreContext, final StateStore root) { + throw new UnsupportedOperationException("init not supported on a read-only view"); + } + + @SuppressWarnings("deprecation") + @Override + public void flush() { } + + @Override + public void close() { } + + @Override + public boolean persistent() { + return RocksDBVersionedStore.this.persistent(); + } + + @Override + public boolean isOpen() { + return RocksDBVersionedStore.this.isOpen(); + } + } + @Override public String name() { return name; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index ab23baa765b80..c713c546d70c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -373,7 +373,7 @@ private static QueryResult runVersionedKeyQuery( ) { if (store instanceof VersionedKeyValueStore) { final VersionedKeyValueStore versionedKeyValueStore = - (VersionedKeyValueStore) store; + ((VersionedKeyValueStore) store).readOnly(config.getIsolationLevel()); final VersionedKeyQuery rawKeyQuery = (VersionedKeyQuery) query; try { @@ -413,7 +413,8 @@ private static QueryResult runMultiVersionedKeyQuery( rawKeyQuery.key(), rawKeyQuery.fromTime().get().toEpochMilli(), rawKeyQuery.toTime().get().toEpochMilli(), - rawKeyQuery.resultOrder() + rawKeyQuery.resultOrder(), + config.getIsolationLevel() ); return (QueryResult) QueryResult.forResult(segmentIterator); } catch (final Exception e) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java index 340ca7eb21642..5b8a13ccae26a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -29,6 +30,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.VersionedKeyValueStore; import org.apache.kafka.streams.query.ResultOrder; import org.apache.kafka.streams.state.VersionedRecord; import org.apache.kafka.streams.state.VersionedRecordIterator; @@ -833,6 +835,88 @@ public void shouldAllowZeroHistoryRetention() { verifyExpiredRecordSensor(1); } + @Test + public void readOnlyCommittedShouldHideStagedLatestPut() { + reopenWithTransactionalEOS(); + + putToStore("k", "v1", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + store.commit(Map.of()); + putToStore("k", "v2", BASE_TIMESTAMP + 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + final VersionedKeyValueStore uncommitted = store.readOnly(IsolationLevel.READ_UNCOMMITTED); + final VersionedKeyValueStore committed = store.readOnly(IsolationLevel.READ_COMMITTED); + + final Bytes key = new Bytes(STRING_SERIALIZER.serialize(null, "k")); + final VersionedRecord uLatest = uncommitted.get(key); + assertThat(STRING_DESERIALIZER.deserialize(null, uLatest.value()), equalTo("v2")); + assertThat(uLatest.timestamp(), equalTo(BASE_TIMESTAMP + 1)); + + final VersionedRecord cLatest = committed.get(key); + assertThat(STRING_DESERIALIZER.deserialize(null, cLatest.value()), equalTo("v1")); + assertThat(cLatest.timestamp(), equalTo(BASE_TIMESTAMP)); + } + + @Test + public void readOnlyCommittedShouldHideStagedTimestampedPut() { + reopenWithTransactionalEOS(); + + putToStore("k", "v1", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + store.commit(Map.of()); + putToStore("k", "v2", BASE_TIMESTAMP + 2, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + final Bytes key = new Bytes(STRING_SERIALIZER.serialize(null, "k")); + + final VersionedRecord uAtNew = store.readOnly(IsolationLevel.READ_UNCOMMITTED).get(key, BASE_TIMESTAMP + 2); + assertThat(STRING_DESERIALIZER.deserialize(null, uAtNew.value()), equalTo("v2")); + + final VersionedRecord cAtNew = store.readOnly(IsolationLevel.READ_COMMITTED).get(key, BASE_TIMESTAMP + 2); + assertThat(STRING_DESERIALIZER.deserialize(null, cAtNew.value()), equalTo("v1")); + } + + @Test + public void packagePrivateTimestampRangeGetShouldRespectIsolationLevel() { + reopenWithTransactionalEOS(); + + putToStore("k", "v1", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + store.commit(Map.of()); + putToStore("k", "v2", BASE_TIMESTAMP + 2, PUT_RETURN_CODE_VALID_TO_UNDEFINED); + + final Bytes key = new Bytes(STRING_SERIALIZER.serialize(null, "k")); + + final List uncommitted = new ArrayList<>(); + try (VersionedRecordIterator it = store.get( + key, BASE_TIMESTAMP, BASE_TIMESTAMP + 2, ResultOrder.ASCENDING, IsolationLevel.READ_UNCOMMITTED)) { + while (it.hasNext()) { + uncommitted.add(STRING_DESERIALIZER.deserialize(null, it.next().value())); + } + } + final List committed = new ArrayList<>(); + try (VersionedRecordIterator it = store.get( + key, BASE_TIMESTAMP, BASE_TIMESTAMP + 2, ResultOrder.ASCENDING, IsolationLevel.READ_COMMITTED)) { + while (it.hasNext()) { + committed.add(STRING_DESERIALIZER.deserialize(null, it.next().value())); + } + } + assertThat(uncommitted, equalTo(List.of("v1", "v2"))); + assertThat(committed, equalTo(List.of("v1"))); + } + + private void reopenWithTransactionalEOS() { + store.close(); + final java.util.Properties props = StreamsTestUtils.getStreamsConfig(); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + props.setProperty(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, "true"); + context = new InternalMockProcessorContext<>( + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + new StreamsConfig(props) + ); + context.setTime(BASE_TIMESTAMP); + store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, HISTORY_RETENTION, SEGMENT_INTERVAL); + store.init(context, store); + } + @Test public void shouldMigrateExistingPositionFromFile() { final Position position = Position.fromMap(mkMap(mkEntry("topic", mkMap(mkEntry(0, 1L)))));