diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 64843370fcdb..0117f7844617 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -83,6 +83,7 @@ import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.UpsertContext; import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet; @@ -94,6 +95,7 @@ import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; @@ -106,6 +108,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; @@ -1108,6 +1111,47 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon _logger.warn("Skipping reload for segment: {} — concurrently offloaded after dispatch", segmentName); return; } + // For an upsert table with metadata TTL and snapshots enabled, a reload must not blindly re-scan the segment and + // re-add every primary key, as that would resurrect keys already expired (metadataTTL) or deleted + // (deletedKeysTTL). Instead we rebuild the upsert metadata from the persisted validDocIds snapshot (valid docs + // only). The snapshot is indexed by docId, so it is only meaningful when the segment content (CRC) is unchanged: + // - normal reload: only proceed when the CRC matches and a snapshot exists, else fail the reload; + // - forceDownload reload: always proceed, using the snapshot if present (operator accepts a possible CRC + // mismatch), otherwise fall back to a regular full scan of the downloaded segment. + // Gated on snapshots being enabled so tables that never persist a snapshot (e.g. deletedKeysTTL-only without + // snapshot) keep the regular reload behavior instead of always failing. + byte[] validDocIdsSnapshotBytes = null; + UpsertContext upsertContext = isUpsertEnabled() ? _tableUpsertMetadataManager.getContext() : null; + if (upsertContext != null && upsertContext.isSnapshotEnabled() + && (upsertContext.getMetadataTTL() > 0 || upsertContext.getDeletedKeysTTL() > 0)) { + File snapshotFile = new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), + V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME); + boolean snapshotExists = snapshotFile.exists(); + boolean crcMatch = hasSameCRC(zkMetadata, localMetadata); + if (!forceDownload && (!crcMatch || !snapshotExists)) { + throw new IllegalStateException(String.format( + "Failing reload for segment: %s of upsert table with metadata TTL: %s because %s. Use a forceDownload " + + "reload if you are okay with a CRC mismatch on the segment and confident the local validDocIds " + + "snapshot's valid docs map to the deep-store segment; otherwise the forceDownload reload will scan " + + "the entire segment data.", segmentName, _tableNameWithType, + !crcMatch ? "the segment CRC has changed from: " + localMetadata.getCrc() + " to: " + zkMetadata.getCrc() + : "no validDocIds snapshot is available")); + } + if (snapshotExists) { + if (forceDownload && !crcMatch) { + _logger.warn("forceDownload reload of segment: {} will rebuild upsert metadata from the local validDocIds " + + "snapshot despite a CRC change (local: {}, ZK: {}); snapshot valid docs are assumed to map to the " + + "downloaded segment", segmentName, localMetadata.getCrc(), zkMetadata.getCrc()); + } + // Capture the snapshot before createBackup() moves the current segment directory aside, so it can be restored + // into the freshly downloaded/copied segment directory below and drive the upsert metadata rebuild. + validDocIdsSnapshotBytes = FileUtils.readFileToByteArray(snapshotFile); + } else { + // forceDownload with no snapshot: fall back to a full scan, which may resurrect TTL-expired/deleted keys. + _logger.warn("forceDownload reload of segment: {} has no validDocIds snapshot; rebuilding upsert metadata " + + "with a full scan, which may resurrect TTL-expired or deleted keys", segmentName); + } + } /* Determines if a segment should be downloaded from deep storage based on: 1. A forced download flag. @@ -1159,6 +1203,17 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon } } + // Restore the captured validDocIds snapshot into the reloaded segment directory (download/copyTo paths drop it, + // as the deep-store copy and the copied index do not carry the server-local snapshot). This lets the upsert + // metadata be rebuilt from only the valid docs. The captured local snapshot is authoritative, so overwrite any + // snapshot that may have come with a downloaded segment. Only the reload path places a snapshot on the incoming + // segment; segment commits and uploads never do, so they are unaffected by the snapshot-based rebuild. + if (validDocIdsSnapshotBytes != null) { + File restoredSnapshotFile = new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), + V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME); + FileUtils.writeByteArrayToFile(restoredSnapshotFile, validDocIdsSnapshotBytes); + } + // Load from indexDir and replace the old segment in memory. What's inside indexDir // may come from SegmentDirectory.copyTo() or the segment downloaded from deep store. indexLoadingConfig.setSegmentTier(zkMetadata.getTier()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index dc080b173cee..9162e3a608e5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -611,8 +611,27 @@ protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegmen // we can't skip segment even if it's out of TTL as its validDocIds bitmap is not updated yet. } try (UpsertUtils.RecordInfoReader recordInfoReader = createRecordInfoReader(segment)) { - Iterator recordInfoIterator = - UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs()); + // Reload-only fast path for an upsert + TTL table. The incoming segment carries a validDocIds snapshot ONLY when + // the reload flow placed it there (see BaseTableDataManager.reloadSegment); segment commits and uploads always + // build a fresh segment without one and fall through to the full scan below, unaffected. Rebuilding from just the + // snapshot's valid docs avoids resurrecting primary keys already expired/deleted by TTL. + MutableRoaringBitmap validDocIdsSnapshot = null; + if (isTTLEnabled() && segment instanceof ImmutableSegmentImpl) { + validDocIdsSnapshot = + ((ImmutableSegmentImpl) segment).loadDocIdsFromSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME); + } + Iterator recordInfoIterator; + if (validDocIdsSnapshot != null) { + // Defensively drop any docIds beyond this segment's range in case the snapshot was taken on a different + // (CRC-mismatched) copy via a forceDownload reload. + validDocIdsSnapshot.remove((long) segment.getSegmentMetadata().getTotalDocs(), 0x1_0000_0000L); + _logger.info("Replacing segment: {} on reload using validDocIds snapshot with {} valid docs", segmentName, + validDocIdsSnapshot.getCardinality()); + recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIdsSnapshot); + } else { + recordInfoIterator = + UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs()); + } replaceSegment(segment, null, null, recordInfoIterator, oldSegment); } catch (Exception e) { throw new RuntimeException( diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index f9c58b83949e..c5e8f0c4b1de 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -290,6 +290,92 @@ public void testManageWatermark() upsertMetadataManager.close(); } + @Test + public void testReplaceSegmentOnReloadRebuildsFromSnapshot() + throws Exception { + // Upsert + metadata TTL with snapshots enabled: a reload must rebuild upsert metadata from the validDocIds + // snapshot (valid docs only), so docs invalidated since the segment was first loaded are not resurrected. + _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); + Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + + String segmentName = "reload_snapshot_segment"; + int[] primaryKeys = new int[]{10, 30, 40}; + int[] timestamps = new int[]{1500, 3500, 4000}; + + // Initial load of the segment with 3 valid docs. + ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl segment1 = createRealSegment(segmentName, primaryKeys, timestamps, validDocIds1); + upsertMetadataManager.addSegment(segment1); + assertEquals(recordLocationMap.size(), 3); + + // Reload: a fresh copy of the same segment that carries a validDocIds snapshot marking only docs {0, 2} valid + // (doc 1 / PK 30 was invalidated). The reload must rebuild from the snapshot, not rescan all docs. + ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys, timestamps, validDocIds2); + ThreadSafeMutableRoaringBitmap snapshot = new ThreadSafeMutableRoaringBitmap(); + snapshot.add(0); + snapshot.add(2); + segment2.persistDocIdsSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME, snapshot.getBytesAndCardinality()); + + upsertMetadataManager.replaceSegment(segment2, segment1); + + // PK 30 (doc 1) is not resurrected; only PK 10 and PK 40 remain, now in the reloaded segment. + assertEquals(recordLocationMap.size(), 2); + checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 40, segment2, 2, 4000, HashFunction.NONE); + assertNull(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(30), HashFunction.NONE))); + assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2); + + upsertMetadataManager.stop(); + upsertMetadataManager.close(); + segment1.destroy(); + segment2.destroy(); + } + + @Test + public void testReplaceSegmentOnReloadTrimsSnapshotDocIdsBeyondSegment() + throws Exception { + // A forceDownload reload may apply a snapshot captured on a CRC-mismatched copy; docIds beyond the new segment's + // range must be dropped so they are not read as phantom (out-of-range) records. + _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); + Map recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + + String segmentName = "reload_snapshot_trim_segment"; + int[] primaryKeys = new int[]{10, 30, 40}; + int[] timestamps = new int[]{1500, 3500, 4000}; + + ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl segment1 = createRealSegment(segmentName, primaryKeys, timestamps, validDocIds1); + upsertMetadataManager.addSegment(segment1); + assertEquals(recordLocationMap.size(), 3); + + // Snapshot references docId 5, out of range for this 3-doc segment, plus valid docs 0 and 2. + ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys, timestamps, validDocIds2); + ThreadSafeMutableRoaringBitmap snapshot = new ThreadSafeMutableRoaringBitmap(); + snapshot.add(0); + snapshot.add(2); + snapshot.add(5); + segment2.persistDocIdsSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME, snapshot.getBytesAndCardinality()); + + // Without the docId trim, reading docId 5 from a 3-doc segment would throw; the trim keeps only docs {0, 2}. + upsertMetadataManager.replaceSegment(segment2, segment1); + + assertEquals(recordLocationMap.size(), 2); + checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 40, segment2, 2, 4000, HashFunction.NONE); + assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2); + + upsertMetadataManager.stop(); + upsertMetadataManager.close(); + segment1.destroy(); + segment2.destroy(); + } + @Test public void testGetQueryableDocIds() { _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);