Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.utils.Bytes;

/**
Expand All @@ -38,4 +39,14 @@ public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>, Times
* The analog of {@link VersionedKeyValueStore#delete(Object, long)}.
*/
byte[] delete(Bytes key, long timestamp);

/**
* Return a read-only view of this store bound to the given {@link IsolationLevel}.
* Covariantly narrows {@link KeyValueStore#readOnly(IsolationLevel)} so callers retain
* access to the versioned read methods ({@link #get(Bytes, long)} and friends).
*/
@Override
default VersionedBytesStore readOnly(final IsolationLevel isolationLevel) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

Expand Down Expand Up @@ -134,4 +135,16 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
* @throws InvalidStateStoreException if the store is not initialized
*/
VersionedRecord<V> get(K key, long asOfTimestamp);

/**
* Return a read-only view of this store bound to the given {@link IsolationLevel}.
* See {@link ReadOnlyKeyValueStore#readOnly(IsolationLevel)} for semantics.
* <p>
* Unlike the other store families, versioned stores have no dedicated {@code ReadOnly*}
* parent interface — the view is just another {@code VersionedKeyValueStore} whose mutating
* operations should not be invoked from interactive-query threads.
*/
default VersionedKeyValueStore<K, V> readOnly(final IsolationLevel isolationLevel) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -57,6 +58,11 @@ public byte[] delete(final Bytes key, final long timestamp) {
return oldValue;
}

@Override
public VersionedBytesStore readOnly(final IsolationLevel isolationLevel) {
return inner.readOnly(isolationLevel);
}

