From c0908124c2d3fb8d70c7b577ec16499c28b0fbc8 Mon Sep 17 00:00:00 2001 From: Oleksandr Porunov Date: Fri, 26 Jun 2026 21:01:37 +0100 Subject: [PATCH] feat(storage): delete whole row on vertex removal to reduce super-node tombstones Removing a vertex previously issued one column-level delete per incident edge/property on the vertex's storage row. For super-nodes this produced thousands of cell tombstones on a single Cassandra partition, leading to read timeouts and TombstoneOverwhelmingException. When a vertex is fully removed, JanusGraph now deletes its entire storage row in a single backend operation: on CQL a partition-level `DELETE ... WHERE key = ?` (one partition tombstone) instead of N per-column deletes. Because the per-column delete entries are no longer serialized, this also reduces client-side memory, batching, and network cost for large removals. Implementation: - New whole-row-deletion flag on `Mutation`, plumbed through `BackendTransaction.mutateEdges` -> `KCVSCache.mutateEntries` -> `CacheTransaction` -> `KCVMutation` -> `KeyColumnValueStoreManager.mutateMany`. - New `StoreFeatures.hasOptimizedWholeRowDeletion()` capability (default false). The graph skips building per-column deletions only when the backend advertises the capability, so backends without it keep the previous per-column behavior (no data loss). - Commit-time detection in `StandardJanusGraph.prepareCommitAddRelationMutations`, driven by `StandardJanusGraphTx` tracking the canonical ids of fully-removed vertices. Only the edge store is affected; the index store still deletes per-column (its rows are shared across vertices). Partitioned vertices wipe every representative row via canonical-id normalization. - Backend support: CQL/Scylla (partition delete), in-memory (drop key), HBase (column-family delete). BerkeleyJE and others fall back to per-column deletes. - Controlled by the new `storage.drop-whole-row-on-vertex-removal` option (default true). Tests cover the Mutation flag, the write-path plumbing, each backend, and graph-level integration (super-node removal, flag-off fallback, partial-edge removal, and partitioned-vertex removal). Documented in docs/changelog.md (1.2.0) and docs/storage-backend/cassandra.md. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Oleksandr Porunov --- docs/changelog.md | 12 ++ docs/configs/janusgraph-cfg.md | 1 + docs/storage-backend/cassandra.md | 9 + .../diskstorage/BackendTransaction.java | 6 +- .../org/janusgraph/diskstorage/Mutation.java | 21 ++- .../keycolumnvalue/StandardStoreFeatures.java | 20 ++- .../keycolumnvalue/StoreFeatures.java | 10 ++ .../cache/CacheTransaction.java | 15 +- .../keycolumnvalue/cache/KCVSCache.java | 6 +- .../GraphDatabaseConfiguration.java | 13 ++ .../graphdb/database/StandardJanusGraph.java | 30 +++- .../transaction/StandardJanusGraphTx.java | 30 ++++ .../graphdb/vertices/StandardVertex.java | 11 +- .../janusgraph/diskstorage/MutationTest.java | 73 ++++++++ .../StandardStoreFeaturesTest.java | 40 +++++ .../CacheTransactionWholeRowDeletionTest.java | 79 +++++++++ .../cql/CQLKeyColumnValueStore.java | 19 +++ .../cql/builder/CQLStoreFeaturesBuilder.java | 1 + .../mutate/AbstractCQLMutateManyFunction.java | 19 ++- .../cql/CQLWholeRowDeletionTest.java | 100 +++++++++++ .../diskstorage/hbase/HBaseStoreManager.java | 18 +- .../hbase/HBaseStoreManagerMutationTest.java | 63 +++++++ .../inmemory/InMemoryKeyColumnValueStore.java | 5 + .../inmemory/InMemoryStoreManager.java | 11 +- .../InMemoryWholeRowDeletionTest.java | 102 +++++++++++ ...WholeRowDeletionCapturingStoreManager.java | 62 +++++++ ...PartitionedVertexWholeRowDeletionTest.java | 159 ++++++++++++++++++ .../inmemory/RemovedVertexTrackingTest.java | 76 +++++++++ .../WholeRowDeletionOptimizationTest.java | 114 +++++++++++++ 29 files changed, 1102 insertions(+), 23 deletions(-) create mode 100644 janusgraph-core/src/test/java/org/janusgraph/diskstorage/MutationTest.java create mode 100644 janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeaturesTest.java create mode 100644 janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransactionWholeRowDeletionTest.java create mode 100644 janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLWholeRowDeletionTest.java create mode 100644 janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/InMemoryWholeRowDeletionTest.java create mode 100644 janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/WholeRowDeletionCapturingStoreManager.java create mode 100644 janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/PartitionedVertexWholeRowDeletionTest.java create mode 100644 janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/RemovedVertexTrackingTest.java create mode 100644 janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/WholeRowDeletionOptimizationTest.java diff --git a/docs/changelog.md b/docs/changelog.md index 298d39f208..33782b6cf8 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -122,6 +122,18 @@ graph.management-ack-timeout=240000 ms ``` This is a breaking change for users who use the `JanusGraphIndexStatusUpdate` interface. +##### Whole-row deletion on vertex removal (super-node tombstone reduction) + +Starting from version 1.2.0, when a vertex is removed JanusGraph deletes its entire storage row in a single +operation on backends that support it (CQL/Cassandra issues one partition-level delete instead of one +column delete per incident edge), drastically reducing tombstone pressure when removing super-nodes. + +This behavior is enabled by default. To restore the previous per-column deletion behavior, set: + +``` +storage.drop-whole-row-on-vertex-removal=false +``` + ### Version 1.1.0 (Release Date: November 7, 2024) /// tab | Maven diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index 2b6125cadf..7dc21a0aa0 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -423,6 +423,7 @@ Configuration options for the storage backend. Some options are applicable only | storage.connection-timeout | Default timeout, in milliseconds, when connecting to a remote database instance | Duration | 10000 ms | MASKABLE | | storage.directory | Storage directory for those storage backends that require local storage. | String | (no default value) | LOCAL | | storage.drop-on-clear | Whether to drop the graph database (true) or delete rows (false) when clearing storage. Note that some backends always drop the graph database when clearing storage. Also note that indices are always dropped when clearing storage. | Boolean | true | MASKABLE | +| storage.drop-whole-row-on-vertex-removal | When a vertex is removed, delete its entire storage row in a single operation (e.g. a Cassandra partition-level delete producing one tombstone) instead of deleting each edge/property column individually. This greatly reduces tombstone pressure when removing super-nodes. Only takes effect on storage backends that support optimized whole-row deletion; ignored otherwise. | Boolean | true | MASKABLE | | storage.hostname | The hostname or comma-separated list of hostnames of storage backend servers. This is only applicable to some storage backends, such as cassandra and hbase. | String[] | 127.0.0.1 | LOCAL | | storage.keys-size | The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request. | Integer | 100 | MASKABLE | | storage.num-mutations-parallel-threshold | This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. This overhead is more efficiently offset in the context of larger transactions. | Integer | 100 | MASKABLE | diff --git a/docs/storage-backend/cassandra.md b/docs/storage-backend/cassandra.md index 441861ef31..f3472a8c16 100644 --- a/docs/storage-backend/cassandra.md +++ b/docs/storage-backend/cassandra.md @@ -178,6 +178,15 @@ More information on Cassandra consistency levels and acceptable values can be found [here](https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dmlDataConsistencyTOC.html). In general, higher levels are more consistent and robust but have higher latency. +### Super-node deletion (whole-row delete) + +Removing a vertex with many incident edges previously issued one column-level `DELETE` per edge on the +vertex's partition, creating a large number of cell tombstones that can overwhelm reads on that partition. +As of 1.2.0, when a vertex is removed JanusGraph instead issues a single partition-level +`DELETE ... WHERE key = ?`, producing one tombstone for the whole partition. This is controlled by +`storage.drop-whole-row-on-vertex-removal` (default `true`). Set it to `false` to restore the previous +per-column behavior. + ## Global Graph Operations JanusGraph over Cassandra supports global vertex and edge iteration. diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java index 2e9bfa7ee4..8288651ac2 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/BackendTransaction.java @@ -213,7 +213,11 @@ public void logMutations(DataOutput out) { * @param deletions List of columns to be removed */ public void mutateEdges(StaticBuffer key, List additions, List deletions) throws BackendException { - edgeStore.mutateEntries(key, additions, deletions, storeTx); + mutateEdges(key, additions, deletions, false); + } + + public void mutateEdges(StaticBuffer key, List additions, List deletions, boolean wholeRowDeletion) throws BackendException { + edgeStore.mutateEntries(key, additions, deletions, wholeRowDeletion, storeTx); } /** diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Mutation.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Mutation.java index 466898a89b..4990342982 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Mutation.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Mutation.java @@ -37,6 +37,8 @@ public abstract class Mutation { private List deletions; + private boolean wholeRowDeletion = false; + public Mutation(List additions, List deletions) { assert additions!=null; assert deletions!=null; @@ -97,6 +99,21 @@ public List getDeletions() { return deletions; } + /** + * Whether this mutation deletes the entire row/partition for its key. When set, the backend + * (if it advertises {@code StoreFeatures.hasOptimizedWholeRowDeletion()}) deletes all columns + * for the key in a single operation before applying any additions, instead of deleting each + * column individually. + */ + public boolean hasWholeRowDeletion() { + return wholeRowDeletion; + } + + /** Marks whether this mutation deletes the entire row/partition for its key. @see #hasWholeRowDeletion() */ + public void setWholeRowDeletion(boolean wholeRowDeletion) { + this.wholeRowDeletion = wholeRowDeletion; + } + /** * Adds a new entry as an addition to this mutation * @@ -135,6 +152,8 @@ public void merge(Mutation m) { if (null == deletions) deletions = m.deletions; else deletions.addAll(m.deletions); } + + this.wholeRowDeletion = this.wholeRowDeletion || m.wholeRowDeletion; } public boolean isEmpty() { @@ -142,7 +161,7 @@ public boolean isEmpty() { } public int getTotalMutations() { - return (additions==null?0:additions.size()) + (deletions==null?0:deletions.size()); + return (additions==null?0:additions.size()) + (deletions==null?0:deletions.size()) + (wholeRowDeletion?1:0); } /** diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeatures.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeatures.java index 9bf4fb945b..2ad62a55bf 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeatures.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeatures.java @@ -44,6 +44,7 @@ public class StandardStoreFeatures implements StoreFeatures { private final Configuration scanTxConfig; private final boolean supportsInterruption; private final boolean optimisticLocking; + private final boolean optimizedWholeRowDeletion; @Override public boolean hasScan() { @@ -159,6 +160,11 @@ public boolean hasOptimisticLocking() { return optimisticLocking; } + @Override + public boolean hasOptimizedWholeRowDeletion() { + return optimizedWholeRowDeletion; + } + /** * The only way to instantiate {@link StandardStoreFeatures}. */ @@ -186,6 +192,7 @@ public static class Builder { private Configuration scanTxConfig; private boolean supportsInterruption = true; private boolean optimisticLocking; + private boolean optimizedWholeRowDeletion; /** * Construct a Builder with everything disabled/unsupported/false/null. @@ -219,6 +226,7 @@ public Builder(StoreFeatures template) { scanTxConfig(template.getScanTxConfig()); supportsInterruption(template.supportsInterruption()); optimisticLocking(template.hasOptimisticLocking()); + optimizedWholeRowDeletion(template.hasOptimizedWholeRowDeletion()); } public Builder optimisticLocking(boolean b) { @@ -226,6 +234,11 @@ public Builder optimisticLocking(boolean b) { return this; } + public Builder optimizedWholeRowDeletion(boolean b) { + optimizedWholeRowDeletion = b; + return this; + } + public Builder consistentScan(boolean consistentScan) { this.consistentScan = consistentScan; return this; @@ -343,7 +356,8 @@ public StandardStoreFeatures build() { timestamps, preferredTimestamps, cellLevelTTL, storeLevelTTL, visibility, supportsPersist, keyConsistentTxConfig, - localKeyConsistentTxConfig, scanTxConfig, supportsInterruption, optimisticLocking); + localKeyConsistentTxConfig, scanTxConfig, supportsInterruption, optimisticLocking, + optimizedWholeRowDeletion); } } @@ -356,7 +370,8 @@ private StandardStoreFeatures(boolean consistentScan, boolean unorderedScan, boo boolean visibility, boolean supportsPersist, Configuration keyConsistentTxConfig, Configuration localKeyConsistentTxConfig, - Configuration scanTxConfig, boolean supportsInterruption, boolean optimisticLocking) { + Configuration scanTxConfig, boolean supportsInterruption, boolean optimisticLocking, + boolean optimizedWholeRowDeletion) { this.consistentScan = consistentScan; this.unorderedScan = unorderedScan; this.orderedScan = orderedScan; @@ -379,5 +394,6 @@ private StandardStoreFeatures(boolean consistentScan, boolean unorderedScan, boo this.scanTxConfig = scanTxConfig; this.supportsInterruption = supportsInterruption; this.optimisticLocking = optimisticLocking; + this.optimizedWholeRowDeletion = optimizedWholeRowDeletion; } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StoreFeatures.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StoreFeatures.java index 6ec80bfa79..00638bc689 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StoreFeatures.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/StoreFeatures.java @@ -223,4 +223,14 @@ public interface StoreFeatures { */ boolean hasOptimisticLocking(); + /** + * Whether this storage backend can delete an entire row/partition for a key in a single + * efficient operation, honoring {@link org.janusgraph.diskstorage.Mutation#hasWholeRowDeletion()}. + * Backends returning true must, when a mutation has this flag set, delete every column for the + * key (e.g. a Cassandra partition-level DELETE) rather than enumerating columns. + * + * @return true if the backend supports optimized whole-row deletion, else false + */ + boolean hasOptimizedWholeRowDeletion(); + } diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransaction.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransaction.java index a3f9a3e89b..40d45b1f03 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransaction.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransaction.java @@ -82,10 +82,15 @@ public StoreTransaction getWrappedTransaction() { } void mutate(KCVSCache store, StaticBuffer key, List additions, List deletions) throws BackendException { + mutate(store, key, additions, deletions, false); + } + + void mutate(KCVSCache store, StaticBuffer key, List additions, List deletions, boolean wholeRowDeletion) throws BackendException { Preconditions.checkNotNull(store); - if (additions.isEmpty() && deletions.isEmpty()) return; + if (additions.isEmpty() && deletions.isEmpty() && !wholeRowDeletion) return; KCVEntryMutation m = new KCVEntryMutation(additions, deletions); + if (wholeRowDeletion) m.setWholeRowDeletion(true); final Map storeMutation = mutations.computeIfAbsent(store, k -> new HashMap<>()); KCVEntryMutation existingM = storeMutation.get(key); if (existingM != null) { @@ -121,8 +126,9 @@ public String toString() { private KCVMutation convert(KCVEntryMutation mutation) { assert !mutation.isEmpty(); + final KCVMutation result; if (mutation.hasDeletions()) { - return new KCVMutation( + result = new KCVMutation( () -> new ArrayList<>(mutation.getAdditions()), () -> { List deletions = mutation.getDeletions(); @@ -132,8 +138,11 @@ private KCVMutation convert(KCVEntryMutation mutation) { } return convertedDeletions; }); + } else { + result = new KCVMutation(mutation.getAdditions(), KeyColumnValueStore.NO_DELETIONS); } - return new KCVMutation(mutation.getAdditions(), KeyColumnValueStore.NO_DELETIONS); + result.setWholeRowDeletion(mutation.hasWholeRowDeletion()); + return result; } private void flushInternal() throws BackendException { diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java index 69fc93265b..0064423e08 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/cache/KCVSCache.java @@ -68,8 +68,12 @@ public void mutate(StaticBuffer key, List additions, List d } public void mutateEntries(StaticBuffer key, List additions, List deletions, StoreTransaction txh) throws BackendException { + mutateEntries(key, additions, deletions, false, txh); + } + + public void mutateEntries(StaticBuffer key, List additions, List deletions, boolean wholeRowDeletion, StoreTransaction txh) throws BackendException { assert txh instanceof CacheTransaction; - ((CacheTransaction) txh).mutate(this, key, additions, deletions); + ((CacheTransaction) txh).mutate(this, key, additions, deletions, wholeRowDeletion); } @Override diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java index 29f96555f7..6797049480 100755 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java @@ -665,6 +665,13 @@ public boolean apply(@Nullable String s) { "Whether to enable batch loading into the storage backend", ConfigOption.Type.LOCAL, false); + public static final ConfigOption DROP_WHOLE_ROW_ON_VERTEX_REMOVAL = new ConfigOption<>(STORAGE_NS,"drop-whole-row-on-vertex-removal", + "When a vertex is removed, delete its entire storage row in a single operation (e.g. a Cassandra " + + "partition-level delete producing one tombstone) instead of deleting each edge/property column " + + "individually. This greatly reduces tombstone pressure when removing super-nodes. Only takes effect " + + "on storage backends that support optimized whole-row deletion; ignored otherwise.", + ConfigOption.Type.MASKABLE, true); + /** * Enables transactions on storage backends that support them */ @@ -1466,6 +1473,7 @@ public boolean apply(@Nullable String s) { private boolean allowVertexIdSetting; private boolean allowCustomVertexIdType; private boolean logTransactions; + private boolean dropWholeRowOnVertexRemoval; private String metricsPrefix; private String unknownIndexKeyName; private MultiQueryHasStepStrategyMode hasStepStrategyMode; @@ -1627,6 +1635,10 @@ public boolean hasLogTransactions() { return logTransactions; } + public boolean getDropWholeRowOnVertexRemoval() { + return dropWholeRowOnVertexRemoval; + } + public TimestampProvider getTimestampProvider() { return configuration.get(TIMESTAMP_PROVIDER); } @@ -1794,6 +1806,7 @@ private void preLoadConfiguration() { } logTransactions = configuration.get(SYSTEM_LOG_TRANSACTIONS); + dropWholeRowOnVertexRemoval = configuration.get(DROP_WHOLE_ROW_ON_VERTEX_REMOVAL); unknownIndexKeyName = configuration.get(IGNORE_UNKNOWN_INDEX_FIELD) ? UNKNOWN_FIELD_NAME : null; diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java index cc04a46dda..39bee5d186 100755 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java @@ -174,6 +174,7 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph { private final GraphDatabaseConfiguration config; private final Backend backend; private final IDManager idManager; + private final boolean wholeRowDeletionEnabled; private final VertexIDAssigner idAssigner; private final TimestampProvider times; private final CacheInvalidationService cacheInvalidationService; @@ -217,6 +218,8 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) { this.idAssigner = config.getIDAssigner(backend); this.idManager = idAssigner.getIDManager(); + this.wholeRowDeletionEnabled = configuration.getDropWholeRowOnVertexRemoval() + && backend.getStoreFeatures().hasOptimizedWholeRowDeletion(); this.cacheInvalidationService = new KCVSCacheInvalidationService( backend.getEdgeStoreCache(), backend.getIndexStoreCache(), idManager); @@ -856,8 +859,18 @@ private void prepareCommitAddRelationMutations(final ListMultimap edges = mutations.get(vertexId); - final List additions = new ArrayList<>(edges.size()); - final List deletions = new ArrayList<>(Math.max(10, edges.size() / 10)); + + final Object canonicalId = idManager.isPartitionedVertex(vertexId) + ? idManager.getCanonicalVertexId(((Number) vertexId).longValue()) + : vertexId; + final boolean wholeRowDeletion = wholeRowDeletionEnabled && tx.isVertexFullyRemoved(canonicalId); + + // When the whole row is deleted, per-column deletions are skipped and a fully-removed + // vertex has no additions, so both lists stay empty. Avoid preallocating large backing + // arrays sized to edges.size() for super-node removals. + final List additions = new ArrayList<>(wholeRowDeletion ? 0 : edges.size()); + final List deletions = new ArrayList<>(wholeRowDeletion ? 0 : Math.max(10, edges.size() / 10)); + for (final InternalRelation edge : edges) { final InternalRelationType baseType = (InternalRelationType) edge.getType(); assert baseType.getBaseType()==null; @@ -868,11 +881,18 @@ private void prepareCommitAddRelationMutations(final ListMultimap 0) { entry.setMetaData(EntryMetaData.TTL, ttl); @@ -885,7 +905,7 @@ private void prepareCommitAddRelationMutations(final ListMultimap deletedRelations; + /** + * Keeps track of canonical ids of vertices that have been fully removed in this transaction + */ + private volatile Set removedVertexCanonicalIds = null; + //######## Index Caches /** * Caches the result of index calls so that repeated index queries don't need @@ -764,6 +769,31 @@ public boolean isRemovedRelation(Long relationId) { return deletedRelations.containsKey(relationId); } + /** Records that the given (loaded) vertex has been fully removed in this transaction, keyed by its canonical id, so commit can issue a single whole-row deletion instead of per-column deletes. */ + public void recordRemovedVertex(InternalVertex vertex) { + if (vertex.isNew()) return; + final Object canonicalId = idInspector.isPartitionedVertex(vertex.id()) + ? idManager.getCanonicalVertexId(((Number) vertex.id()).longValue()) + : vertex.id(); + Set result = removedVertexCanonicalIds; + if (result == null) { + if (config.isSingleThreaded()) { + removedVertexCanonicalIds = result = new HashSet<>(); + } else { + synchronized (this) { + result = removedVertexCanonicalIds; + if (result == null) removedVertexCanonicalIds = result = ConcurrentHashMap.newKeySet(); + } + } + } + result.add(canonicalId); + } + + /** Returns true if the vertex with the given canonical id was fully removed in this transaction (see {@link #recordRemovedVertex}). */ + public boolean isVertexFullyRemoved(Object canonicalId) { + return removedVertexCanonicalIds != null && removedVertexCanonicalIds.contains(canonicalId); + } + private TransactionLock getLock(final Object... tuple) { return getLock(new LockTuple(tuple)); } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/StandardVertex.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/StandardVertex.java index 8572157ead..d77846c971 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/StandardVertex.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/vertices/StandardVertex.java @@ -162,13 +162,20 @@ public boolean hasAddedRelations() { @Override public synchronized void remove() { super.remove(); - ((StandardVertex) it()).updateLifeCycle(ElementLifeCycle.Event.REMOVED); + // Resolve the current-transaction vertex once. Calling it() again after the lifecycle is + // flipped to REMOVED would re-resolve via getNextTx().getVertex(id) and throw "was removed" + // when this reference originated in an already-closed transaction. + final StandardVertex vertex = (StandardVertex) it(); + vertex.updateLifeCycle(ElementLifeCycle.Event.REMOVED); + tx().recordRemovedVertex(vertex); } @Override public synchronized void remove(Iterable loadedRelations) { super.remove(loadedRelations); - ((StandardVertex) it()).updateLifeCycle(ElementLifeCycle.Event.REMOVED); + final StandardVertex vertex = (StandardVertex) it(); + vertex.updateLifeCycle(ElementLifeCycle.Event.REMOVED); + tx().recordRemovedVertex(vertex); } @Override diff --git a/janusgraph-core/src/test/java/org/janusgraph/diskstorage/MutationTest.java b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/MutationTest.java new file mode 100644 index 0000000000..1a51815645 --- /dev/null +++ b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/MutationTest.java @@ -0,0 +1,73 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage; + +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MutationTest { + + private static KCVMutation empty() { + return new KCVMutation(KeyColumnValueStore.NO_ADDITIONS, KeyColumnValueStore.NO_DELETIONS); + } + + @Test + public void defaultsToNoWholeRowDeletion() { + assertFalse(empty().hasWholeRowDeletion()); + } + + @Test + public void wholeRowDeletionMakesMutationNonEmpty() { + KCVMutation m = empty(); + assertTrue(m.isEmpty()); + m.setWholeRowDeletion(true); + assertTrue(m.hasWholeRowDeletion()); + assertFalse(m.isEmpty()); + assertEquals(1, m.getTotalMutations()); + } + + @Test + public void mergeOrsWholeRowDeletion() { + KCVMutation a = empty(); + KCVMutation b = empty(); + b.setWholeRowDeletion(true); + a.merge(b); + assertTrue(a.hasWholeRowDeletion()); + } + + @Test + public void mergeKeepsWholeRowDeletionWhenAlreadySet() { + KCVMutation a = empty(); + a.setWholeRowDeletion(true); + KCVMutation b = empty(); // b has no whole-row deletion + a.merge(b); + assertTrue(a.hasWholeRowDeletion()); + } + + @Test + public void mergeWholeRowDeletionBothSet() { + KCVMutation a = empty(); + a.setWholeRowDeletion(true); + KCVMutation b = empty(); + b.setWholeRowDeletion(true); + a.merge(b); + assertTrue(a.hasWholeRowDeletion()); + } +} diff --git a/janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeaturesTest.java b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeaturesTest.java new file mode 100644 index 0000000000..e75444f6da --- /dev/null +++ b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/StandardStoreFeaturesTest.java @@ -0,0 +1,40 @@ +// Copyright 2026 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.keycolumnvalue; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StandardStoreFeaturesTest { + + @Test + public void wholeRowDeletionDefaultsFalse() { + assertFalse(new StandardStoreFeatures.Builder().build().hasOptimizedWholeRowDeletion()); + } + + @Test + public void wholeRowDeletionCanBeEnabled() { + assertTrue(new StandardStoreFeatures.Builder().optimizedWholeRowDeletion(true).build() + .hasOptimizedWholeRowDeletion()); + } + + @Test + public void copyConstructorPreservesWholeRowDeletion() { + StoreFeatures template = new StandardStoreFeatures.Builder().optimizedWholeRowDeletion(true).build(); + assertTrue(new StandardStoreFeatures.Builder(template).build().hasOptimizedWholeRowDeletion()); + } +} diff --git a/janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransactionWholeRowDeletionTest.java b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransactionWholeRowDeletionTest.java new file mode 100644 index 0000000000..69ff7a923f --- /dev/null +++ b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/keycolumnvalue/cache/CacheTransactionWholeRowDeletionTest.java @@ -0,0 +1,79 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.keycolumnvalue.cache; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.BufferUtil; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CacheTransactionWholeRowDeletionTest { + + @SuppressWarnings("unchecked") + @Test + public void wholeRowDeletionFlagReachesBackend() throws BackendException { + KeyColumnValueStoreManager manager = mock(KeyColumnValueStoreManager.class); + KeyColumnValueStore rawStore = mock(KeyColumnValueStore.class); + when(rawStore.getName()).thenReturn("edgestore"); + NoKCVSCache cache = new NoKCVSCache(rawStore); + StoreTransaction wrappedTx = mock(StoreTransaction.class); + CacheTransaction ctx = new CacheTransaction(wrappedTx, manager, 100, 100, Duration.ofSeconds(10), false); + + // Capture a deep copy of the mutations map before persist() clears it. + // Mockito ArgumentCaptor captures by reference, which is cleared after mutateMany returns, + // so we use doAnswer to snapshot the contents at call time. + AtomicReference>> captured = new AtomicReference<>(); + doAnswer(invocation -> { + Map> arg = + (Map>) invocation.getArgument(0); + // Shallow copy of submutations map taken before persist() clears it; the KCVMutation values are not mutated afterward. + Map> snapshot = new HashMap<>(); + for (Map.Entry> e : arg.entrySet()) { + snapshot.put(e.getKey(), new HashMap<>(e.getValue())); + } + captured.set(snapshot); + return null; + }).when(manager).mutateMany(any(), any()); + + StaticBuffer key = BufferUtil.getIntBuffer(1); + cache.mutateEntries(key, KeyColumnValueStore.NO_ADDITIONS, KCVSCache.NO_DELETIONS, true, ctx); + ctx.commit(); + + Map> mutations = captured.get(); + assertNotNull(mutations, "mutateMany was not called"); + Map storeMutations = mutations.get("edgestore"); + assertNotNull(storeMutations, "no mutations for 'edgestore'"); + KCVMutation m = storeMutations.get(key); + assertNotNull(m, "no mutation for key"); + assertTrue(m.hasWholeRowDeletion(), "wholeRowDeletion flag must be true"); + assertTrue(m.getDeletions().isEmpty(), "deletions must be empty for a whole-row deletion"); + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index f983205126..e4cfff63ab 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -161,6 +161,7 @@ private static boolean isKeySizeTooLarge(Throwable cause) { private final PreparedStatement getKeysAll; private final PreparedStatement getKeysRanged; private final PreparedStatement deleteColumn; + private final PreparedStatement deleteRow; private final PreparedStatement insertColumn; private final PreparedStatement insertColumnWithTTL; @@ -235,6 +236,11 @@ public CQLKeyColumnValueStore(final CQLStoreManager storeManager, final String t .whereColumn(COLUMN_COLUMN_NAME).isEqualTo(bindMarker(COLUMN_BINDING)) .build()); + final DeleteSelection deleteRowSelection = addUsingTimestamp(deleteFrom(this.storeManager.getKeyspaceName(), this.tableName)); + this.deleteRow = this.session.prepare(deleteRowSelection + .whereColumn(KEY_COLUMN_NAME).isEqualTo(bindMarker(KEY_BINDING)) + .build()); + final Insert insertColumnInsert = addUsingTimestamp(insertInto(this.storeManager.getKeyspaceName(), this.tableName) .value(KEY_COLUMN_NAME, bindMarker(KEY_BINDING)) .value(COLUMN_COLUMN_NAME, bindMarker(COLUMN_BINDING)) @@ -447,6 +453,19 @@ public BatchableStatement deleteColumn(final StaticBuffer key, f return builder.build(); } + public BatchableStatement deleteRow(final StaticBuffer key) { + return deleteRow(key, null); + } + + public BatchableStatement deleteRow(final StaticBuffer key, final Long timestamp) { + BoundStatementBuilder builder = deleteRow.boundStatementBuilder() + .setByteBuffer(KEY_BINDING, key.asByteBuffer()); + if (timestamp != null) { + builder = builder.setLong(TIMESTAMP_BINDING, timestamp); + } + return builder.build(); + } + public BatchableStatement insertColumn(final StaticBuffer key, final Entry entry) { return insertColumn(key, entry, null); } diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java index a49d175f1c..369b6f1bfe 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/builder/CQLStoreFeaturesBuilder.java @@ -61,6 +61,7 @@ public CQLStoreFeaturesWrapper build(final CqlSession session, final Configurati fb.keyConsistent((onlyUseLocalConsistency ? local : global), local); fb.locking(useExternalLocking); fb.optimisticLocking(true); + fb.optimizedWholeRowDeletion(true); fb.multiQuery(true); if (!configuration.get(TTL_ENABLED)) { diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyFunction.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyFunction.java index f71f5d2707..a54de2ccce 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyFunction.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/mutate/AbstractCQLMutateManyFunction.java @@ -41,15 +41,26 @@ public AbstractCQLMutateManyFunction(final ConsumerWithBackendException Iterator.of(commitTime.getDeletionTime(times)) - .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime))); + this.deletionsFunction = (commitTime, keyMutations, columnValueStore, key) -> { + if (keyMutations.hasWholeRowDeletion()) { + return Iterator.of(columnValueStore.deleteRow(key, commitTime.getDeletionTime(times))); + } + return Iterator.of(commitTime.getDeletionTime(times)) + .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()) + .map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime))); + }; this.additionsFunction = (commitTime, keyMutations, columnValueStore, key) -> Iterator.of(commitTime.getAdditionTime(times)) .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime))); } else { this.createMaskedTimestampFunction = txh -> null; this.sleepAfterWriteFunction = mustPass -> {}; - this.deletionsFunction = (commitTime, keyMutations, columnValueStore, key) -> Iterator.ofAll(keyMutations.getDeletions()) - .map(deletion -> columnValueStore.deleteColumn(key, deletion)); + this.deletionsFunction = (commitTime, keyMutations, columnValueStore, key) -> { + if (keyMutations.hasWholeRowDeletion()) { + return Iterator.of(columnValueStore.deleteRow(key)); + } + return Iterator.ofAll(keyMutations.getDeletions()) + .map(deletion -> columnValueStore.deleteColumn(key, deletion)); + }; this.additionsFunction = (commitTime, keyMutations, columnValueStore, key) -> Iterator.ofAll(keyMutations.getAdditions()) .map(addition -> columnValueStore.insertColumn(key, addition)); } diff --git a/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLWholeRowDeletionTest.java b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLWholeRowDeletionTest.java new file mode 100644 index 0000000000..9bb6d1db17 --- /dev/null +++ b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLWholeRowDeletionTest.java @@ -0,0 +1,100 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import org.janusgraph.JanusGraphCassandraContainer; +import org.janusgraph.diskstorage.AbstractKCVSTest; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.BufferUtil; +import org.janusgraph.diskstorage.util.StaticArrayEntry; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +public class CQLWholeRowDeletionTest extends AbstractKCVSTest { + + @Container + public static final JanusGraphCassandraContainer cqlContainer = new JanusGraphCassandraContainer(); + + private KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CachingCQLStoreManager(cqlContainer.getConfiguration(getClass().getSimpleName())); + } + + private static Entry e(int col, int val) { + return StaticArrayEntry.of(BufferUtil.getIntBuffer(col), BufferUtil.getIntBuffer(val)); + } + + @Test + public void featureAdvertised() throws Exception { + KeyColumnValueStoreManager mgr = openStorageManager(); + try { + assertTrue(mgr.getFeatures().hasOptimizedWholeRowDeletion(), + "CQL backend must advertise hasOptimizedWholeRowDeletion=true"); + } finally { + mgr.close(); + } + } + + @Test + public void wholeRowDeletionEmptiesPartition() throws Exception { + KeyColumnValueStoreManager mgr = openStorageManager(); + try { + KeyColumnValueStore store = mgr.openDatabase("wrdtest"); + + // Write 3 columns to a key + StoreTransaction tx = mgr.beginTransaction(getTxConfig()); + StaticBuffer key = BufferUtil.getIntBuffer(42); + store.mutate(key, Arrays.asList(e(1, 1), e(2, 2), e(3, 3)), KeyColumnValueStore.NO_DELETIONS, tx); + tx.commit(); + + // Verify 3 columns are present + tx = mgr.beginTransaction(getTxConfig()); + SliceQuery all = new SliceQuery(BufferUtil.getIntBuffer(0), BufferUtil.getIntBuffer(Integer.MAX_VALUE)); + assertEquals(3, store.getSlice(new KeySliceQuery(key, all), tx).size(), + "Expected 3 columns before whole-row deletion"); + tx.rollback(); + + // Issue whole-row deletion + tx = mgr.beginTransaction(getTxConfig()); + KCVMutation m = new KCVMutation(KeyColumnValueStore.NO_ADDITIONS, KeyColumnValueStore.NO_DELETIONS); + m.setWholeRowDeletion(true); + mgr.mutateMany(Collections.singletonMap("wrdtest", Collections.singletonMap(key, m)), tx); + tx.commit(); + + // Verify partition is now empty + tx = mgr.beginTransaction(getTxConfig()); + assertEquals(0, store.getSlice(new KeySliceQuery(key, all), tx).size(), + "Expected 0 columns after whole-row deletion"); + tx.rollback(); + } finally { + mgr.close(); + } + } +} diff --git a/janusgraph-hbase/src/main/java/org/janusgraph/diskstorage/hbase/HBaseStoreManager.java b/janusgraph-hbase/src/main/java/org/janusgraph/diskstorage/hbase/HBaseStoreManager.java index 51801ab285..d8ccba1e06 100644 --- a/janusgraph-hbase/src/main/java/org/janusgraph/diskstorage/hbase/HBaseStoreManager.java +++ b/janusgraph-hbase/src/main/java/org/janusgraph/diskstorage/hbase/HBaseStoreManager.java @@ -381,7 +381,8 @@ public StoreFeatures getFeatures() { .orderedScan(true).unorderedScan(true).batchMutation(true) .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true) .cellTTL(true).timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS) - .optimisticLocking(true).keyConsistent(c); + .optimisticLocking(true).keyConsistent(c) + .optimizedWholeRowDeletion(true); try { fb.localKeyPartition(getDeployment() == Deployment.LOCAL); @@ -950,7 +951,20 @@ Map, Delete>> convertToCommands(MapBefore the fix, a flag-only mutation with empty deletions list produced no Delete at all + * (the old guard was {@code if (mutation.hasDeletions())}). After the fix it must produce a + * Delete whose cell map contains exactly one {@link Cell.Type#DeleteFamily} cell, with no + * qualifier bytes (i.e. qualifier length == 0) — the hallmark of a family-level tombstone. + */ + @Test + public void convertToCommandsWholeRowDeletionUsesFamilyDelete() throws Exception { + final String storeName = "store1"; + final StaticBuffer rowkey = KeyColumnValueStoreUtil.longToByteBuffer(42); + + KCVMutation mutation = new KCVMutation(Collections.emptyList(), Collections.emptyList()); + mutation.setWholeRowDeletion(true); + + final Map rowkeyMutationMap = new HashMap<>(); + rowkeyMutationMap.put(rowkey, mutation); + + final Map> storeMutationMap = new HashMap<>(); + storeMutationMap.put(storeName, rowkeyMutationMap); + + HBaseStoreManager manager = new HBaseStoreManager(hBaseContainer.getModifiableConfiguration()); + // Use a non-null delTimestamp so the timestamp branch is exercised. + final long delTimestamp = System.currentTimeMillis(); + final Map, Delete>> commandsPerRowKey = + manager.convertToCommands(storeMutationMap, null, delTimestamp); + + assertEquals(1, commandsPerRowKey.size(), "Expected exactly one rowkey entry"); + Pair, Delete> commands = commandsPerRowKey.values().iterator().next(); + + // A whole-row deletion must produce a Delete (old code would have produced null). + Delete d = commands.getSecond(); + assertNotNull(d, "Delete must be non-null for a whole-row-deletion mutation"); + + // The Delete's family-cell map must contain at least one entry. + assertNotNull(d.getFamilyCellMap(), "Family cell map must not be null"); + assertFalse(d.getFamilyCellMap().isEmpty(), "Family cell map must not be empty"); + + // Every cell in the Delete must be a DeleteFamily tombstone (no per-qualifier delete). + boolean hasDeleteFamily = false; + for (Map.Entry> familyEntry : d.getFamilyCellMap().entrySet()) { + for (Cell c : familyEntry.getValue()) { + Cell.Type type = c.getType(); + assertEquals(Cell.Type.DeleteFamily, type, + "Expected DeleteFamily cell type, got " + type + " — a per-column qualifier delete was produced instead of a family delete"); + // A family delete cell has a zero-length qualifier. + assertEquals(0, c.getQualifierLength(), + "Family delete cell must have an empty qualifier"); + hasDeleteFamily = true; + } + } + assertTrue(hasDeleteFamily, "At least one DeleteFamily cell must be present"); + + // No puts should have been produced (the mutation had no additions). + assertTrue(commands.getFirst().isEmpty(), "Expected no Put commands for a whole-row-deletion-only mutation"); + } } diff --git a/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryKeyColumnValueStore.java b/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryKeyColumnValueStore.java index 9b29bd6b6e..83650bf41b 100644 --- a/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryKeyColumnValueStore.java +++ b/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryKeyColumnValueStore.java @@ -123,6 +123,11 @@ public void mutate(StaticBuffer key, List additions, List d cvs.mutate(additions, deletions, txh); } + /** Removes the entire row (all columns) for the given key in a single operation. */ + void deleteRow(StaticBuffer key) { + kcv.remove(key); + } + @Override public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException { throw new UnsupportedOperationException(); diff --git a/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryStoreManager.java b/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryStoreManager.java index 413a62fcf8..b9ee2309be 100644 --- a/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryStoreManager.java +++ b/janusgraph-inmemory/src/main/java/org/janusgraph/diskstorage/inmemory/InMemoryStoreManager.java @@ -67,6 +67,7 @@ public InMemoryStoreManager(final Configuration configuration) { .keyOrdered(true) .persists(false) .optimisticLocking(true) + .optimizedWholeRowDeletion(true) .keyConsistent(GraphDatabaseConfiguration.buildGraphConfiguration()) .build(); } @@ -115,10 +116,16 @@ public KeyColumnValueStore openDatabase(final String name, StoreMetaData.Contain @Override public void mutateMany(Map> mutations, StoreTransaction txh) throws BackendException { for (Map.Entry> storeMut : mutations.entrySet()) { - KeyColumnValueStore store = stores.get(storeMut.getKey()); + InMemoryKeyColumnValueStore store = stores.get(storeMut.getKey()); Preconditions.checkNotNull(store); for (Map.Entry keyMut : storeMut.getValue().entrySet()) { - store.mutate(keyMut.getKey(), keyMut.getValue().getAdditions(), keyMut.getValue().getDeletions(), txh); + KCVMutation mut = keyMut.getValue(); + if (mut.hasWholeRowDeletion()) { + store.deleteRow(keyMut.getKey()); + } + if (mut.hasAdditions() || mut.hasDeletions()) { + store.mutate(keyMut.getKey(), mut.getAdditions(), mut.getDeletions(), txh); + } } } } diff --git a/janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/InMemoryWholeRowDeletionTest.java b/janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/InMemoryWholeRowDeletionTest.java new file mode 100644 index 0000000000..ae570cb243 --- /dev/null +++ b/janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/InMemoryWholeRowDeletionTest.java @@ -0,0 +1,102 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.inmemory; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.BufferUtil; +import org.janusgraph.diskstorage.util.StandardBaseTransactionConfig; +import org.janusgraph.diskstorage.util.StaticArrayEntry; +import org.janusgraph.diskstorage.util.time.TimestampProviders; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class InMemoryWholeRowDeletionTest { + + private static Entry entry(int col, int val) { + return StaticArrayEntry.of(BufferUtil.getIntBuffer(col), BufferUtil.getIntBuffer(val)); + } + + @Test + public void featureAdvertised() { + assertTrue(new InMemoryStoreManager().getFeatures().hasOptimizedWholeRowDeletion(), + "InMemoryStoreManager should advertise hasOptimizedWholeRowDeletion=true"); + } + + @Test + public void wholeRowDeletionRemovesAllColumns() throws BackendException { + InMemoryStoreManager mgr = new InMemoryStoreManager(); + StoreTransaction txh = mgr.beginTransaction( + StandardBaseTransactionConfig.of(TimestampProviders.MICRO)); + KeyColumnValueStore store = mgr.openDatabase("edgestore"); + StaticBuffer key = BufferUtil.getIntBuffer(7); + + // Insert 3 columns + store.mutate(key, Arrays.asList(entry(1, 10), entry(2, 20), entry(3, 30)), + KeyColumnValueStore.NO_DELETIONS, txh); + + // Sanity check: 3 columns present + SliceQuery all = new SliceQuery(BufferUtil.zeroBuffer(4), BufferUtil.oneBuffer(4)); + assertEquals(3, store.getSlice(new KeySliceQuery(key, all), txh).size(), + "Expected 3 columns before whole-row deletion"); + + // Whole-row deletion via mutateMany + KCVMutation m = new KCVMutation(KeyColumnValueStore.NO_ADDITIONS, KeyColumnValueStore.NO_DELETIONS); + m.setWholeRowDeletion(true); + mgr.mutateMany(Collections.singletonMap("edgestore", Collections.singletonMap(key, m)), txh); + + assertEquals(0, store.getSlice(new KeySliceQuery(key, all), txh).size(), + "Expected 0 columns after whole-row deletion"); + } + + @Test + public void wholeRowDeletionThenAdditionKeepsOnlyNewColumn() throws BackendException { + InMemoryStoreManager mgr = new InMemoryStoreManager(); + StoreTransaction txh = mgr.beginTransaction( + StandardBaseTransactionConfig.of(TimestampProviders.MICRO)); + KeyColumnValueStore store = mgr.openDatabase("edgestore"); + StaticBuffer key = BufferUtil.getIntBuffer(7); + + // Insert 3 columns + store.mutate(key, Arrays.asList(entry(1, 10), entry(2, 20), entry(3, 30)), + KeyColumnValueStore.NO_DELETIONS, txh); + + // Sanity check: 3 columns present + SliceQuery all = new SliceQuery(BufferUtil.zeroBuffer(4), BufferUtil.oneBuffer(4)); + assertEquals(3, store.getSlice(new KeySliceQuery(key, all), txh).size(), + "Expected 3 columns before whole-row deletion"); + + // Whole-row deletion with one new addition + KCVMutation m = new KCVMutation(Arrays.asList(entry(5, 50)), KeyColumnValueStore.NO_DELETIONS); + m.setWholeRowDeletion(true); + mgr.mutateMany(Collections.singletonMap("edgestore", Collections.singletonMap(key, m)), txh); + + // Verify only the new column (5) is present + SliceQuery allRange = new SliceQuery(BufferUtil.zeroBuffer(4), BufferUtil.oneBuffer(4)); + assertEquals(1, store.getSlice(new KeySliceQuery(key, allRange), txh).size(), + "Expected 1 column after whole-row deletion with addition"); + } +} diff --git a/janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/WholeRowDeletionCapturingStoreManager.java b/janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/WholeRowDeletionCapturingStoreManager.java new file mode 100644 index 0000000000..18492931d5 --- /dev/null +++ b/janusgraph-inmemory/src/test/java/org/janusgraph/diskstorage/inmemory/WholeRowDeletionCapturingStoreManager.java @@ -0,0 +1,62 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.inmemory; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +/** Test backend: delegates to in-memory but records every mutateMany for assertions. */ +public class WholeRowDeletionCapturingStoreManager extends InMemoryStoreManager { + + public static final class Captured { + public final String store; + public final StaticBuffer key; + public final boolean wholeRow; + public final int deletions; + public final int additions; + + Captured(String store, StaticBuffer key, boolean wholeRow, int deletions, int additions) { + this.store = store; + this.key = key; + this.wholeRow = wholeRow; + this.deletions = deletions; + this.additions = additions; + } + } + + public static final List CAPTURED = new CopyOnWriteArrayList<>(); + + public static void reset() { + CAPTURED.clear(); + } + + public WholeRowDeletionCapturingStoreManager(Configuration configuration) { + super(configuration); + } + + @Override + public void mutateMany(Map> mutations, StoreTransaction txh) throws BackendException { + mutations.forEach((store, km) -> km.forEach((key, m) -> + CAPTURED.add(new Captured(store, key, m.hasWholeRowDeletion(), m.getDeletions().size(), m.getAdditions().size())))); + super.mutateMany(mutations, txh); + } +} diff --git a/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/PartitionedVertexWholeRowDeletionTest.java b/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/PartitionedVertexWholeRowDeletionTest.java new file mode 100644 index 0000000000..4352352e56 --- /dev/null +++ b/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/PartitionedVertexWholeRowDeletionTest.java @@ -0,0 +1,159 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.inmemory; + +import org.janusgraph.core.JanusGraph; +import org.janusgraph.core.JanusGraphFactory; +import org.janusgraph.core.JanusGraphVertex; +import org.janusgraph.core.schema.JanusGraphManagement; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.inmemory.WholeRowDeletionCapturingStoreManager; +import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.janusgraph.graphdb.idmanagement.IDManager; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.janusgraph.diskstorage.Backend.EDGESTORE_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies that removing a partitioned vertex triggers whole-row deletion for + * ALL physical representative rows (canonical + per-partition) in the edge store. + * + *

