12 changes: 12 additions & 0 deletions docs/changelog.md
@@ -122,6 +122,18 @@ graph.management-ack-timeout=240000 ms
```
This is a breaking change for users who use the `JanusGraphIndexStatusUpdate` interface.

##### Whole-row deletion on vertex removal (super-node tombstone reduction)

Starting with version 1.2.0, removing a vertex deletes its entire storage row in a single operation on
backends that support it. For example, CQL/Cassandra issues one partition-level delete instead of one
column delete per incident edge, which greatly reduces tombstone pressure when removing super-nodes.

This behavior is enabled by default. To restore the previous per-column deletion behavior, set:

```
storage.drop-whole-row-on-vertex-removal=false
```
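
As a rough illustration of the opt-out, the following self-contained Java sketch builds a set of properties with the option disabled and reads it back using the documented default of `true`. The class `WholeRowDeletionConfigSketch` and its helper methods are hypothetical and rely only on `java.util.Properties`; JanusGraph resolves the option through its own configuration classes, not through this code.

```java
import java.util.Properties;

// Hypothetical helper, not part of JanusGraph: models the option as a plain property.
final class WholeRowDeletionConfigSketch {

    // Option name as documented in this change.
    static final String OPTION = "storage.drop-whole-row-on-vertex-removal";

    /** Returns a copy of {@code base} with whole-row deletion switched off. */
    static Properties withPerColumnDeletion(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty(OPTION, "false");
        return copy;
    }

    /** Reads the option, falling back to the documented default (true) when it is absent. */
    static boolean isWholeRowDeletionEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(OPTION, "true"));
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("storage.backend", "cql");
        base.setProperty("storage.hostname", "127.0.0.1");

        Properties legacy = withPerColumnDeletion(base);

        System.out.println("default: " + isWholeRowDeletionEnabled(base));
        System.out.println("opt-out: " + isWholeRowDeletionEnabled(legacy));
    }
}
```

The copy-then-override approach leaves the base properties untouched, so the same base can be reused for graphs that keep the default.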

### Version 1.1.0 (Release Date: November 7, 2024)

/// tab | Maven
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
@@ -423,6 +423,7 @@ Configuration options for the storage backend. Some options are applicable only
| storage.connection-timeout | Default timeout, in milliseconds, when connecting to a remote database instance | Duration | 10000 ms | MASKABLE |
| storage.directory | Storage directory for those storage backends that require local storage. | String | (no default value) | LOCAL |
| storage.drop-on-clear | Whether to drop the graph database (true) or delete rows (false) when clearing storage. Note that some backends always drop the graph database when clearing storage. Also note that indices are always dropped when clearing storage. | Boolean | true | MASKABLE |
| storage.drop-whole-row-on-vertex-removal | When a vertex is removed, delete its entire storage row in a single operation (e.g. a Cassandra partition-level delete producing one tombstone) instead of deleting each edge/property column individually. This greatly reduces tombstone pressure when removing super-nodes. Only takes effect on storage backends that support optimized whole-row deletion; ignored otherwise. | Boolean | true | MASKABLE |
| storage.hostname | The hostname or comma-separated list of hostnames of storage backend servers. This is only applicable to some storage backends, such as cassandra and hbase. | String[] | 127.0.0.1 | LOCAL |
| storage.keys-size | The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request. | Integer | 100 | MASKABLE |
| storage.num-mutations-parallel-threshold | This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. This overhead is more efficiently offset in the context of larger transactions. | Integer | 100 | MASKABLE |
9 changes: 9 additions & 0 deletions docs/storage-backend/cassandra.md
@@ -178,6 +178,15 @@ More information on Cassandra consistency levels and acceptable
values can be found [here](https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dmlDataConsistencyTOC.html).
In general, higher levels are more consistent and robust but have higher latency.

### Super-node deletion (whole-row delete)

Removing a vertex with many incident edges previously issued one column-level `DELETE` per edge on the
vertex's partition, creating a large number of cell tombstones that could overwhelm reads on that partition.
As of 1.2.0, removing a vertex instead issues a single partition-level
`DELETE ... WHERE key = ?`, which produces one tombstone for the whole partition. This is controlled by
`storage.drop-whole-row-on-vertex-removal` (default `true`). Set it to `false` to restore the previous
per-column behavior.
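
To make the difference in tombstone counts concrete, here is a deliberately simplified Java model. It is only a sketch: `TombstoneModelSketch` and its methods are hypothetical, tombstones are represented as plain strings, and nothing here reflects how Cassandra stores or compacts data internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model, not Cassandra or JanusGraph code.
final class TombstoneModelSketch {

    /** Per-column strategy: one cell tombstone for every deleted column of the key. */
    static List<String> perColumnDelete(String key, List<String> columns) {
        List<String> tombstones = new ArrayList<>(columns.size());
        for (String column : columns) {
            tombstones.add("cell:" + key + "/" + column);
        }
        return tombstones;
    }

    /** Whole-row strategy: a single partition-level tombstone, independent of column count. */
    static List<String> wholeRowDelete(String key) {
        return Collections.singletonList("partition:" + key);
    }

    public static void main(String[] args) {
        // A super-node with 100,000 incident edges, one column per edge in this model.
        List<String> edgeColumns = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            edgeColumns.add("edge-" + i);
        }
        System.out.println("per-column tombstones: " + perColumnDelete("v1", edgeColumns).size());
        System.out.println("whole-row tombstones:  " + wholeRowDelete("v1").size());
    }
}
```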

## Global Graph Operations

JanusGraph over Cassandra supports global vertex and edge iteration.
@@ -213,7 +213,11 @@ public void logMutations(DataOutput out) {
* @param deletions List of columns to be removed
*/
public void mutateEdges(StaticBuffer key, List<Entry> additions, List<Entry> deletions) throws BackendException {
edgeStore.mutateEntries(key, additions, deletions, storeTx);
mutateEdges(key, additions, deletions, false);
}

public void mutateEdges(StaticBuffer key, List<Entry> additions, List<Entry> deletions, boolean wholeRowDeletion) throws BackendException {
edgeStore.mutateEntries(key, additions, deletions, wholeRowDeletion, storeTx);
}

/**
@@ -37,6 +37,8 @@ public abstract class Mutation<E,K> {

private List<K> deletions;

private boolean wholeRowDeletion = false;

public Mutation(List<E> additions, List<K> deletions) {
assert additions!=null;
assert deletions!=null;
@@ -97,6 +99,21 @@ public List<K> getDeletions() {
return deletions;
}

/**
* Whether this mutation deletes the entire row/partition for its key. When set, the backend
* (if it advertises {@code StoreFeatures.hasOptimizedWholeRowDeletion()}) deletes all columns
* for the key in a single operation before applying any additions, instead of deleting each
* column individually.
*/
public boolean hasWholeRowDeletion() {
return wholeRowDeletion;
}

/**
* Marks whether this mutation deletes the entire row/partition for its key.
*
* @see #hasWholeRowDeletion()
*/
public void setWholeRowDeletion(boolean wholeRowDeletion) {
this.wholeRowDeletion = wholeRowDeletion;
}

/**
* Adds a new entry as an addition to this mutation
*
@@ -135,14 +152,16 @@ public void merge(Mutation<E,K> m) {
if (null == deletions) deletions = m.deletions;
else deletions.addAll(m.deletions);
}

this.wholeRowDeletion = this.wholeRowDeletion || m.wholeRowDeletion;
}

public boolean isEmpty() {
return getTotalMutations()==0;
}

public int getTotalMutations() {
return (additions==null?0:additions.size()) + (deletions==null?0:deletions.size());
return (additions==null?0:additions.size()) + (deletions==null?0:deletions.size()) + (wholeRowDeletion?1:0);
}

/**
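
The flag handling in the `Mutation` hunks above can be summarized with a small stand-alone model. This is a sketch under assumptions, not the real class: `MutationSketch` is hypothetical, it uses `String` entries instead of the generic `E`/`K` types, and it only mirrors the three behaviors visible in the diff (the flag, the OR on merge, and the extra count in `getTotalMutations()`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for org.janusgraph.diskstorage.Mutation.
final class MutationSketch {
    private final List<String> additions = new ArrayList<>();
    private final List<String> deletions = new ArrayList<>();
    private boolean wholeRowDeletion = false;

    MutationSketch addition(String entry) {
        additions.add(entry);
        return this;
    }

    MutationSketch deletion(String column) {
        deletions.add(column);
        return this;
    }

    MutationSketch wholeRowDeletion(boolean value) {
        this.wholeRowDeletion = value;
        return this;
    }

    boolean hasWholeRowDeletion() {
        return wholeRowDeletion;
    }

    /** Merging keeps the flag set if either side requested a whole-row delete. */
    void merge(MutationSketch other) {
        additions.addAll(other.additions);
        deletions.addAll(other.deletions);
        wholeRowDeletion = wholeRowDeletion || other.wholeRowDeletion;
    }

    /** The flag counts as one mutation, so a flag-only mutation is not considered empty. */
    int getTotalMutations() {
        return additions.size() + deletions.size() + (wholeRowDeletion ? 1 : 0);
    }

    boolean isEmpty() {
        return getTotalMutations() == 0;
    }

    public static void main(String[] args) {
        MutationSketch flagOnly = new MutationSketch().wholeRowDeletion(true);
        MutationSketch plain = new MutationSketch().addition("a").deletion("b");
        plain.merge(flagOnly);
        System.out.println(flagOnly.isEmpty());
        System.out.println(plain.hasWholeRowDeletion() + " " + plain.getTotalMutations());
    }
}
```

Counting the flag in `getTotalMutations()` matters because a whole-row delete carries no additions or per-column deletions; without the extra count, such a mutation would look empty to callers that check `isEmpty()`.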
@@ -44,6 +44,7 @@ public class StandardStoreFeatures implements StoreFeatures {
private final Configuration scanTxConfig;
private final boolean supportsInterruption;
private final boolean optimisticLocking;
private final boolean optimizedWholeRowDeletion;

@Override
public boolean hasScan() {
@@ -159,6 +160,11 @@ public boolean hasOptimisticLocking() {
return optimisticLocking;
}

@Override
public boolean hasOptimizedWholeRowDeletion() {
return optimizedWholeRowDeletion;
}

/**
* The only way to instantiate {@link StandardStoreFeatures}.
*/
@@ -186,6 +192,7 @@ public static class Builder {
private Configuration scanTxConfig;
private boolean supportsInterruption = true;
private boolean optimisticLocking;
private boolean optimizedWholeRowDeletion;

/**
* Construct a Builder with everything disabled/unsupported/false/null.
@@ -219,13 +226,19 @@ public Builder(StoreFeatures template) {
scanTxConfig(template.getScanTxConfig());
supportsInterruption(template.supportsInterruption());
optimisticLocking(template.hasOptimisticLocking());
optimizedWholeRowDeletion(template.hasOptimizedWholeRowDeletion());
}

public Builder optimisticLocking(boolean b) {
optimisticLocking = b;
return this;
}

public Builder optimizedWholeRowDeletion(boolean b) {
optimizedWholeRowDeletion = b;
return this;
}

public Builder consistentScan(boolean consistentScan) {
this.consistentScan = consistentScan;
return this;
@@ -343,7 +356,8 @@ public StandardStoreFeatures build() {
timestamps, preferredTimestamps, cellLevelTTL,
storeLevelTTL, visibility, supportsPersist,
keyConsistentTxConfig,
localKeyConsistentTxConfig, scanTxConfig, supportsInterruption, optimisticLocking);
localKeyConsistentTxConfig, scanTxConfig, supportsInterruption, optimisticLocking,
optimizedWholeRowDeletion);
}
}

@@ -356,7 +370,8 @@ private StandardStoreFeatures(boolean consistentScan, boolean unorderedScan, boo
boolean visibility, boolean supportsPersist,
Configuration keyConsistentTxConfig,
Configuration localKeyConsistentTxConfig,
Configuration scanTxConfig, boolean supportsInterruption, boolean optimisticLocking) {
Configuration scanTxConfig, boolean supportsInterruption, boolean optimisticLocking,
boolean optimizedWholeRowDeletion) {
this.consistentScan = consistentScan;
this.unorderedScan = unorderedScan;
this.orderedScan = orderedScan;
@@ -379,5 +394,6 @@ private StandardStoreFeatures(boolean consistentScan, boolean unorderedScan, boo
this.scanTxConfig = scanTxConfig;
this.supportsInterruption = supportsInterruption;
this.optimisticLocking = optimisticLocking;
this.optimizedWholeRowDeletion = optimizedWholeRowDeletion;
}
}
@@ -223,4 +223,14 @@ public interface StoreFeatures {
*/
boolean hasOptimisticLocking();

/**
* Whether this storage backend can delete an entire row/partition for a key in a single
* efficient operation, honoring {@link org.janusgraph.diskstorage.Mutation#hasWholeRowDeletion()}.
* Backends returning true must, when a mutation has this flag set, delete every column for the
* key (e.g. a Cassandra partition-level DELETE) rather than enumerating columns.
*
* @return true if the backend supports optimized whole-row deletion, else false
*/
boolean hasOptimizedWholeRowDeletion();

}
@@ -82,10 +82,15 @@ public StoreTransaction getWrappedTransaction() {
}

void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry> deletions) throws BackendException {
mutate(store, key, additions, deletions, false);
}

void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry> deletions, boolean wholeRowDeletion) throws BackendException {
Preconditions.checkNotNull(store);
if (additions.isEmpty() && deletions.isEmpty()) return;
if (additions.isEmpty() && deletions.isEmpty() && !wholeRowDeletion) return;

KCVEntryMutation m = new KCVEntryMutation(additions, deletions);
if (wholeRowDeletion) m.setWholeRowDeletion(true);
final Map<StaticBuffer, KCVEntryMutation> storeMutation = mutations.computeIfAbsent(store, k -> new HashMap<>());
KCVEntryMutation existingM = storeMutation.get(key);
if (existingM != null) {
@@ -121,8 +126,9 @@ public String toString() {

private KCVMutation convert(KCVEntryMutation mutation) {
assert !mutation.isEmpty();
final KCVMutation result;
if (mutation.hasDeletions()) {
return new KCVMutation(
result = new KCVMutation(
() -> new ArrayList<>(mutation.getAdditions()),
() -> {
List<Entry> deletions = mutation.getDeletions();
@@ -132,8 +138,11 @@ private KCVMutation convert(KCVEntryMutation mutation) {
}
return convertedDeletions;
});
} else {
result = new KCVMutation(mutation.getAdditions(), KeyColumnValueStore.NO_DELETIONS);
}
return new KCVMutation(mutation.getAdditions(), KeyColumnValueStore.NO_DELETIONS);
result.setWholeRowDeletion(mutation.hasWholeRowDeletion());
return result;
}

private void flushInternal() throws BackendException {
@@ -68,8 +68,12 @@ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> d
}

public void mutateEntries(StaticBuffer key, List<Entry> additions, List<Entry> deletions, StoreTransaction txh) throws BackendException {
mutateEntries(key, additions, deletions, false, txh);
}

public void mutateEntries(StaticBuffer key, List<Entry> additions, List<Entry> deletions, boolean wholeRowDeletion, StoreTransaction txh) throws BackendException {
assert txh instanceof CacheTransaction;
((CacheTransaction) txh).mutate(this, key, additions, deletions);
((CacheTransaction) txh).mutate(this, key, additions, deletions, wholeRowDeletion);
}

@Override
@@ -665,6 +665,13 @@ public boolean apply(@Nullable String s) {
"Whether to enable batch loading into the storage backend",
ConfigOption.Type.LOCAL, false);

public static final ConfigOption<Boolean> DROP_WHOLE_ROW_ON_VERTEX_REMOVAL = new ConfigOption<>(STORAGE_NS,"drop-whole-row-on-vertex-removal",
"When a vertex is removed, delete its entire storage row in a single operation (e.g. a Cassandra " +
"partition-level delete producing one tombstone) instead of deleting each edge/property column " +
"individually. This greatly reduces tombstone pressure when removing super-nodes. Only takes effect " +
"on storage backends that support optimized whole-row deletion; ignored otherwise.",
ConfigOption.Type.MASKABLE, true);

/**
* Enables transactions on storage backends that support them
*/
@@ -1466,6 +1473,7 @@ public boolean apply(@Nullable String s) {
private boolean allowVertexIdSetting;
private boolean allowCustomVertexIdType;
private boolean logTransactions;
private boolean dropWholeRowOnVertexRemoval;
private String metricsPrefix;
private String unknownIndexKeyName;
private MultiQueryHasStepStrategyMode hasStepStrategyMode;
@@ -1627,6 +1635,10 @@ public boolean hasLogTransactions() {
return logTransactions;
}

public boolean getDropWholeRowOnVertexRemoval() {
return dropWholeRowOnVertexRemoval;
}

public TimestampProvider getTimestampProvider() {
return configuration.get(TIMESTAMP_PROVIDER);
}
@@ -1794,6 +1806,7 @@ private void preLoadConfiguration() {
}

logTransactions = configuration.get(SYSTEM_LOG_TRANSACTIONS);
dropWholeRowOnVertexRemoval = configuration.get(DROP_WHOLE_ROW_ON_VERTEX_REMOVAL);

unknownIndexKeyName = configuration.get(IGNORE_UNKNOWN_INDEX_FIELD) ? UNKNOWN_FIELD_NAME : null;

@@ -174,6 +174,7 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
private final GraphDatabaseConfiguration config;
private final Backend backend;
private final IDManager idManager;
private final boolean wholeRowDeletionEnabled;
private final VertexIDAssigner idAssigner;
private final TimestampProvider times;
private final CacheInvalidationService cacheInvalidationService;
@@ -217,6 +218,8 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) {

this.idAssigner = config.getIDAssigner(backend);
this.idManager = idAssigner.getIDManager();
this.wholeRowDeletionEnabled = configuration.getDropWholeRowOnVertexRemoval()
&& backend.getStoreFeatures().hasOptimizedWholeRowDeletion();

this.cacheInvalidationService = new KCVSCacheInvalidationService(
backend.getEdgeStoreCache(), backend.getIndexStoreCache(), idManager);
@@ -856,8 +859,18 @@ private void prepareCommitAddRelationMutations(final ListMultimap<Object, Intern
for (Object vertexId : mutations.keySet()) {
IDUtils.checkId(vertexId);
final List<InternalRelation> edges = mutations.get(vertexId);
final List<Entry> additions = new ArrayList<>(edges.size());
final List<Entry> deletions = new ArrayList<>(Math.max(10, edges.size() / 10));

final Object canonicalId = idManager.isPartitionedVertex(vertexId)
? idManager.getCanonicalVertexId(((Number) vertexId).longValue())
: vertexId;
final boolean wholeRowDeletion = wholeRowDeletionEnabled && tx.isVertexFullyRemoved(canonicalId);

// When the whole row is deleted, per-column deletions are skipped and a fully-removed
// vertex has no additions, so both lists stay empty. Avoid preallocating large backing
// arrays sized to edges.size() for super-node removals.
final List<Entry> additions = new ArrayList<>(wholeRowDeletion ? 0 : edges.size());
final List<Entry> deletions = new ArrayList<>(wholeRowDeletion ? 0 : Math.max(10, edges.size() / 10));

for (final InternalRelation edge : edges) {
final InternalRelationType baseType = (InternalRelationType) edge.getType();
assert baseType.getBaseType()==null;
@@ -868,11 +881,18 @@ private void prepareCommitAddRelationMutations(final ListMultimap<Object, Intern
if (!type.isUnidirected(Direction.BOTH) && !type.isUnidirected(EdgeDirection.fromPosition(pos)))
continue; //Directionality is not covered
if (edge.getVertex(pos).id().equals(vertexId)) {
StaticArrayEntry entry = edgeSerializer.writeRelation(edge, type, pos, tx);
if (edge.isRemoved()) {
deletions.add(entry);
// Skip serializing per-column deletions entirely when a whole-row delete will be issued:
// writeRelation() is not called, so removing a super-node avoids serializing every incident
// edge (CPU + allocations) only to discard the result. wholeRowDeletion is true only when the
// backend advertises the capability, so backends without it always serialize the full
// per-column deletions list (no data loss).
if (!wholeRowDeletion) {
deletions.add(edgeSerializer.writeRelation(edge, type, pos, tx));
}
} else {
Preconditions.checkArgument(edge.isNew());
StaticArrayEntry entry = edgeSerializer.writeRelation(edge, type, pos, tx);
int ttl = getTTL(edge);
if (ttl > 0) {
entry.setMetaData(EntryMetaData.TTL, ttl);
@@ -885,7 +905,7 @@ private void prepareCommitAddRelationMutations(final ListMultimap<Object, Intern
}

StaticBuffer vertexKey = idManager.getKey(vertexId);
mutator.mutateEdges(vertexKey, additions, deletions);
mutator.mutateEdges(vertexKey, additions, deletions, wholeRowDeletion);
}
}
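
The commit-path change above combines three conditions (the config option, the backend capability, and whether the vertex is fully removed) and skips per-edge serialization when all hold. The following sketch models that decision in isolation. Everything in it is hypothetical: `CommitPathSketch`, `RowMutation`, and the string "serialization" stand in for `StandardJanusGraph`, the mutator call, and `edgeSerializer.writeRelation(...)`, and only removed edges are modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the commit-path decision; not JanusGraph code.
final class CommitPathSketch {

    /** Simplified stand-in for what is handed to the storage layer for one vertex key. */
    static final class RowMutation {
        final List<String> deletions;
        final boolean wholeRowDeletion;

        RowMutation(List<String> deletions, boolean wholeRowDeletion) {
            this.deletions = deletions;
            this.wholeRowDeletion = wholeRowDeletion;
        }
    }

    /**
     * Builds the mutation for one vertex whose edges in {@code removedEdges} were deleted.
     * {@code serializations} counts how often the (stand-in) serializer runs.
     */
    static RowMutation prepare(List<String> removedEdges,
                               boolean optionEnabled,
                               boolean backendSupportsWholeRow,
                               boolean vertexFullyRemoved,
                               AtomicInteger serializations) {
        boolean wholeRowDeletion = optionEnabled && backendSupportsWholeRow && vertexFullyRemoved;

        // Avoid sizing the list to the edge count when nothing will be added to it.
        List<String> deletions = new ArrayList<>(wholeRowDeletion ? 0 : removedEdges.size());
        for (String edge : removedEdges) {
            if (!wholeRowDeletion) {
                serializations.incrementAndGet();   // stand-in for serializing one relation
                deletions.add("col:" + edge);
            }
        }
        return new RowMutation(deletions, wholeRowDeletion);
    }

    public static void main(String[] args) {
        List<String> edges = new ArrayList<>();
        for (int i = 0; i < 5; i++) edges.add("e" + i);

        AtomicInteger calls = new AtomicInteger();
        RowMutation optimized = prepare(edges, true, true, true, calls);
        System.out.println(optimized.wholeRowDeletion + " deletions=" + optimized.deletions.size()
                + " serializations=" + calls.get());

        calls.set(0);
        RowMutation fallback = prepare(edges, true, false, true, calls);
        System.out.println(fallback.wholeRowDeletion + " deletions=" + fallback.deletions.size()
                + " serializations=" + calls.get());
    }
}
```

In this model a backend that does not advertise the capability always receives the full per-column deletion list, which matches the safety argument given in the code comment in the hunk.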