@Override public void log(final Bytes key, final byte[] value, final long timestamp, final Headers headers) {
internalContext.logChange(
name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -60,26 +61,53 @@ public class LogicalKeyValueSegment implements Segment, VersionedStoreSegment {
private final String name;
private final RocksDBStore physicalStore;
private final PrefixKeyFormatter prefixKeyFormatter;
// Non-null for read-only views produced by {@link #readOnly(IsolationLevel)}: all reads go
// through this accessor (bypassing any transaction buffer for READ_COMMITTED); writes are
// disallowed. Null for regular segments, which use the physicalStore's current accessor.
private final RocksDBStore.DBAccessor readAccessor;

final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());

LogicalKeyValueSegment(final long id,
final String name,
final RocksDBStore physicalStore) {
this(id, name, physicalStore, null);
}

private LogicalKeyValueSegment(final long id,
final String name,
final RocksDBStore physicalStore,
final RocksDBStore.DBAccessor readAccessor) {
this.id = id;
this.name = name;
this.physicalStore = Objects.requireNonNull(physicalStore);

this.readAccessor = readAccessor;
this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
}

/**
* Returns a read-only view of this segment bound to the given isolation level. Reads go
* through the accessor appropriate for {@code level}; mutating calls throw.
*/
@Override
public LogicalKeyValueSegment readOnly(final IsolationLevel level) {
return new LogicalKeyValueSegment(id, name, physicalStore, physicalStore.viewAccessor(level));
}

private void rejectIfReadOnly() {
if (readAccessor != null) {
throw new UnsupportedOperationException("Write operations are not supported on a read-only segment view");
}
}

@Override
public long id() {
return id;
}

@Override
public synchronized void destroy() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing something but could this cause a problem on closing - I haven't read the code so just asking here.

rejectIfReadOnly();
if (id < 0) {
throw new IllegalStateException("Negative segment ID indicates a reserved segment, "
+ "which should not be destroyed. Reserved segments are cleaned up only when "
Expand All @@ -95,27 +123,31 @@ public synchronized void destroy() {

@Override
public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
rejectIfReadOnly();
physicalStore.deleteRange(
prefixKeyFormatter.addPrefix(keyFrom),
prefixKeyFormatter.addPrefix(keyTo));
}

@Override
public synchronized void put(final Bytes key, final byte[] value) {
rejectIfReadOnly();
physicalStore.put(
prefixKeyFormatter.addPrefix(key),
value);
}

@Override
public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
rejectIfReadOnly();
return physicalStore.putIfAbsent(
prefixKeyFormatter.addPrefix(key),
value);
}

@Override
public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
rejectIfReadOnly();
physicalStore.putAll(entries.stream()
.map(kv -> new KeyValue<>(
prefixKeyFormatter.addPrefix(kv.key),
Expand All @@ -125,6 +157,7 @@ public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {

@Override
public synchronized byte[] delete(final Bytes key) {
rejectIfReadOnly();
return physicalStore.delete(prefixKeyFormatter.addPrefix(key));
}

Expand Down Expand Up @@ -184,13 +217,18 @@ public synchronized byte[] get(final Bytes key, final Snapshot snapshot) {
}

private synchronized byte[] get(final Bytes key, final Optional<Snapshot> snapshot) {
final Bytes prefixed = prefixKeyFormatter.addPrefix(key);
if (snapshot.isPresent()) {
try (ReadOptions readOptions = new ReadOptions()) {
readOptions.setSnapshot(snapshot.get());
return physicalStore.get(prefixKeyFormatter.addPrefix(key), readOptions);
return readAccessor == null
? physicalStore.get(prefixed, readOptions)
: physicalStore.get(prefixed, readOptions, readAccessor);
}
} else {
return physicalStore.get(prefixKeyFormatter.addPrefix(key));
return readAccessor == null
? physicalStore.get(prefixed)
: physicalStore.get(prefixed, readAccessor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -403,4 +404,93 @@ public <R> QueryResult<R> query(final Query<R> query, final PositionBound positi
public Position getPosition() {
return internal.getPosition();
}

@Override
public VersionedKeyValueStore<K, V> readOnly(final IsolationLevel isolationLevel) {
return new ReadOnlyView(wrapped().readOnly(isolationLevel));
}

/**
* Read-only view that re-applies this store's serdes and get sensor on top of a configurable
* underlying {@link VersionedBytesStore}. Used so that IQ reads at a given isolation level
* preserve the Metered layer's metrics and (de)serialisation behaviour. Mutating operations
* throw; lifecycle methods that only report state delegate to the enclosing store.
*/
private final class ReadOnlyView implements VersionedKeyValueStore<K, V> {

private final VersionedBytesStore underlying;

ReadOnlyView(final VersionedBytesStore underlying) {
this.underlying = underlying;
}

@Override
public VersionedRecord<V> get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
final ValueAndTimestamp<V> valueAndTimestamp = maybeMeasureLatency(
() -> internal.deserializeValue(underlying.get(internal.serializeKey(key))),
internal.time,
internal.getSensor
);
return valueAndTimestamp == null
? null
: new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
} catch (final ProcessorStateException e) {
throw new ProcessorStateException(String.format(e.getMessage(), key), e);
}
}

@Override
public VersionedRecord<V> get(final K key, final long asOfTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
final ValueAndTimestamp<V> valueAndTimestamp = maybeMeasureLatency(
() -> internal.deserializeValue(underlying.get(internal.serializeKey(key), asOfTimestamp)),
internal.time,
internal.getSensor
);
return valueAndTimestamp == null
? null
: new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
} catch (final ProcessorStateException e) {
throw new ProcessorStateException(String.format(e.getMessage(), key), e);
}
}

@Override
public long put(final K key, final V value, final long timestamp) {
throw new UnsupportedOperationException("put is not supported on a read-only view");
}

@Override
public VersionedRecord<V> delete(final K key, final long timestamp) {
throw new UnsupportedOperationException("delete is not supported on a read-only view");
}

@Override
public String name() {
return MeteredVersionedKeyValueStore.this.name();
}

@Override
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
throw new UnsupportedOperationException("init is not supported on a read-only view");
}

@Override
public void close() {
throw new UnsupportedOperationException("close is not supported on a read-only view");
}

@Override
public boolean persistent() {
return MeteredVersionedKeyValueStore.this.persistent();
}

@Override
public boolean isOpen() {
return MeteredVersionedKeyValueStore.this.isOpen();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -750,14 +750,39 @@ public long approximateNumUncommittedBytes() {

@Override
public ReadOnlyKeyValueStore<Bytes, byte[]> readOnly(final IsolationLevel isolationLevel) {
return new ReadOnlyView(viewAccessor(isolationLevel));
}

/**
* Returns the {@link DBAccessor} that should be used for reads at the given isolation level.
* For READ_COMMITTED on a transactional store this is the underlying {@link DirectDBAccessor};
* otherwise it is the active {@code dbAccessor} (which may consult the transaction buffer).
*/
DBAccessor viewAccessor(final IsolationLevel isolationLevel) {
Objects.requireNonNull(isolationLevel, "isolationLevel cannot be null");
final DBAccessor viewAccessor;
if (isolationLevel == IsolationLevel.READ_COMMITTED && dbAccessor instanceof TransactionalDBAccessor) {
viewAccessor = ((TransactionalDBAccessor) dbAccessor).underlying;
} else {
viewAccessor = dbAccessor;
return ((TransactionalDBAccessor) dbAccessor).underlying;
}
return dbAccessor;
}

// Read helpers for isolation-level views that sit above this store (e.g. LogicalKeyValueSegment.readOnly).
byte[] get(final Bytes key, final DBAccessor accessor) {
validateStoreOpen();
try {
return cfAccessor.get(accessor, key.get());
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
}
}

byte[] get(final Bytes key, final ReadOptions readOptions, final DBAccessor accessor) {
validateStoreOpen();
try {
return cfAccessor.get(accessor, key.get(), readOptions);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
}
return new ReadOnlyView(viewAccessor);
}

/**
Expand Down
Loading
Loading