A partitioned vertex is stored once per partition that holds edges to/from it, + * so its removal must whole-row-delete every such row, not only the canonical one. + * The detection in {@code StandardJanusGraph.prepareCommitAddRelationMutations} + * normalises each physical vertex-id to the canonical form before checking + * {@code isVertexFullyRemoved}, which should match all representatives. + */ +public class PartitionedVertexWholeRowDeletionTest { + + private JanusGraph graph; + + @AfterEach + public void close() { + try { + if (graph != null) { + graph.close(); + } + } finally { + WholeRowDeletionCapturingStoreManager.reset(); + } + } + + /** + * Opens a JanusGraph instance backed by the capturing in-memory store, with + * whole-row deletion enabled and 8 partitions. + *

+ * {@code ids.flush=false} forces JanusGraph to commit IDs up-front so that + * vertices in each transaction are spread across distinct partitions — + * the prerequisite for generating multiple physical rows for a partitioned vertex. + */ + private JanusGraph openWithPartitions() { + return JanusGraphFactory.build() + .set("storage.backend", WholeRowDeletionCapturingStoreManager.class.getName()) + .set("storage.drop-whole-row-on-vertex-removal", true) + .set("cluster.max-partitions", 8) + // spread IDs across partitions from the first commit + .set("ids.flush", false) + // keep multiple concurrent partition blocks so edges land in different partitions + .set("ids.num-partitions", 24) + .open(); + } + + @Test + public void partitionedVertexRemovalWholeRowDeletesAllRepresentatives() { + graph = openWithPartitions(); + + // ---- schema: a partitioned "hub" label and a plain edge label ---- + JanusGraphManagement mgmt = graph.openManagement(); + mgmt.makeVertexLabel("hub").partition().make(); + mgmt.makeEdgeLabel("link").make(); + mgmt.commit(); + + // ---- create the partitioned hub, then add leaves across MANY separate transactions ---- + // JanusGraph assigns a partition per transaction, so committing each leaf separately spreads + // the leaves -- and therefore the hub's incident edges -- across multiple partitions. That is + // what produces multiple physical representative rows for the partitioned hub. Creating + // everything in a single transaction would cluster them into one partition (and make a + // "more than one representative" assertion flaky). + JanusGraphVertex hub = graph.addVertex("hub"); + graph.tx().commit(); + final long hubId = (long) hub.id(); + + final IDManager idManager = ((StandardJanusGraph) graph).getIDManager(); + // confirm the hub is genuinely a partitioned vertex + assertTrue(idManager.isPartitionedVertex(hubId), + "hub must be assigned a partitioned-vertex id"); + + for (int i = 0; i < 24; i++) { + graph.traversal().V(hubId).next().addEdge("link", graph.addVertex()); + graph.tx().commit(); + } + + // The hub's physical representative rows (canonical + per-partition), as storage keys. + final Set representativeKeys = + Arrays.stream(idManager.getPartitionedVertexRepresentatives(hubId)) + .mapToObj(repId -> idManager.getKey(repId)) + .collect(Collectors.toSet()); + + // Which of the hub's representative rows actually received data during creation. + final Set hubRowsWithData = + WholeRowDeletionCapturingStoreManager.CAPTURED.stream() + .filter(c -> c.store.equals(EDGESTORE_NAME) && representativeKeys.contains(c.key)) + .map(c -> c.key) + .collect(Collectors.toSet()); + + // The multi-transaction setup must have spread the hub across more than one physical row, + // otherwise this test would not actually exercise multi-representative deletion. + assertTrue(hubRowsWithData.size() > 1, + "test setup must spread the partitioned hub across multiple representative rows, got " + + hubRowsWithData.size()); + + // ---- reset captures so we only observe the deletion commit, then remove the hub ---- + WholeRowDeletionCapturingStoreManager.reset(); + graph.traversal().V(hubId).next().remove(); + graph.tx().commit(); + + // ---- assertions ---- + final List edgeStoreWholeRows = + WholeRowDeletionCapturingStoreManager.CAPTURED.stream() + .filter(c -> c.store.equals(EDGESTORE_NAME) && c.wholeRow) + .collect(Collectors.toList()); + final Set deletedKeys = edgeStoreWholeRows.stream() + .map(c -> c.key) + .collect(Collectors.toSet()); + + // Removal must whole-row delete EXACTLY the representative rows that held hub data: no row + // missed (the regression this test guards) and no unrelated row touched. + assertEquals(hubRowsWithData, deletedKeys, + "whole-row deletion must cover exactly the partitioned hub's populated representative rows"); + + // Each whole-row-deleted row carries 0 per-column deletions (the flag replaces them). + assertTrue(edgeStoreWholeRows.stream().allMatch(c -> c.deletions == 0), + "whole-row-deleted rows must have 0 per-column deletions"); + + // Graph correctness: the hub is gone. + assertFalse(graph.traversal().V(hubId).hasNext(), + "partitioned hub vertex must be gone after removal"); + } +} diff --git a/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/RemovedVertexTrackingTest.java b/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/RemovedVertexTrackingTest.java new file mode 100644 index 0000000000..db160fe15d --- /dev/null +++ b/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/RemovedVertexTrackingTest.java @@ -0,0 +1,76 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.inmemory; + +import org.janusgraph.core.JanusGraph; +import org.janusgraph.core.JanusGraphFactory; +import org.janusgraph.core.JanusGraphVertex; +import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RemovedVertexTrackingTest { + + private JanusGraph graph; + + @BeforeEach + public void open() { + graph = JanusGraphFactory.build().set("storage.backend", "inmemory").open(); + } + + @AfterEach + public void close() { + if (graph != null) graph.close(); + } + + @Test + public void removedVertexIsTracked() { + Object id; + try (org.janusgraph.core.JanusGraphTransaction setup = graph.newTransaction()) { + JanusGraphVertex v = setup.addVertex(); + setup.commit(); + id = v.id(); + } + StandardJanusGraphTx tx = (StandardJanusGraphTx) graph.newTransaction(); + try { + assertFalse(tx.isVertexFullyRemoved(id)); + JanusGraphVertex v = (JanusGraphVertex) tx.vertices(id).next(); + v.remove(); + assertTrue(tx.isVertexFullyRemoved(id)); + } finally { + tx.rollback(); + } + } + + @Test + public void removeViaStaleCrossTransactionReferenceDoesNotThrow() { + // Obtain a vertex in one (thread-bound) transaction, commit it -- closing that transaction -- + // then remove the now-stale reference. remove() must resolve it() to the new transaction + // without throwing "was removed". Regression test: recordRemovedVertex must not re-resolve + // it() after the lifecycle has been flipped to REMOVED. + final JanusGraphVertex v = graph.addVertex(); + final Object id = v.id(); + graph.tx().commit(); + + v.remove(); + graph.tx().commit(); + + assertFalse(graph.traversal().V(id).hasNext()); + } +} diff --git a/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/WholeRowDeletionOptimizationTest.java b/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/WholeRowDeletionOptimizationTest.java new file mode 100644 index 0000000000..e7edb46c43 --- /dev/null +++ b/janusgraph-inmemory/src/test/java/org/janusgraph/graphdb/inmemory/WholeRowDeletionOptimizationTest.java @@ -0,0 +1,114 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.inmemory; + +import org.janusgraph.core.JanusGraph; +import org.janusgraph.core.JanusGraphFactory; +import org.janusgraph.core.JanusGraphVertex; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.inmemory.WholeRowDeletionCapturingStoreManager; +import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.janusgraph.diskstorage.Backend.EDGESTORE_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class WholeRowDeletionOptimizationTest { + + private JanusGraph graph; + + private JanusGraph open(boolean enabled) { + return JanusGraphFactory.build() + .set("storage.backend", WholeRowDeletionCapturingStoreManager.class.getName()) + .set("storage.drop-whole-row-on-vertex-removal", enabled) + .open(); + } + + @AfterEach + public void close() { + try { + if (graph != null) { + graph.close(); + } + } finally { + WholeRowDeletionCapturingStoreManager.reset(); + } + } + + private long createSuperNode(int leaves) { + JanusGraphVertex hub = graph.addVertex(); + for (int i = 0; i < leaves; i++) { + hub.addEdge("link", graph.addVertex()); + } + graph.tx().commit(); + return (long) hub.id(); + } + + @Test + public void superNodeRemovalUsesWholeRowDeletion() { + graph = open(true); + long hubId = createSuperNode(50); + StaticBuffer hubKey = ((StandardJanusGraph) graph).getIDManager().getKey(hubId); + WholeRowDeletionCapturingStoreManager.reset(); + + graph.traversal().V(hubId).next().remove(); + graph.tx().commit(); + + long wholeRowEdgeStore = WholeRowDeletionCapturingStoreManager.CAPTURED.stream() + .filter(c -> c.store.equals(EDGESTORE_NAME) && c.wholeRow).count(); + assertEquals(1, wholeRowEdgeStore, "exactly one edge-store row (the hub) deleted as whole-row"); + WholeRowDeletionCapturingStoreManager.CAPTURED.stream() + .filter(c -> c.store.equals(EDGESTORE_NAME) && c.key.equals(hubKey)) + .forEach(c -> { + assertTrue(c.wholeRow); + assertEquals(0, c.deletions); + }); + // graph state correct + assertFalse(graph.traversal().V(hubId).hasNext()); + } + + @Test + public void disabledFlagFallsBackToColumnDeletes() { + graph = open(false); + long hubId = createSuperNode(50); + WholeRowDeletionCapturingStoreManager.reset(); + + graph.traversal().V(hubId).next().remove(); + graph.tx().commit(); + + assertEquals(0, WholeRowDeletionCapturingStoreManager.CAPTURED.stream() + .filter(c -> c.wholeRow).count(), "no whole-row deletes when flag disabled"); + assertFalse(graph.traversal().V(hubId).hasNext()); + } + + @Test + public void partialEdgeRemovalDoesNotWholeRowDelete() { + graph = open(true); + long hubId = createSuperNode(5); + WholeRowDeletionCapturingStoreManager.reset(); + + // remove ONE edge, keep the vertex + JanusGraphVertex hub = (JanusGraphVertex) graph.traversal().V(hubId).next(); + hub.edges(org.apache.tinkerpop.gremlin.structure.Direction.OUT).next().remove(); + graph.tx().commit(); + + assertEquals(0, WholeRowDeletionCapturingStoreManager.CAPTURED.stream() + .filter(c -> c.wholeRow).count(), "surviving vertex must not be whole-row deleted"); + assertTrue(graph.traversal().V(hubId).hasNext()); + } +}