From 5623fa93e5774eaac87274793a171427d970d257 Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Thu, 25 Jun 2026 15:58:54 -0700 Subject: [PATCH 1/8] Add includeBitmaps option to validDocIdsMetadata endpoint --- .../resources/ValidDocIdsMetadataInfo.java | 32 +++++++++++++++++- .../util/ServerSegmentMetadataReader.java | 33 ++++++++++++++++--- .../server/api/resources/TablesResource.java | 14 +++++++- .../pinot/server/api/TablesResourceTest.java | 32 ++++++++++++++++++ 4 files changed, 105 insertions(+), 6 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java index 6fd42c86f659..197a3ee3b0ba 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java @@ -18,8 +18,11 @@ */ package org.apache.pinot.common.restlet.resources; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; import org.apache.pinot.common.utils.ServiceStatus; @@ -35,14 +38,29 @@ public class ValidDocIdsMetadataInfo { private final long _segmentCreationTimeMillis; private final String _instanceId; private final ServiceStatus.Status _serverStatus; + // Optional serialized RoaringBitmap of the validDocIds for the segment. Populated only when the caller of the + // batched validDocIdsMetadata endpoint passes includeBitmaps=true. Null when omitted or when the responding + // server is older and doesn't recognise the flag. Field is JsonInclude.NON_NULL on the getter so payloads stay + // small when bitmaps are not requested. + @Nullable + private final byte[] _bitmap; + public ValidDocIdsMetadataInfo(String segmentName, long totalValidDocs, long totalInvalidDocs, long totalDocs, + String segmentCrc, ValidDocIdsType validDocIdsType, long segmentSizeInBytes, long segmentCreationTimeMillis, + String instanceId, ServiceStatus.Status serverStatus) { + this(segmentName, totalValidDocs, totalInvalidDocs, totalDocs, segmentCrc, validDocIdsType, segmentSizeInBytes, + segmentCreationTimeMillis, instanceId, serverStatus, null); + } + + @JsonCreator public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName, @JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs, @JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc, @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType, @JsonProperty("segmentSizeInBytes") long segmentSizeInBytes, @JsonProperty("segmentCreationTimeMillis") long segmentCreationTimeMillis, - @JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) { + @JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus, + @JsonProperty("bitmap") @Nullable byte[] bitmap) { _segmentName = segmentName; _totalValidDocs = totalValidDocs; _totalInvalidDocs = totalInvalidDocs; @@ -53,6 +71,7 @@ public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName, _segmentCreationTimeMillis = segmentCreationTimeMillis; _instanceId = instanceId; _serverStatus = serverStatus; + _bitmap = bitmap; } public String getSegmentName() { @@ -94,4 +113,15 @@ public String getInstanceId() { public ServiceStatus.Status getServerStatus() { return _serverStatus; } + + /** + * Returns the serialized RoaringBitmap of validDocIds for the segment, or null when not requested by the caller + * (or the responding server is older and doesn't emit it). Callers can deserialize via + * {@code RoaringBitmapUtils.deserialize}. + */ + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public byte[] getBitmap() { + return _bitmap; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index a1669a2882b8..7a71dca7b67d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -282,6 +282,20 @@ public Map> getSegmentToValidDocIdsMetadat Map> serverToSegmentsMap, BiMap serverToEndpoints, @Nullable List segmentNames, int timeoutMs, String validDocIdsType, int numSegmentsBatchPerServerRequest) { + return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints, + segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest, false); + } + + /** + * Overload that also lets the caller request the serialized validDocIds bitmap in each per-segment response entry. + * When {@code includeBitmaps} is true, every {@link ValidDocIdsMetadataInfo} returned by the server includes its + * bitmap bytes (see {@link ValidDocIdsMetadataInfo#getBitmap()}). Use sparingly — the response payload grows with + * the total bitmap size across the requested segments. + */ + public Map> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType, + Map> serverToSegmentsMap, BiMap serverToEndpoints, + @Nullable List segmentNames, int timeoutMs, String validDocIdsType, + int numSegmentsBatchPerServerRequest, boolean includeBitmaps) { List> serverURLsAndBodies = new ArrayList<>(); for (Map.Entry> serverToSegments : serverToSegmentsMap.entrySet()) { List segmentsForServer = serverToSegments.getValue(); @@ -301,7 +315,7 @@ public Map> getSegmentToValidDocIdsMetadat // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch -> serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch, - validDocIdsType, serverToEndpoints.get(serverToSegments.getKey())))); + validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()), includeBitmaps))); } BiMap endpointsToServers = serverToEndpoints.inverse(); @@ -482,6 +496,11 @@ private String generateValidDocIdsBitmapURL(String tableNameWithType, String seg private Pair generateValidDocIdsMetadataURL(String tableNameWithType, List segmentNames, String validDocIdsType, String endpoint) { + return generateValidDocIdsMetadataURL(tableNameWithType, segmentNames, validDocIdsType, endpoint, false); + } + + private Pair generateValidDocIdsMetadataURL(String tableNameWithType, List segmentNames, + String validDocIdsType, String endpoint, boolean includeBitmaps) { tableNameWithType = encode(tableNameWithType); TableSegments tableSegments = new TableSegments(segmentNames); String jsonTableSegments; @@ -491,11 +510,17 @@ private Pair generateValidDocIdsMetadataURL(String tableNameWith LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames); throw new RuntimeException(e); } - String url = String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType); + StringBuilder url = new StringBuilder( + String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType)); + String separator = "?"; if (validDocIdsType != null) { - url = url + "?validDocIdsType=" + validDocIdsType; + url.append(separator).append("validDocIdsType=").append(validDocIdsType); + separator = "&"; + } + if (includeBitmaps) { + url.append(separator).append("includeBitmaps=true"); } - return Pair.of(url, jsonTableSegments); + return Pair.of(url.toString(), jsonTableSegments); } private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 013e4a3588ac..ce66e1cf6c5b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -674,15 +674,24 @@ public String getValidDocIdsMetadata( @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") @PathParam("tableNameWithType") String tableNameWithType, @ApiParam(value = "Valid doc ids type") @QueryParam("validDocIdsType") String validDocIdsType, + @ApiParam(value = "Whether to include the serialized validDocIds bitmap in each per-segment response entry. " + + "When true the response payload grows linearly with the number of valid doc bitmaps, so callers should " + + "request bitmaps only for the set of segments they actually need cross-replica consensus for.") + @QueryParam("includeBitmaps") @DefaultValue("false") boolean includeBitmaps, TableSegments tableSegments, @Context HttpHeaders headers) { tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); List segmentNames = tableSegments.getSegments(); return ResourceUtils.convertToJsonString( - processValidDocIdsMetadata(tableNameWithType, segmentNames, validDocIdsType)); + processValidDocIdsMetadata(tableNameWithType, segmentNames, validDocIdsType, includeBitmaps)); } private List> processValidDocIdsMetadata(String tableNameWithType, List segments, String validDocIdsType) { + return processValidDocIdsMetadata(tableNameWithType, segments, validDocIdsType, false); + } + + private List> processValidDocIdsMetadata(String tableNameWithType, List segments, + String validDocIdsType, boolean includeBitmaps) { TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); List missingSegments = new ArrayList<>(); @@ -755,6 +764,9 @@ private List> processValidDocIdsMetadata(String tableNameWit ((ImmutableSegment) segmentDataManager.getSegment()).getSegmentSizeBytes()); } validDocIdsMetadata.put("segmentCreationTimeMillis", indexSegment.getSegmentMetadata().getIndexCreationTime()); + if (includeBitmaps) { + validDocIdsMetadata.put("bitmap", RoaringBitmapUtils.serialize(validDocIdsSnapshot)); + } allValidDocIdsMetadata.add(validDocIdsMetadata); } if (nonImmutableSegmentCount > 0) { diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java index f43c2f52875f..8e42909fd3d1 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java @@ -399,6 +399,9 @@ public void testValidDocIdsMetadataPost() ((ImmutableSegmentImpl) segment).getSegmentSizeBytes()); Assert.assertTrue(validDocIdsMetadata.has("segmentCreationTimeMillis")); Assert.assertTrue(validDocIdsMetadata.get("segmentCreationTimeMillis").asLong() > 0); + // bitmap field is omitted by default (includeBitmaps was not requested). + Assert.assertFalse(validDocIdsMetadata.has("bitmap"), + "Bitmap should not be included by default to keep response payloads small"); // Verify server status information Assert.assertTrue(validDocIdsMetadata.has("serverStatus"), "Server status should be included in response"); @@ -407,6 +410,35 @@ public void testValidDocIdsMetadataPost() Assert.assertEquals(serverStatus, "NOT_STARTED", serverStatus); } + @Test + public void testValidDocIdsMetadataPostWithIncludeBitmaps() + throws IOException { + IndexSegment segment = _realtimeIndexSegments.get(0); + + List segments = List.of(segment.getSegmentName()); + TableSegments tableSegments = new TableSegments(segments); + String validDocIdsMetadataPath = "/tables/" + REALTIME_TABLE_NAME + "/validDocIdsMetadata"; + String response = _webTarget.path(validDocIdsMetadataPath) + .queryParam("includeBitmaps", "true") + .request() + .post(Entity.json(tableSegments), String.class); + JsonNode validDocIdsMetadata = JsonUtils.stringToJsonNode(response).get(0); + + // The metadata counts are still emitted alongside the bitmap. + Assert.assertEquals(validDocIdsMetadata.get("totalValidDocs").asInt(), 8); + Assert.assertEquals(validDocIdsMetadata.get("totalDocs").asInt(), 200000); + + // The bitmap should now be present and its cardinality must match totalValidDocs. + Assert.assertTrue(validDocIdsMetadata.has("bitmap"), + "Bitmap field should be included when includeBitmaps=true"); + byte[] bitmapBytes = validDocIdsMetadata.get("bitmap").binaryValue(); + Assert.assertNotNull(bitmapBytes); + org.roaringbitmap.RoaringBitmap bitmap = new org.roaringbitmap.RoaringBitmap(); + bitmap.deserialize(java.nio.ByteBuffer.wrap(bitmapBytes)); + Assert.assertEquals(bitmap.getCardinality(), 8, + "Deserialized bitmap cardinality must equal totalValidDocs"); + } + @Test public void testValidDocIdsMetadataPostForSnapshotWithDelete() throws IOException { From cac3f5e29c7d88afc83c4f6d54f97bc73fa5e7df Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Fri, 26 Jun 2026 09:23:17 -0700 Subject: [PATCH 2/8] Add validDocIds replica consensus checks to upsert task generators Mirror the executor's validDocIds enforcement at generation time so inconsistent segments are never scheduled. UpsertCompactionTaskGenerator and UpsertCompactMergeTaskGenerator now validate each segment's replicas (CRC match, server health, and EQUAL/UNSAFE/MOST_VALID_DOCS consensus) via the shared MinionTaskUtils.selectValidDocIdsMetadataForConsensus, requesting includeBitmaps only for EQUAL and requiring all assigned replicas to respond for the strict modes. A new validDocIdsValidationMode config (STRICT default, EXECUTOR_ONLY) gates the generator-side checks: STRICT runs them in both generator and executor; EXECUTOR_ONLY downgrades the generator to a lenient pick and leaves the executor as the sole gate. --- .../pinot/core/common/MinionConstants.java | 28 +++- .../plugin/minion/tasks/MinionTaskUtils.java | 153 +++++++++++++++++- .../UpsertCompactionTaskGenerator.java | 88 +++++----- .../UpsertCompactMergeTaskGenerator.java | 89 +++++----- .../minion/tasks/MinionTaskUtilsTest.java | 35 ++++ .../UpsertCompactionTaskGeneratorTest.java | 113 ++++++++++++- .../UpsertCompactMergeTaskGeneratorTest.java | 96 ++++++++++- 7 files changed, 512 insertions(+), 90 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 976216d06a94..7d230e123816 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -92,11 +92,20 @@ private MinionConstants() { */ public static final String SEGMENT_DOWNLOAD_PARALLELISM = "segmentDownloadParallelism"; - /** Valid doc ids consensus mode (executor-only). Kept internal; executors pass config string. */ + /** Valid doc ids consensus mode enforced by both the task generators (pre-scheduling) and the executors. */ public enum ValidDocIdsConsensusMode { UNSAFE, EQUAL, MOST_VALID_DOCS } + /** + * Where validDocIds consensus is enforced. STRICT (default) runs the checks in both the task generator + * (pre-scheduling) and the executor; EXECUTOR_ONLY skips the generator-side checks and leaves the executor as the + * sole gate. + */ + public enum ValidDocIdsValidationMode { + STRICT, EXECUTOR_ONLY + } + // Purges rows inside segment that match chosen criteria public static class PurgeTask { public static final String TASK_TYPE = "PurgeTask"; @@ -269,14 +278,25 @@ public static class UpsertCompactionTask { public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; /** - * Valid doc ids consensus mode used by the executor only (generator unchanged). Values: UNSAFE, EQUAL, - * MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY; EQUAL = require all replicas - * to have the same valid doc set (default); MOST_VALID_DOCS = use replica with most valid docs. + * Valid doc ids consensus mode enforced by both the task generators (pre-scheduling) and the executors. Values: + * UNSAFE, EQUAL, MOST_VALID_DOCS. UNSAFE = use the first server with matching CRC and GOOD status; EQUAL = + * require all replicas to have the same valid doc set (default); MOST_VALID_DOCS = use the replica with the most + * valid docs. Shared by UpsertCompactionTask and UpsertCompactMergeTask. */ public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode"; /** Default: equal valid doc set consensus across replicas. */ public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL"; + + /** + * Whether the consensus checks run in the generator too. STRICT (default) = generator + executor; EXECUTOR_ONLY = + * executor only (generator skips the checks and the bitmap fetch). Shared by UpsertCompactionTask, + * UpsertCompactMergeTask, and SegmentRefreshTask. + */ + public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = "validDocIdsValidationMode"; + + /** Default: enforce in both generator and executor. */ + public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT"; } public static class UpsertCompactMergeTask { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index b96b9a569908..8500f1bc33e5 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -38,6 +38,7 @@ import org.apache.pinot.common.auth.NullAuthProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse; +import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; import org.apache.pinot.common.utils.RetentionUtils; import org.apache.pinot.common.utils.RoaringBitmapUtils; @@ -72,14 +73,34 @@ public class MinionTaskUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class); - /** Package-private for testing: parses validDocIdsComparisonMode config string. */ - static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) { + /** Parses the validDocIdsConsensusMode config string. Blank/null defaults to {@code EQUAL}. */ + public static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) { if (value == null || value.isBlank()) { return MinionConstants.ValidDocIdsConsensusMode.EQUAL; } return MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim()); } + /** Parses the validDocIdsValidationMode config string. Blank/null defaults to {@code STRICT}. */ + public static MinionConstants.ValidDocIdsValidationMode parseValidDocIdsValidationMode(String value) { + if (value == null || value.isBlank()) { + return MinionConstants.ValidDocIdsValidationMode.STRICT; + } + return MinionConstants.ValidDocIdsValidationMode.valueOf(value.toUpperCase().trim()); + } + + /** + * Resolves the consensus mode the generator should apply, given the configured consensus mode and validation mode. + * EXECUTOR_ONLY downgrades the generator to UNSAFE (lenient pick, no bitmaps, no cross-replica enforcement) so the + * executor stays the sole gate; STRICT keeps the configured mode. + */ + public static MinionConstants.ValidDocIdsConsensusMode resolveGeneratorConsensusMode( + MinionConstants.ValidDocIdsConsensusMode consensusMode, + MinionConstants.ValidDocIdsValidationMode validationMode) { + return validationMode == MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY + ? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode; + } + private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; @@ -428,6 +449,134 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW return maxCardinalityMap; } + /** + * Counts how many servers host each segment (its online replica count) from a server-to-segments map, so callers + * can tell whether every assigned replica responded. + */ + public static Map getSegmentToReplicaCount(Map> serverToSegments) { + Map segmentToReplicaCount = new HashMap<>(); + for (List segments : serverToSegments.values()) { + for (String segment : segments) { + segmentToReplicaCount.merge(segment, 1, Integer::sum); + } + } + return segmentToReplicaCount; + } + + /** + * Picks the replica whose validDocIds the generator should use for the segment, or returns {@code null} to skip it. + * Runs the executor's checks (CRC, server health, consensus) on the already-fetched replica metadata, so + * inconsistent segments are dropped before a task is scheduled rather than failed later. By mode: + *
    + *
  • {@code UNSAFE}: first replica with a matching CRC and a healthy server.
  • + *
  • {@code EQUAL}: every replica must respond, match CRC, be healthy, and share an identical bitmap (needs + * {@code includeBitmaps=true}).
  • + *
  • {@code MOST_VALID_DOCS}: every replica must respond, match CRC, and be healthy; pick the one with the most + * valid docs.
  • + *
+ * {@code expectedReplicaCount} is how many replicas must respond for the strict modes (ignored for {@code UNSAFE}). + */ + @Nullable + public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName, + long expectedCrc, @Nullable List replicas, int expectedReplicaCount, + MinionConstants.ValidDocIdsConsensusMode consensusMode) { + if (replicas == null || replicas.isEmpty()) { + return null; + } + boolean unsafe = consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE; + List usableReplicas = new ArrayList<>(); + for (ValidDocIdsMetadataInfo replica : replicas) { + // A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be + // trusted. UNSAFE skips just this replica; stricter modes skip the whole segment. + long replicaCrc; + try { + replicaCrc = Long.parseLong(replica.getSegmentCrc()); + } catch (NumberFormatException e) { + LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, skipping {} (mode={}) for {}", + replica.getSegmentCrc(), segmentName, replica.getInstanceId(), unsafe ? "replica" : "segment", + consensusMode, taskType); + if (unsafe) { + continue; + } + return null; + } + if (expectedCrc != replicaCrc) { + LOGGER.warn("CRC mismatch for segment: {} (expected={}, server={} reported={}), skipping {} (mode={}) for {}", + segmentName, expectedCrc, replica.getInstanceId(), replicaCrc, unsafe ? "replica" : "segment", + consensusMode, taskType); + if (unsafe) { + continue; + } + return null; + } + // A non-GOOD server may still be mutating the segment, so its valid doc set is unreliable. + if (replica.getServerStatus() != null && replica.getServerStatus() != ServiceStatus.Status.GOOD) { + LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} (mode={}) for {}", replica.getInstanceId(), + replica.getServerStatus(), segmentName, unsafe ? "replica" : "segment", consensusMode, taskType); + if (unsafe) { + continue; + } + return null; + } + if (unsafe) { + return replica; + } + usableReplicas.add(replica); + } + + if (usableReplicas.isEmpty()) { + return null; + } + + // Strict modes need every assigned replica to have responded; a short responder list means a server dropped out + // (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full + // replica set and skip the segment. + if (usableReplicas.size() < expectedReplicaCount) { + LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot confirm consensus, skipping for {}", + usableReplicas.size(), expectedReplicaCount, segmentName, taskType); + return null; + } + + if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) { + ValidDocIdsMetadataInfo first = usableReplicas.get(0); + RoaringBitmap consensusBitmap = deserializeBitmapOrNull(first); + if (consensusBitmap == null) { + // No bitmap means EQUAL can't be verified - skip rather than risk an inconsistent task. Make sure the + // metadata was fetched with includeBitmaps=true. + LOGGER.warn("Missing validDocIds bitmap for segment: {} from server: {}, cannot verify EQUAL consensus, " + + "skipping segment for {}", segmentName, first.getInstanceId(), taskType); + return null; + } + for (int i = 1; i < usableReplicas.size(); i++) { + RoaringBitmap bitmap = deserializeBitmapOrNull(usableReplicas.get(i)); + if (bitmap == null || !bitmap.equals(consensusBitmap)) { + LOGGER.warn("Replicas disagree on validDocIds for segment: {}, skipping segment for {}", segmentName, + taskType); + return null; + } + } + return first; + } + + // MOST_VALID_DOCS: pick the replica reporting the most valid docs. + ValidDocIdsMetadataInfo chosen = usableReplicas.get(0); + for (ValidDocIdsMetadataInfo replica : usableReplicas) { + if (replica.getTotalValidDocs() > chosen.getTotalValidDocs()) { + chosen = replica; + } + } + return chosen; + } + + @Nullable + private static RoaringBitmap deserializeBitmapOrNull(ValidDocIdsMetadataInfo info) { + byte[] bitmap = info.getBitmap(); + if (bitmap == null || bitmap.length == 0) { + return null; + } + return RoaringBitmapUtils.deserialize(bitmap); + } + public static String toUTCString(long epochMillis) { Date date = new Date(epochMillis); SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 02e15b9d3288..802b52ea5e93 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -34,7 +34,6 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; -import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; @@ -146,6 +145,18 @@ public List generateTasks(List tableConfigs) { ValidDocIdsType validDocIdsType = MinionTaskUtils.getValidDocIdsType(tableConfig.getUpsertConfig(), taskConfigs, UpsertCompactionTask.VALID_DOC_IDS_TYPE); + // Validate replicas before scheduling, matching the executor's checks, so inconsistent segments are never + // scheduled. With EXECUTOR_ONLY the generator skips these checks (the executor stays the gate). Bitmaps are + // only needed for EQUAL consensus, so fetch them only then to keep the payload small. + MinionConstants.ValidDocIdsConsensusMode consensusMode = MinionTaskUtils.resolveGeneratorConsensusMode( + MinionTaskUtils.parseValidDocIdsConsensusMode( + taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY, + UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)), + MinionTaskUtils.parseValidDocIdsValidationMode( + taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY, + UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); + boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; + // Number of segments to query per server request. If a table has a lot of segments, then we might send a // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. int numSegmentsBatchPerServerRequest = Integer.parseInt( @@ -154,13 +165,21 @@ public List generateTasks(List tableConfigs) { Map> validDocIdsMetadataList = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, - serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest); + serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest, + includeBitmaps); Map completedSegmentsMap = completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); + // Expected replica count per segment (from the External View), so consensus can skip a segment when not all + // of its replicas responded. Only the strict modes use it; UNSAFE never checks it. + Map segmentToReplicaCount = + consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE ? Map.of() + : MinionTaskUtils.getSegmentToReplicaCount(serverToSegments); + SegmentSelectionResult segmentSelectionResult = - processValidDocIdsMetadata(taskConfigs, completedSegmentsMap, validDocIdsMetadataList); + processValidDocIdsMetadata(taskConfigs, completedSegmentsMap, validDocIdsMetadataList, segmentToReplicaCount, + consensusMode); int skippedSegmentsCount = validDocIdsMetadataList.size() - segmentSelectionResult.getSegmentsForCompaction().size() - segmentSelectionResult.getSegmentsForDeletion().size(); @@ -207,7 +226,8 @@ public List generateTasks(List tableConfigs) { @VisibleForTesting public static SegmentSelectionResult processValidDocIdsMetadata(Map taskConfigs, Map completedSegmentsMap, - Map> validDocIdsMetadataInfoMap) { + Map> validDocIdsMetadataInfoMap, + Map segmentToReplicaCount, MinionConstants.ValidDocIdsConsensusMode consensusMode) { double invalidRecordsThresholdPercent = Double.parseDouble( taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT))); @@ -223,43 +243,33 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map replicas = validDocIdsMetadataInfoMap.get(segmentName); + ValidDocIdsMetadataInfo validDocIdsMetadata = MinionTaskUtils.selectValidDocIdsMetadataForConsensus( + MinionConstants.UpsertCompactionTask.TASK_TYPE, segmentName, segment.getCrc(), replicas, + segmentToReplicaCount.getOrDefault(segmentName, replicas.size()), consensusMode); + if (validDocIdsMetadata == null) { + continue; + } - long totalDocs = validDocIdsMetadata.getTotalDocs(); - double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; - if (totalInvalidDocs == totalDocs) { - LOGGER.debug("Segment {} contains only invalid records, adding it to the deletion list", segmentName); - segmentsForDeletion.add(segment.getSegmentName()); - } else if (invalidRecordPercent >= invalidRecordsThresholdPercent - && totalInvalidDocs >= invalidRecordsThresholdCount) { - LOGGER.debug("Segment {} contains {} invalid records out of {} total records " - + "(count threshold: {}, percent threshold: {}), adding it to the compaction list", segmentName, - totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, invalidRecordsThresholdPercent); - segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs)); - } else { - LOGGER.debug("Segment {} contains {} invalid records out of {} total records " - + "(count threshold: {}, percent threshold: {}), skipping it for compaction", segmentName, - totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, invalidRecordsThresholdPercent); - } - break; + long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs(); + long totalDocs = validDocIdsMetadata.getTotalDocs(); + double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; + if (totalInvalidDocs == totalDocs) { + LOGGER.debug("Segment {} contains only invalid records, adding it to the deletion list", segmentName); + segmentsForDeletion.add(segment.getSegmentName()); + } else if (invalidRecordPercent >= invalidRecordsThresholdPercent + && totalInvalidDocs >= invalidRecordsThresholdCount) { + LOGGER.debug("Segment {} contains {} invalid records out of {} total records " + + "(count threshold: {}, percent threshold: {}), adding it to the compaction list", segmentName, + totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, invalidRecordsThresholdPercent); + segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs)); + } else { + LOGGER.debug("Segment {} contains {} invalid records out of {} total records " + + "(count threshold: {}, percent threshold: {}), skipping it for compaction", segmentName, + totalInvalidDocs, totalDocs, invalidRecordsThresholdCount, invalidRecordsThresholdPercent); } } segmentsForCompaction.sort((o1, o2) -> { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index 1b9565aa24c8..bca1f9715c1c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -178,6 +178,18 @@ public List generateTasks(List tableConfigs) { new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(), _clusterInfoAccessor.getConnectionManager()); + // Reuse the compaction task's consensus-mode and validation-mode config so both tasks behave the same. With + // EXECUTOR_ONLY the generator skips these checks (the executor stays the gate). Bitmaps are only needed for + // EQUAL consensus, so fetch them only then to keep the payload small. + MinionConstants.ValidDocIdsConsensusMode consensusMode = MinionTaskUtils.resolveGeneratorConsensusMode( + MinionTaskUtils.parseValidDocIdsConsensusMode( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY, + MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)), + MinionTaskUtils.parseValidDocIdsValidationMode( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY, + MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); + boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; + // Number of segments to query per server request. If a table has a lot of segments, then we might send a // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. int numSegmentsBatchPerServerRequest = Integer.parseInt( @@ -186,16 +198,23 @@ public List generateTasks(List tableConfigs) { Map> validDocIdsMetadataList = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, - serverToEndpoints, null, 60_000, ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest); + serverToEndpoints, null, 60_000, ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest, + includeBitmaps); Map candidateSegmentsMap = candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); Set alreadyMergedSegments = getAlreadyMergedSegments(allSegments); + // Expected replica count per segment (from the External View), so consensus can skip a segment when not all + // of its replicas responded. Only the strict modes use it; UNSAFE never checks it. + Map segmentToReplicaCount = + consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE ? Map.of() + : MinionTaskUtils.getSegmentToReplicaCount(serverToSegments); + SegmentSelectionResult segmentSelectionResult = processValidDocIdsMetadata(tableNameWithType, taskConfigs, candidateSegmentsMap, validDocIdsMetadataList, - alreadyMergedSegments, _clusterInfoAccessor.getControllerMetrics()); + alreadyMergedSegments, segmentToReplicaCount, consensusMode, _clusterInfoAccessor.getControllerMetrics()); if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) { pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(), @@ -249,15 +268,15 @@ public List generateTasks(List tableConfigs) { } /** - * Processes validDocIds metadata to determine segments eligible for deletion or compaction. - * Evaluates segments based on valid/invalid document counts, server readiness, and CRC consistency. - * Requires consensus across all replicas on validDoc counts before proceeding with any operations. - * Marks segments with zero valid documents for deletion and groups others by partition for compaction. + * Determines which segments are eligible for deletion or compact-merge. For each segment, replicas are validated + * via {@code consensusMode} (CRC match, server health, validDocIds agreement); segments that fail are skipped. + * Segments with zero valid docs are marked for deletion, the rest are grouped by partition for compaction. */ @VisibleForTesting public static SegmentSelectionResult processValidDocIdsMetadata(String tableNameWithType, Map taskConfigs, Map candidateSegmentsMap, Map> validDocIdsMetadataInfoMap, Set alreadyMergedSegments, + Map segmentToReplicaCount, MinionConstants.ValidDocIdsConsensusMode consensusMode, ControllerMetrics controllerMetrics) { Map> segmentsEligibleForCompactMerge = new HashMap<>(); Set segmentsForDeletion = new HashSet<>(); @@ -302,40 +321,38 @@ public static SegmentSelectionResult processValidDocIdsMetadata(String tableName } SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName); - // Process with existing logic using the first replica with matching CRC (since all have consensus) - for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) { - long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs(); - long totalValidDocs = validDocIdsMetadata.getTotalValidDocs(); - long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes(); + // Validate replicas (CRC match, server health, validDocIds consensus) before scheduling. Returns null when the + // segment should be skipped so we never schedule a task the executor would later reject. + List replicas = validDocIdsMetadataInfoMap.get(segmentName); + ValidDocIdsMetadataInfo validDocIdsMetadata = MinionTaskUtils.selectValidDocIdsMetadataForConsensus( + MinionConstants.UpsertCompactMergeTask.TASK_TYPE, segmentName, segment.getCrc(), replicas, + segmentToReplicaCount.getOrDefault(segmentName, replicas.size()), consensusMode); + if (validDocIdsMetadata == null) { + continue; + } - // Skip segments if the crc from zk metadata and server does not match. They may be getting reloaded. - if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) { - LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName, - segment.getCrc(), validDocIdsMetadata.getSegmentCrc()); - continue; - } + long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs(); + long totalValidDocs = validDocIdsMetadata.getTotalValidDocs(); + long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes(); + long totalDocs = validDocIdsMetadata.getTotalDocs(); - // segments eligible for deletion with no valid records - long totalDocs = validDocIdsMetadata.getTotalDocs(); - if (totalInvalidDocs == totalDocs) { - segmentsForDeletion.add(segmentName); - } else if (alreadyMergedSegments.contains(segmentName)) { - LOGGER.debug("Segment {} already merged. Skipping it for {}", segmentName, + // Segments with no valid records can be deleted outright. + if (totalInvalidDocs == totalDocs) { + segmentsForDeletion.add(segmentName); + } else if (alreadyMergedSegments.contains(segmentName)) { + LOGGER.debug("Segment {} already merged. Skipping it for {}", segmentName, + MinionConstants.UpsertCompactMergeTask.TASK_TYPE); + } else { + Integer partitionID = SegmentUtils.getPartitionIdFromSegmentName(segmentName); + if (partitionID == null) { + LOGGER.warn("Partition ID not found for segment: {}, skipping it for {}", segmentName, MinionConstants.UpsertCompactMergeTask.TASK_TYPE); - break; - } else { - Integer partitionID = SegmentUtils.getPartitionIdFromSegmentName(segmentName); - if (partitionID == null) { - LOGGER.warn("Partition ID not found for segment: {}, skipping it for {}", segmentName, - MinionConstants.UpsertCompactMergeTask.TASK_TYPE); - continue; - } - double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes * totalValidDocs * 1.0) / totalDocs; - segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> new ArrayList<>()) - .add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs, - expectedSegmentSizeAfterCompaction)); + continue; } - break; + double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes * totalValidDocs * 1.0) / totalDocs; + segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> new ArrayList<>()) + .add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs, + expectedSegmentSizeAfterCompaction)); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java index 5abf106367f8..29c407c007f4 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java @@ -331,6 +331,41 @@ public void testParseValidDocIdsConsensusMode() { () -> MinionTaskUtils.parseValidDocIdsConsensusMode("INVALID_MODE")); } + @Test + public void testParseValidDocIdsValidationMode() { + // Blank/null defaults to STRICT + assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(null), + MinionConstants.ValidDocIdsValidationMode.STRICT); + assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(""), + MinionConstants.ValidDocIdsValidationMode.STRICT); + assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(" "), + MinionConstants.ValidDocIdsValidationMode.STRICT); + + // Case-insensitive parsing + assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("STRICT"), + MinionConstants.ValidDocIdsValidationMode.STRICT); + assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode("executor_only"), + MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY); + assertEquals(MinionTaskUtils.parseValidDocIdsValidationMode(" EXECUTOR_ONLY "), + MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY); + + expectThrows(IllegalArgumentException.class, + () -> MinionTaskUtils.parseValidDocIdsValidationMode("INVALID_MODE")); + } + + @Test + public void testResolveGeneratorConsensusMode() { + // EXECUTOR_ONLY downgrades the generator to UNSAFE regardless of the configured consensus mode. + for (MinionConstants.ValidDocIdsConsensusMode mode : MinionConstants.ValidDocIdsConsensusMode.values()) { + assertEquals( + MinionTaskUtils.resolveGeneratorConsensusMode(mode, MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY), + MinionConstants.ValidDocIdsConsensusMode.UNSAFE); + // STRICT keeps the configured mode. + assertEquals( + MinionTaskUtils.resolveGeneratorConsensusMode(mode, MinionConstants.ValidDocIdsValidationMode.STRICT), mode); + } + } + /** * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1). */ diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index 127f96a162fe..77f32570d82c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -28,6 +28,9 @@ import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; +import org.apache.pinot.common.restlet.resources.ValidDocIdsType; +import org.apache.pinot.common.utils.RoaringBitmapUtils; +import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; @@ -42,6 +45,7 @@ import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.roaringbitmap.RoaringBitmap; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -232,13 +236,13 @@ public void testProcessValidDocIdsMetadata() // no completed segments scenario, there shouldn't be any segment selected for compaction UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new HashMap<>(), - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0); // test with valid crc and thresholds segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), @@ -249,7 +253,7 @@ public void testProcessValidDocIdsMetadata() compactionConfigs = getCompactionConfigs("60", "10"); segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty()); assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); @@ -258,7 +262,7 @@ public void testProcessValidDocIdsMetadata() compactionConfigs = getCompactionConfigs("0", "10"); segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), @@ -269,7 +273,7 @@ public void testProcessValidDocIdsMetadata() compactionConfigs = getCompactionConfigs("30", "0"); segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), @@ -288,7 +292,7 @@ public void testProcessValidDocIdsMetadata() }); segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); // completedSegment is supposed to be filtered out Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0); @@ -313,7 +317,7 @@ public void testProcessValidDocIdsMetadata() compactionConfigs = getCompactionConfigs("30", "0"); segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - validDocIdsMetadataInfo); + validDocIdsMetadataInfo, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE); Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 2); Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 0); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), @@ -326,6 +330,101 @@ public void testProcessValidDocIdsMetadata() assertEquals(validDocIdsMetadataInfo.get("testTable__1").get(0).getSegmentCreationTimeMillis(), 9876543210L); } + @Test + public void testProcessValidDocIdsMetadataConsensus() { + Map compactionConfigs = getCompactionConfigs("1", "10"); + String segmentName = _completedSegment.getSegmentName(); + long crc = _completedSegment.getCrc(); + // Both replicas are expected to respond. + Map twoReplicas = Map.of(segmentName, 2); + + // EQUAL: replicas agree (identical bitmaps), so the segment is compacted. + Map> equalReplicas = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server2", range(0, 50)))); + UpsertCompactionTaskGenerator.SegmentSelectionResult result = + UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + equalReplicas, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); + assertEquals(result.getSegmentsForCompaction().size(), 1); + + // EQUAL: replicas disagree (different bitmaps), so the segment is skipped entirely. + Map> unequalReplicas = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server2", range(1, 51)))); + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + unequalReplicas, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + assertTrue(result.getSegmentsForDeletion().isEmpty()); + + // EQUAL: only one of the two assigned replicas responded, so consensus can't be confirmed and the segment is + // skipped. + Map> oneResponded = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)))); + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + oneResponded, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + + // EQUAL: a CRC mismatch on any replica skips the segment. + Map> crcMismatch = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), + metaWithBitmap(segmentName, 50, 50, 100, crc + 1, ServiceStatus.Status.GOOD, "server2", range(0, 50)))); + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + crcMismatch, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + + // EQUAL: an unhealthy server skips the segment. + Map> unhealthy = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), + metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.STARTING, "server2", range(0, 50)))); + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + unhealthy, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + + // UNSAFE: skip the CRC-mismatched replica and use the healthy one. A missing replica is tolerated. + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + crcMismatch, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.UNSAFE); + assertEquals(result.getSegmentsForCompaction().size(), 1); + + // MOST_VALID_DOCS is strict: a CRC-mismatched replica skips the whole segment (unlike UNSAFE). + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + crcMismatch, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + + // EQUAL: a replica without a bitmap can't be verified, so the segment is skipped. + Map> missingBitmap = Map.of(segmentName, List.of( + new ValidDocIdsMetadataInfo(segmentName, 50, 50, 100, String.valueOf(crc), ValidDocIdsType.SNAPSHOT, 1000, + System.currentTimeMillis(), "server1", ServiceStatus.Status.GOOD))); + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + missingBitmap, Map.of(segmentName, 1), MinionConstants.ValidDocIdsConsensusMode.EQUAL); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + + // MOST_VALID_DOCS: the replica with the most valid docs wins. Here that replica has zero invalid docs, so the + // segment is neither compacted nor deleted - proving the other (all-invalid) replica was not chosen. + Map> mostValidDocs = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + metaWithBitmap(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, "server2", range(0, 100)))); + result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, + mostValidDocs, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS); + assertTrue(result.getSegmentsForCompaction().isEmpty()); + assertTrue(result.getSegmentsForDeletion().isEmpty()); + } + + private static ValidDocIdsMetadataInfo metaWithBitmap(String segmentName, long validDocs, long invalidDocs, + long totalDocs, long crc, ServiceStatus.Status serverStatus, String instanceId, int... validIds) { + RoaringBitmap bitmap = RoaringBitmap.bitmapOf(validIds); + return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, totalDocs, String.valueOf(crc), + ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), instanceId, serverStatus, + RoaringBitmapUtils.serialize(bitmap)); + } + + private static int[] range(int fromInclusive, int toExclusive) { + int[] ids = new int[toExclusive - fromInclusive]; + for (int i = 0; i < ids.length; i++) { + ids[i] = fromInclusive + i; + } + return ids; + } + @Test public void testUpsertCompactionTaskConfig() { Map upsertCompactionTaskConfig = diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java index 1bc8ad96876f..2bbf58f76bf9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; +import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; @@ -52,6 +53,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.roaringbitmap.RoaringBitmap; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -375,7 +377,7 @@ public void testProcessValidDocIdsMetadata() { SegmentSelectionResult result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata( RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap, - validDocIdsMetadata, alreadyMergedSegments, null); + validDocIdsMetadata, alreadyMergedSegments, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE, null); Assert.assertNotNull(result); Assert.assertNotNull(result.getSegmentsForCompactMergeByPartition()); @@ -413,13 +415,103 @@ public void testProcessValidDocIdsMetadataWithSegmentsForDeletion() { SegmentSelectionResult result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata( RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap, - validDocIdsMetadata, alreadyMergedSegments, null); + validDocIdsMetadata, alreadyMergedSegments, Map.of(), MinionConstants.ValidDocIdsConsensusMode.UNSAFE, null); Assert.assertNotNull(result); Assert.assertEquals(result.getSegmentsForDeletion().size(), 1, "Should have one segment for deletion"); Assert.assertTrue(result.getSegmentsForDeletion().contains("testTable__0__0__12345")); } + /** + * Tests that replica consensus is enforced before a segment is selected. Uses the deletion path (a fully-invalid + * segment) as a clean signal: the segment is processed only when its replicas pass the consensus check. + */ + @Test + public void testProcessValidDocIdsMetadataConsensus() { + Map taskConfigs = new HashMap<>(); + String segmentName = _completedSegment.getSegmentName(); + long crc = _completedSegment.getCrc(); + Map candidateSegmentsMap = Map.of(segmentName, _completedSegment); + Set noMerged = Set.of(); + // Both replicas are expected to respond. + Map twoReplicas = Map.of(segmentName, 2); + + // EQUAL: replicas agree (both fully invalid, identical empty bitmaps), so the segment is processed and deleted. + Map> agree = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server2"))); + SegmentSelectionResult result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, + taskConfigs, candidateSegmentsMap, agree, noMerged, twoReplicas, + MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); + Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName)); + + // EQUAL: replicas disagree (different valid doc sets), so the segment is skipped. + Map> disagree = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + metaWithBitmap(segmentName, 1, 99, 100, crc, ServiceStatus.Status.GOOD, "server2", 0))); + result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, + candidateSegmentsMap, disagree, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); + Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); + + // EQUAL: a CRC mismatch on any replica skips the segment. + Map> crcMismatch = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + metaWithBitmap(segmentName, 0, 100, 100, crc + 1, ServiceStatus.Status.GOOD, "server2"))); + result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, + candidateSegmentsMap, crcMismatch, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, + null); + Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); + + // EQUAL: an unhealthy server skips the segment. + Map> unhealthy = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.STARTING, "server2"))); + result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, + candidateSegmentsMap, unhealthy, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); + Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); + + // UNSAFE: skip the CRC-mismatched replica and use the healthy one, so the segment is still processed. + result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, + candidateSegmentsMap, crcMismatch, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.UNSAFE, + null); + Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName)); + + // MOST_VALID_DOCS: the replica with the most valid docs wins, so the fully-valid replica is chosen and the + // segment is not deleted (proving the all-invalid replica was not picked). + Map> mostValidDocs = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + metaWithBitmap(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, "server2", range(0, 100)))); + result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, + candidateSegmentsMap, mostValidDocs, noMerged, twoReplicas, + MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS, null); + Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); + + // EQUAL: only one of the two assigned replicas responded, so consensus can't be confirmed and the segment is + // skipped. + Map> oneResponded = Map.of(segmentName, List.of( + metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"))); + result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, + candidateSegmentsMap, oneResponded, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, + null); + Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); + } + + private static ValidDocIdsMetadataInfo metaWithBitmap(String segmentName, long validDocs, long invalidDocs, + long totalDocs, long crc, ServiceStatus.Status serverStatus, String instanceId, int... validIds) { + RoaringBitmap bitmap = RoaringBitmap.bitmapOf(validIds); + return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, totalDocs, String.valueOf(crc), + ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), instanceId, serverStatus, + RoaringBitmapUtils.serialize(bitmap)); + } + + private static int[] range(int fromInclusive, int toExclusive) { + int[] ids = new int[toExclusive - fromInclusive]; + for (int i = 0; i < ids.length; i++) { + ids[i] = fromInclusive + i; + } + return ids; + } + /** * Tests getCandidateSegments with various edge cases. */ From 7d7bbcb48fc627afeba9330f66ab349b49a3bee1 Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Fri, 26 Jun 2026 23:40:04 -0700 Subject: [PATCH 3/8] Reformat selectValidDocIdsMetadataForConsensus Javadoc for readability --- .../plugin/minion/tasks/MinionTaskUtils.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 8500f1bc33e5..0aed59add84d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -464,17 +464,20 @@ public static Map getSegmentToReplicaCount(MapHow the replica is chosen depends on {@code consensusMode}: *
    - *
  • {@code UNSAFE}: first replica with a matching CRC and a healthy server.
  • - *
  • {@code EQUAL}: every replica must respond, match CRC, be healthy, and share an identical bitmap (needs - * {@code includeBitmaps=true}).
  • - *
  • {@code MOST_VALID_DOCS}: every replica must respond, match CRC, and be healthy; pick the one with the most - * valid docs.
  • + *
  • {@code UNSAFE}: the first replica with a matching CRC and a healthy server.
  • + *
  • {@code EQUAL}: all replicas must match CRC, be healthy, and have an identical bitmap.
  • + *
  • {@code MOST_VALID_DOCS}: all replicas must match CRC and be healthy; the one with the most valid docs + * wins.
  • *
- * {@code expectedReplicaCount} is how many replicas must respond for the strict modes (ignored for {@code UNSAFE}). + * + *

For {@code EQUAL} and {@code MOST_VALID_DOCS}, {@code expectedReplicaCount} replicas must respond, otherwise the + * segment is skipped; {@code UNSAFE} ignores it. */ @Nullable public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName, From 353fe386d98d17068d5b0ee29e55b7e7d73f7bab Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Fri, 26 Jun 2026 23:45:33 -0700 Subject: [PATCH 4/8] Use plain-text formatting for selectValidDocIdsMetadataForConsensus Javadoc --- .../plugin/minion/tasks/MinionTaskUtils.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 0aed59add84d..c71386271f9f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -468,16 +468,13 @@ public static Map getSegmentToReplicaCount(MapHow the replica is chosen depends on {@code consensusMode}: - *

    - *
  • {@code UNSAFE}: the first replica with a matching CRC and a healthy server.
  • - *
  • {@code EQUAL}: all replicas must match CRC, be healthy, and have an identical bitmap.
  • - *
  • {@code MOST_VALID_DOCS}: all replicas must match CRC and be healthy; the one with the most valid docs - * wins.
  • - *
+ * How the replica is chosen depends on {@code consensusMode}: + * - UNSAFE: the first replica with a matching CRC and a healthy server. + * - EQUAL: all replicas must match CRC, be healthy, and have an identical bitmap. + * - MOST_VALID_DOCS: all replicas must match CRC and be healthy; the one with the most valid docs wins. * - *

For {@code EQUAL} and {@code MOST_VALID_DOCS}, {@code expectedReplicaCount} replicas must respond, otherwise the - * segment is skipped; {@code UNSAFE} ignores it. + * For EQUAL and MOST_VALID_DOCS, {@code expectedReplicaCount} replicas must respond, otherwise the segment is + * skipped; UNSAFE ignores it. */ @Nullable public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName, From bb8e52f869ca7c14f1d565a0f498cef4557f0d7b Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Sat, 27 Jun 2026 00:01:12 -0700 Subject: [PATCH 5/8] Drop expectedReplicaCount note from selectValidDocIdsMetadataForConsensus Javadoc --- .../org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index c71386271f9f..31120783db3e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -472,9 +472,6 @@ public static Map getSegmentToReplicaCount(Map Date: Sun, 28 Jun 2026 10:39:39 -0700 Subject: [PATCH 6/8] Bound the validDocIds consensus fetch batch in upsert generators The consensus modes (EQUAL/MOST_VALID_DOCS) fetch validDocIds metadata from every replica, and EQUAL also carries the serialized bitmap in each entry, so reusing the regular numSegmentsBatchPerServerRequest (default 500) can produce very large per-request payloads. Add a shared validDocIdsConsensusFetchBatchSize knob (default 10) on UpsertCompactionTask and route both the compaction and compact-merge generators through MinionTaskUtils.resolveValidDocIdsFetchBatchSize: UNSAFE keeps the regular batch, the consensus modes use the smaller consensus batch. The smaller batch is intentional for the bitmap-bearing fetch and does not inherit a user-set numSegmentsBatchPerServerRequest. --- .../pinot/core/common/MinionConstants.java | 10 ++++++++++ .../plugin/minion/tasks/MinionTaskUtils.java | 15 +++++++++++++++ .../UpsertCompactionTaskGenerator.java | 8 ++++++-- .../UpsertCompactMergeTaskGenerator.java | 8 ++++++-- .../minion/tasks/MinionTaskUtilsTest.java | 18 ++++++++++++++++++ 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 7d230e123816..b89a412c10b7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -297,6 +297,16 @@ public static class UpsertCompactionTask { /** Default: enforce in both generator and executor. */ public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT"; + + /** + * Per-server batch size for the validDocIds fetch when generator consensus runs (EQUAL/MOST_VALID_DOCS). Kept + * small because consensus fetches from every replica and EQUAL also carries the serialized bitmap in each entry. + * Shared by UpsertCompactionTask and UpsertCompactMergeTask. + */ + public static final String VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY = "validDocIdsConsensusFetchBatchSize"; + + /** Default consensus fetch batch size, small since all replicas respond and EQUAL includes the bitmap. */ + public static final int DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE = 10; } public static class UpsertCompactMergeTask { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 31120783db3e..ac13521ff33a 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -101,6 +101,21 @@ public static MinionConstants.ValidDocIdsConsensusMode resolveGeneratorConsensus ? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode; } + /** + * Resolves the per-server batch size for the generator's validDocIds fetch. UNSAFE keeps {@code regularBatchSize} + * (the no-bitmap fetch). The consensus modes fetch from every replica (and EQUAL also carries a bitmap per entry), + * so they use the smaller {@code validDocIdsConsensusFetchBatchSize} to keep the payload bounded. + */ + public static int resolveValidDocIdsFetchBatchSize(Map taskConfigs, + MinionConstants.ValidDocIdsConsensusMode consensusMode, int regularBatchSize) { + if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) { + return regularBatchSize; + } + return Integer.parseInt(taskConfigs.getOrDefault( + MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, + String.valueOf(MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE))); + } + private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 802b52ea5e93..61c0fb5c223e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -158,10 +158,14 @@ public List generateTasks(List tableConfigs) { boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; // Number of segments to query per server request. If a table has a lot of segments, then we might send a - // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. - int numSegmentsBatchPerServerRequest = Integer.parseInt( + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. The + // consensus modes use a smaller batch (validDocIdsConsensusFetchBatchSize) since they fetch from every replica + // and EQUAL also carries a bitmap per entry; UNSAFE keeps the regular batch. + int regularBatchPerServerRequest = Integer.parseInt( taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST, String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))); + int numSegmentsBatchPerServerRequest = + MinionTaskUtils.resolveValidDocIdsFetchBatchSize(taskConfigs, consensusMode, regularBatchPerServerRequest); Map> validDocIdsMetadataList = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index bca1f9715c1c..a668fb3b9284 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -191,10 +191,14 @@ public List generateTasks(List tableConfigs) { boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; // Number of segments to query per server request. If a table has a lot of segments, then we might send a - // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. - int numSegmentsBatchPerServerRequest = Integer.parseInt( + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. The + // consensus modes use a smaller batch (validDocIdsConsensusFetchBatchSize) since they fetch from every replica + // and EQUAL also carries a bitmap per entry; UNSAFE keeps the regular batch. + int regularBatchPerServerRequest = Integer.parseInt( taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST, String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))); + int numSegmentsBatchPerServerRequest = + MinionTaskUtils.resolveValidDocIdsFetchBatchSize(taskConfigs, consensusMode, regularBatchPerServerRequest); Map> validDocIdsMetadataList = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java index 29c407c007f4..27e2753ae9e6 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java @@ -366,6 +366,24 @@ public void testResolveGeneratorConsensusMode() { } } + @Test + public void testResolveValidDocIdsFetchBatchSize() { + // UNSAFE keeps the regular (no-bitmap) batch and ignores the consensus key. + Map withConsensusKey = Map.of( + MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, "7"); + assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(withConsensusKey, + MinionConstants.ValidDocIdsConsensusMode.UNSAFE, 500), 500); + + // The consensus modes use the configured consensus batch size. + for (MinionConstants.ValidDocIdsConsensusMode mode : List.of( + MinionConstants.ValidDocIdsConsensusMode.EQUAL, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS)) { + assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(withConsensusKey, mode, 500), 7); + // Falls back to the small default when the consensus key is absent (does not inherit the regular batch). + assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(Map.of(), mode, 500), + MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE); + } + } + /** * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1). */ From 23658fe08cd65a162c45b88518cf2e458369265b Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Sun, 28 Jun 2026 16:38:02 -0700 Subject: [PATCH 7/8] Move shared validDocIds consensus constants to top-level MinionConstants The consensus config (validDocIdsConsensusMode, validDocIdsValidationMode, validDocIdsConsensusFetchBatchSize keys and defaults) is shared by the upsert compaction, compact-merge, and segment-refresh tasks, so it belongs at the top level of MinionConstants next to the ValidDocIdsConsensusMode and ValidDocIdsValidationMode enums rather than nested under UpsertCompactionTask. Update all references in both generators, both executors, MinionTaskUtils, and the tests. Config key string values are unchanged. The two constants that already shipped (VALID_DOC_IDS_CONSENSUS_MODE_KEY and its default) keep a @Deprecated forwarding alias at the old nested location for source/binary compatibility. --- .../pinot/core/common/MinionConstants.java | 58 +++++++++---------- .../plugin/minion/tasks/MinionTaskUtils.java | 4 +- .../UpsertCompactionTaskExecutor.java | 6 +- .../UpsertCompactionTaskGenerator.java | 8 +-- .../UpsertCompactMergeTaskExecutor.java | 6 +- .../UpsertCompactMergeTaskGenerator.java | 8 +-- .../minion/tasks/MinionTaskUtilsTest.java | 4 +- 7 files changed, 47 insertions(+), 47 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index b89a412c10b7..abe04a29a45b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -106,6 +106,28 @@ public enum ValidDocIdsValidationMode { STRICT, EXECUTOR_ONLY } + /** + * Valid doc ids consensus mode used by both the task generators (pre-scheduling) and the executors. UNSAFE = first + * server with matching CRC and GOOD status; EQUAL (default) = all replicas must have the same valid doc set; + * MOST_VALID_DOCS = the replica with the most valid docs. + */ + public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode"; + public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL"; + + /** + * Whether the consensus checks run in the generator too. STRICT (default) = generator + executor; EXECUTOR_ONLY = + * executor only (generator skips the checks and the bitmap fetch). + */ + public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = "validDocIdsValidationMode"; + public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT"; + + /** + * Per-server batch size for the validDocIds fetch when generator consensus runs (EQUAL/MOST_VALID_DOCS). Kept small + * because consensus fetches from every replica and EQUAL also carries the serialized bitmap in each entry. + */ + public static final String VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY = "validDocIdsConsensusFetchBatchSize"; + public static final int DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE = 10; + // Purges rows inside segment that match chosen criteria public static class PurgeTask { public static final String TASK_TYPE = "PurgeTask"; @@ -277,36 +299,14 @@ public static class UpsertCompactionTask { */ public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; - /** - * Valid doc ids consensus mode enforced by both the task generators (pre-scheduling) and the executors. Values: - * UNSAFE, EQUAL, MOST_VALID_DOCS. UNSAFE = use the first server with matching CRC and GOOD status; EQUAL = - * require all replicas to have the same valid doc set (default); MOST_VALID_DOCS = use the replica with the most - * valid docs. Shared by UpsertCompactionTask and UpsertCompactMergeTask. - */ - public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode"; - - /** Default: equal valid doc set consensus across replicas. */ - public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL"; - - /** - * Whether the consensus checks run in the generator too. STRICT (default) = generator + executor; EXECUTOR_ONLY = - * executor only (generator skips the checks and the bitmap fetch). Shared by UpsertCompactionTask, - * UpsertCompactMergeTask, and SegmentRefreshTask. - */ - public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = "validDocIdsValidationMode"; - - /** Default: enforce in both generator and executor. */ - public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT"; - - /** - * Per-server batch size for the validDocIds fetch when generator consensus runs (EQUAL/MOST_VALID_DOCS). Kept - * small because consensus fetches from every replica and EQUAL also carries the serialized bitmap in each entry. - * Shared by UpsertCompactionTask and UpsertCompactMergeTask. - */ - public static final String VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY = "validDocIdsConsensusFetchBatchSize"; + /** @deprecated moved to {@link MinionConstants#VALID_DOC_IDS_CONSENSUS_MODE_KEY}. */ + @Deprecated + public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY; - /** Default consensus fetch batch size, small since all replicas respond and EQUAL includes the bitmap. */ - public static final int DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE = 10; + /** @deprecated moved to {@link MinionConstants#DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE}. */ + @Deprecated + public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = + MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE; } public static class UpsertCompactMergeTask { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index ac13521ff33a..929171a0f117 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -112,8 +112,8 @@ public static int resolveValidDocIdsFetchBatchSize(Map taskConfi return regularBatchSize; } return Integer.parseInt(taskConfigs.getOrDefault( - MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, - String.valueOf(MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE))); + MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, + String.valueOf(MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE))); } private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java index ce97909e58c5..675b672a5911 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java @@ -79,9 +79,9 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File Map taskConfigs = tableConfig.getTaskConfig() != null ? tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null; String consensusMode = - taskConfigs != null ? taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY, - UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE) - : UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE; + taskConfigs != null ? taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY, + MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE) + : MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE; RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr, MINION_CONTEXT, originalSegmentCrcFromTaskGenerator, consensusMode); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 61c0fb5c223e..5db9f2529d7a 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -150,11 +150,11 @@ public List generateTasks(List tableConfigs) { // only needed for EQUAL consensus, so fetch them only then to keep the payload small. MinionConstants.ValidDocIdsConsensusMode consensusMode = MinionTaskUtils.resolveGeneratorConsensusMode( MinionTaskUtils.parseValidDocIdsConsensusMode( - taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY, - UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)), + taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY, + MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)), MinionTaskUtils.parseValidDocIdsValidationMode( - taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY, - UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); + taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_VALIDATION_MODE_KEY, + MinionConstants.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; // Number of segments to query per server request. If a table has a lot of segments, then we might send a diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java index c6847ce321a0..81ea750b1bfd 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java @@ -112,9 +112,9 @@ protected List convert(PinotTaskConfig pinotTaskConfig, Map taskConfigs = tableConfig.getTaskConfig() != null ? tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null; String consensusMode = taskConfigs != null ? taskConfigs.getOrDefault( - MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY, - MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE) - : MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE; + MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY, + MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE) + : MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE; List recordReaders = segmentMetadataList.stream().map(x -> { RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(), diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index a668fb3b9284..f44e7146cc5d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -183,11 +183,11 @@ public List generateTasks(List tableConfigs) { // EQUAL consensus, so fetch them only then to keep the payload small. MinionConstants.ValidDocIdsConsensusMode consensusMode = MinionTaskUtils.resolveGeneratorConsensusMode( MinionTaskUtils.parseValidDocIdsConsensusMode( - taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY, - MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)), + taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY, + MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)), MinionTaskUtils.parseValidDocIdsValidationMode( - taskConfigs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_VALIDATION_MODE_KEY, - MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); + taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_VALIDATION_MODE_KEY, + MinionConstants.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; // Number of segments to query per server request. If a table has a lot of segments, then we might send a diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java index 27e2753ae9e6..e6cb6f5d9e98 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java @@ -370,7 +370,7 @@ public void testResolveGeneratorConsensusMode() { public void testResolveValidDocIdsFetchBatchSize() { // UNSAFE keeps the regular (no-bitmap) batch and ignores the consensus key. Map withConsensusKey = Map.of( - MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, "7"); + MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, "7"); assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(withConsensusKey, MinionConstants.ValidDocIdsConsensusMode.UNSAFE, 500), 500); @@ -380,7 +380,7 @@ public void testResolveValidDocIdsFetchBatchSize() { assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(withConsensusKey, mode, 500), 7); // Falls back to the small default when the consensus key is absent (does not inherit the regular batch). assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(Map.of(), mode, 500), - MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE); + MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE); } } From 018b58dfa52b8edb3c069605fdfc3f4a65784229 Mon Sep 17 00:00:00 2001 From: Chaitanya Deepthi Date: Mon, 29 Jun 2026 10:27:15 -0700 Subject: [PATCH 8/8] Use valid doc count for generator EQUAL consensus; drop bitmap fetch The generator-side EQUAL consensus check now requires every replica to report the same valid doc count instead of comparing full validDocIds bitmaps. Comparing counts avoids serializing a RoaringBitmap per replica back to the controller, which is expensive for large upsert tables. The executor remains the authoritative gate and still verifies byte-identical bitmaps before compacting, so a count match that hides a set difference is scheduled-then- failed there rather than mis-compacted. This rolls back the now-unnecessary generator bitmap-fetch machinery added earlier on this branch (all unreleased): the includeBitmaps validDocIdsMetadata endpoint param, the ValidDocIdsMetadataInfo bitmap field, the ServerSegmentMetadataReader overload, and the validDocIdsConsensusFetchBatchSize config knob. Generators go back to the regular per-server fetch batch. --- .../resources/ValidDocIdsMetadataInfo.java | 32 +----------- .../util/ServerSegmentMetadataReader.java | 33 ++---------- .../pinot/core/common/MinionConstants.java | 7 --- .../plugin/minion/tasks/MinionTaskUtils.java | 43 +++------------ .../UpsertCompactionTaskGenerator.java | 15 ++---- .../UpsertCompactMergeTaskGenerator.java | 15 ++---- .../minion/tasks/MinionTaskUtilsTest.java | 18 ------- .../UpsertCompactionTaskGeneratorTest.java | 52 ++++++------------- .../UpsertCompactMergeTaskGeneratorTest.java | 42 ++++++--------- .../server/api/resources/TablesResource.java | 14 +---- .../pinot/server/api/TablesResourceTest.java | 32 ------------ 11 files changed, 52 insertions(+), 251 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java index 197a3ee3b0ba..6fd42c86f659 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java @@ -18,11 +18,8 @@ */ package org.apache.pinot.common.restlet.resources; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -import javax.annotation.Nullable; import org.apache.pinot.common.utils.ServiceStatus; @@ -38,29 +35,14 @@ public class ValidDocIdsMetadataInfo { private final long _segmentCreationTimeMillis; private final String _instanceId; private final ServiceStatus.Status _serverStatus; - // Optional serialized RoaringBitmap of the validDocIds for the segment. Populated only when the caller of the - // batched validDocIdsMetadata endpoint passes includeBitmaps=true. Null when omitted or when the responding - // server is older and doesn't recognise the flag. Field is JsonInclude.NON_NULL on the getter so payloads stay - // small when bitmaps are not requested. - @Nullable - private final byte[] _bitmap; - public ValidDocIdsMetadataInfo(String segmentName, long totalValidDocs, long totalInvalidDocs, long totalDocs, - String segmentCrc, ValidDocIdsType validDocIdsType, long segmentSizeInBytes, long segmentCreationTimeMillis, - String instanceId, ServiceStatus.Status serverStatus) { - this(segmentName, totalValidDocs, totalInvalidDocs, totalDocs, segmentCrc, validDocIdsType, segmentSizeInBytes, - segmentCreationTimeMillis, instanceId, serverStatus, null); - } - - @JsonCreator public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName, @JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs, @JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc, @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType, @JsonProperty("segmentSizeInBytes") long segmentSizeInBytes, @JsonProperty("segmentCreationTimeMillis") long segmentCreationTimeMillis, - @JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus, - @JsonProperty("bitmap") @Nullable byte[] bitmap) { + @JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) { _segmentName = segmentName; _totalValidDocs = totalValidDocs; _totalInvalidDocs = totalInvalidDocs; @@ -71,7 +53,6 @@ public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName, _segmentCreationTimeMillis = segmentCreationTimeMillis; _instanceId = instanceId; _serverStatus = serverStatus; - _bitmap = bitmap; } public String getSegmentName() { @@ -113,15 +94,4 @@ public String getInstanceId() { public ServiceStatus.Status getServerStatus() { return _serverStatus; } - - /** - * Returns the serialized RoaringBitmap of validDocIds for the segment, or null when not requested by the caller - * (or the responding server is older and doesn't emit it). Callers can deserialize via - * {@code RoaringBitmapUtils.deserialize}. - */ - @JsonInclude(JsonInclude.Include.NON_NULL) - @Nullable - public byte[] getBitmap() { - return _bitmap; - } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 7a71dca7b67d..a1669a2882b8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -282,20 +282,6 @@ public Map> getSegmentToValidDocIdsMetadat Map> serverToSegmentsMap, BiMap serverToEndpoints, @Nullable List segmentNames, int timeoutMs, String validDocIdsType, int numSegmentsBatchPerServerRequest) { - return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints, - segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest, false); - } - - /** - * Overload that also lets the caller request the serialized validDocIds bitmap in each per-segment response entry. - * When {@code includeBitmaps} is true, every {@link ValidDocIdsMetadataInfo} returned by the server includes its - * bitmap bytes (see {@link ValidDocIdsMetadataInfo#getBitmap()}). Use sparingly — the response payload grows with - * the total bitmap size across the requested segments. - */ - public Map> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType, - Map> serverToSegmentsMap, BiMap serverToEndpoints, - @Nullable List segmentNames, int timeoutMs, String validDocIdsType, - int numSegmentsBatchPerServerRequest, boolean includeBitmaps) { List> serverURLsAndBodies = new ArrayList<>(); for (Map.Entry> serverToSegments : serverToSegmentsMap.entrySet()) { List segmentsForServer = serverToSegments.getValue(); @@ -315,7 +301,7 @@ public Map> getSegmentToValidDocIdsMetadat // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch -> serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch, - validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()), includeBitmaps))); + validDocIdsType, serverToEndpoints.get(serverToSegments.getKey())))); } BiMap endpointsToServers = serverToEndpoints.inverse(); @@ -496,11 +482,6 @@ private String generateValidDocIdsBitmapURL(String tableNameWithType, String seg private Pair generateValidDocIdsMetadataURL(String tableNameWithType, List segmentNames, String validDocIdsType, String endpoint) { - return generateValidDocIdsMetadataURL(tableNameWithType, segmentNames, validDocIdsType, endpoint, false); - } - - private Pair generateValidDocIdsMetadataURL(String tableNameWithType, List segmentNames, - String validDocIdsType, String endpoint, boolean includeBitmaps) { tableNameWithType = encode(tableNameWithType); TableSegments tableSegments = new TableSegments(segmentNames); String jsonTableSegments; @@ -510,17 +491,11 @@ private Pair generateValidDocIdsMetadataURL(String tableNameWith LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames); throw new RuntimeException(e); } - StringBuilder url = new StringBuilder( - String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType)); - String separator = "?"; + String url = String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType); if (validDocIdsType != null) { - url.append(separator).append("validDocIdsType=").append(validDocIdsType); - separator = "&"; - } - if (includeBitmaps) { - url.append(separator).append("includeBitmaps=true"); + url = url + "?validDocIdsType=" + validDocIdsType; } - return Pair.of(url.toString(), jsonTableSegments); + return Pair.of(url, jsonTableSegments); } private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index abe04a29a45b..76e6729df94b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -121,13 +121,6 @@ public enum ValidDocIdsValidationMode { public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = "validDocIdsValidationMode"; public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT"; - /** - * Per-server batch size for the validDocIds fetch when generator consensus runs (EQUAL/MOST_VALID_DOCS). Kept small - * because consensus fetches from every replica and EQUAL also carries the serialized bitmap in each entry. - */ - public static final String VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY = "validDocIdsConsensusFetchBatchSize"; - public static final int DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE = 10; - // Purges rows inside segment that match chosen criteria public static class PurgeTask { public static final String TASK_TYPE = "PurgeTask"; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 929171a0f117..17549b7b89ec 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -101,21 +101,6 @@ public static MinionConstants.ValidDocIdsConsensusMode resolveGeneratorConsensus ? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode; } - /** - * Resolves the per-server batch size for the generator's validDocIds fetch. UNSAFE keeps {@code regularBatchSize} - * (the no-bitmap fetch). The consensus modes fetch from every replica (and EQUAL also carries a bitmap per entry), - * so they use the smaller {@code validDocIdsConsensusFetchBatchSize} to keep the payload bounded. - */ - public static int resolveValidDocIdsFetchBatchSize(Map taskConfigs, - MinionConstants.ValidDocIdsConsensusMode consensusMode, int regularBatchSize) { - if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) { - return regularBatchSize; - } - return Integer.parseInt(taskConfigs.getOrDefault( - MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, - String.valueOf(MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE))); - } - private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; @@ -485,7 +470,9 @@ public static Map getSegmentToReplicaCount(Map generateTasks(List tableConfigs) { UpsertCompactionTask.VALID_DOC_IDS_TYPE); // Validate replicas before scheduling, matching the executor's checks, so inconsistent segments are never - // scheduled. With EXECUTOR_ONLY the generator skips these checks (the executor stays the gate). Bitmaps are - // only needed for EQUAL consensus, so fetch them only then to keep the payload small. + // scheduled. With EXECUTOR_ONLY the generator skips these checks (the executor stays the gate). MinionConstants.ValidDocIdsConsensusMode consensusMode = MinionTaskUtils.resolveGeneratorConsensusMode( MinionTaskUtils.parseValidDocIdsConsensusMode( taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY, @@ -155,22 +154,16 @@ public List generateTasks(List tableConfigs) { MinionTaskUtils.parseValidDocIdsValidationMode( taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_VALIDATION_MODE_KEY, MinionConstants.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); - boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; // Number of segments to query per server request. If a table has a lot of segments, then we might send a - // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. The - // consensus modes use a smaller batch (validDocIdsConsensusFetchBatchSize) since they fetch from every replica - // and EQUAL also carries a bitmap per entry; UNSAFE keeps the regular batch. - int regularBatchPerServerRequest = Integer.parseInt( + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. + int numSegmentsBatchPerServerRequest = Integer.parseInt( taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST, String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))); - int numSegmentsBatchPerServerRequest = - MinionTaskUtils.resolveValidDocIdsFetchBatchSize(taskConfigs, consensusMode, regularBatchPerServerRequest); Map> validDocIdsMetadataList = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, - serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest, - includeBitmaps); + serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest); Map completedSegmentsMap = completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index f44e7146cc5d..da1daaca1cdc 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -179,8 +179,7 @@ public List generateTasks(List tableConfigs) { _clusterInfoAccessor.getConnectionManager()); // Reuse the compaction task's consensus-mode and validation-mode config so both tasks behave the same. With - // EXECUTOR_ONLY the generator skips these checks (the executor stays the gate). Bitmaps are only needed for - // EQUAL consensus, so fetch them only then to keep the payload small. + // EXECUTOR_ONLY the generator skips these checks (the executor stays the gate). MinionConstants.ValidDocIdsConsensusMode consensusMode = MinionTaskUtils.resolveGeneratorConsensusMode( MinionTaskUtils.parseValidDocIdsConsensusMode( taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY, @@ -188,22 +187,16 @@ public List generateTasks(List tableConfigs) { MinionTaskUtils.parseValidDocIdsValidationMode( taskConfigs.getOrDefault(MinionConstants.VALID_DOC_IDS_VALIDATION_MODE_KEY, MinionConstants.DEFAULT_VALID_DOC_IDS_VALIDATION_MODE))); - boolean includeBitmaps = consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL; // Number of segments to query per server request. If a table has a lot of segments, then we might send a - // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. The - // consensus modes use a smaller batch (validDocIdsConsensusFetchBatchSize) since they fetch from every replica - // and EQUAL also carries a bitmap per entry; UNSAFE keeps the regular batch. - int regularBatchPerServerRequest = Integer.parseInt( + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. + int numSegmentsBatchPerServerRequest = Integer.parseInt( taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST, String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))); - int numSegmentsBatchPerServerRequest = - MinionTaskUtils.resolveValidDocIdsFetchBatchSize(taskConfigs, consensusMode, regularBatchPerServerRequest); Map> validDocIdsMetadataList = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, - serverToEndpoints, null, 60_000, ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest, - includeBitmaps); + serverToEndpoints, null, 60_000, ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest); Map candidateSegmentsMap = candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java index e6cb6f5d9e98..29c407c007f4 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java @@ -366,24 +366,6 @@ public void testResolveGeneratorConsensusMode() { } } - @Test - public void testResolveValidDocIdsFetchBatchSize() { - // UNSAFE keeps the regular (no-bitmap) batch and ignores the consensus key. - Map withConsensusKey = Map.of( - MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, "7"); - assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(withConsensusKey, - MinionConstants.ValidDocIdsConsensusMode.UNSAFE, 500), 500); - - // The consensus modes use the configured consensus batch size. - for (MinionConstants.ValidDocIdsConsensusMode mode : List.of( - MinionConstants.ValidDocIdsConsensusMode.EQUAL, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS)) { - assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(withConsensusKey, mode, 500), 7); - // Falls back to the small default when the consensus key is absent (does not inherit the regular batch). - assertEquals(MinionTaskUtils.resolveValidDocIdsFetchBatchSize(Map.of(), mode, 500), - MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE); - } - } - /** * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1). */ diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index 77f32570d82c..8383ae932f09 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -29,7 +29,6 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; -import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.core.common.MinionConstants; @@ -45,7 +44,6 @@ import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.roaringbitmap.RoaringBitmap; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -338,19 +336,19 @@ public void testProcessValidDocIdsMetadataConsensus() { // Both replicas are expected to respond. Map twoReplicas = Map.of(segmentName, 2); - // EQUAL: replicas agree (identical bitmaps), so the segment is compacted. + // EQUAL: replicas agree on the valid doc count, so the segment is compacted. Map> equalReplicas = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server2", range(0, 50)))); + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server2"))); UpsertCompactionTaskGenerator.SegmentSelectionResult result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, equalReplicas, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); assertEquals(result.getSegmentsForCompaction().size(), 1); - // EQUAL: replicas disagree (different bitmaps), so the segment is skipped entirely. + // EQUAL: replicas disagree on the valid doc count, so the segment is skipped entirely. Map> unequalReplicas = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server2", range(1, 51)))); + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 60, 40, 100, crc, ServiceStatus.Status.GOOD, "server2"))); result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, unequalReplicas, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); assertTrue(result.getSegmentsForCompaction().isEmpty()); @@ -359,23 +357,23 @@ public void testProcessValidDocIdsMetadataConsensus() { // EQUAL: only one of the two assigned replicas responded, so consensus can't be confirmed and the segment is // skipped. Map> oneResponded = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)))); + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1"))); result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, oneResponded, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); assertTrue(result.getSegmentsForCompaction().isEmpty()); // EQUAL: a CRC mismatch on any replica skips the segment. Map> crcMismatch = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), - metaWithBitmap(segmentName, 50, 50, 100, crc + 1, ServiceStatus.Status.GOOD, "server2", range(0, 50)))); + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 50, 50, 100, crc + 1, ServiceStatus.Status.GOOD, "server2"))); result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, crcMismatch, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); assertTrue(result.getSegmentsForCompaction().isEmpty()); // EQUAL: an unhealthy server skips the segment. Map> unhealthy = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1", range(0, 50)), - metaWithBitmap(segmentName, 50, 50, 100, crc, ServiceStatus.Status.STARTING, "server2", range(0, 50)))); + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 50, 50, 100, crc, ServiceStatus.Status.STARTING, "server2"))); result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, unhealthy, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL); assertTrue(result.getSegmentsForCompaction().isEmpty()); @@ -390,39 +388,21 @@ public void testProcessValidDocIdsMetadataConsensus() { crcMismatch, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS); assertTrue(result.getSegmentsForCompaction().isEmpty()); - // EQUAL: a replica without a bitmap can't be verified, so the segment is skipped. - Map> missingBitmap = Map.of(segmentName, List.of( - new ValidDocIdsMetadataInfo(segmentName, 50, 50, 100, String.valueOf(crc), ValidDocIdsType.SNAPSHOT, 1000, - System.currentTimeMillis(), "server1", ServiceStatus.Status.GOOD))); - result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, - missingBitmap, Map.of(segmentName, 1), MinionConstants.ValidDocIdsConsensusMode.EQUAL); - assertTrue(result.getSegmentsForCompaction().isEmpty()); - // MOST_VALID_DOCS: the replica with the most valid docs wins. Here that replica has zero invalid docs, so the // segment is neither compacted nor deleted - proving the other (all-invalid) replica was not chosen. Map> mostValidDocs = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), - metaWithBitmap(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, "server2", range(0, 100)))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, "server2"))); result = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, mostValidDocs, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS); assertTrue(result.getSegmentsForCompaction().isEmpty()); assertTrue(result.getSegmentsForDeletion().isEmpty()); } - private static ValidDocIdsMetadataInfo metaWithBitmap(String segmentName, long validDocs, long invalidDocs, - long totalDocs, long crc, ServiceStatus.Status serverStatus, String instanceId, int... validIds) { - RoaringBitmap bitmap = RoaringBitmap.bitmapOf(validIds); + private static ValidDocIdsMetadataInfo meta(String segmentName, long validDocs, long invalidDocs, long totalDocs, + long crc, ServiceStatus.Status serverStatus, String instanceId) { return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, totalDocs, String.valueOf(crc), - ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), instanceId, serverStatus, - RoaringBitmapUtils.serialize(bitmap)); - } - - private static int[] range(int fromInclusive, int toExclusive) { - int[] ids = new int[toExclusive - fromInclusive]; - for (int i = 0; i < ids.length; i++) { - ids[i] = fromInclusive + i; - } - return ids; + ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), instanceId, serverStatus); } @Test diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java index 2bbf58f76bf9..0680d0e1debc 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java @@ -29,7 +29,6 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; -import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; @@ -53,7 +52,6 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.roaringbitmap.RoaringBitmap; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -438,25 +436,25 @@ public void testProcessValidDocIdsMetadataConsensus() { // EQUAL: replicas agree (both fully invalid, identical empty bitmaps), so the segment is processed and deleted. Map> agree = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server2"))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server2"))); SegmentSelectionResult result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, candidateSegmentsMap, agree, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); Assert.assertTrue(result.getSegmentsForDeletion().contains(segmentName)); - // EQUAL: replicas disagree (different valid doc sets), so the segment is skipped. + // EQUAL: replicas disagree on the valid doc count, so the segment is skipped. Map> disagree = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), - metaWithBitmap(segmentName, 1, 99, 100, crc, ServiceStatus.Status.GOOD, "server2", 0))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 1, 99, 100, crc, ServiceStatus.Status.GOOD, "server2"))); result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, candidateSegmentsMap, disagree, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); // EQUAL: a CRC mismatch on any replica skips the segment. Map> crcMismatch = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), - metaWithBitmap(segmentName, 0, 100, 100, crc + 1, ServiceStatus.Status.GOOD, "server2"))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 0, 100, 100, crc + 1, ServiceStatus.Status.GOOD, "server2"))); result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, candidateSegmentsMap, crcMismatch, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); @@ -464,8 +462,8 @@ public void testProcessValidDocIdsMetadataConsensus() { // EQUAL: an unhealthy server skips the segment. Map> unhealthy = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.STARTING, "server2"))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.STARTING, "server2"))); result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, candidateSegmentsMap, unhealthy, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); @@ -479,8 +477,8 @@ public void testProcessValidDocIdsMetadataConsensus() { // MOST_VALID_DOCS: the replica with the most valid docs wins, so the fully-valid replica is chosen and the // segment is not deleted (proving the all-invalid replica was not picked). Map> mostValidDocs = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), - metaWithBitmap(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, "server2", range(0, 100)))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"), + meta(segmentName, 100, 0, 100, crc, ServiceStatus.Status.GOOD, "server2"))); result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, candidateSegmentsMap, mostValidDocs, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS, null); @@ -489,27 +487,17 @@ public void testProcessValidDocIdsMetadataConsensus() { // EQUAL: only one of the two assigned replicas responded, so consensus can't be confirmed and the segment is // skipped. Map> oneResponded = Map.of(segmentName, List.of( - metaWithBitmap(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"))); + meta(segmentName, 0, 100, 100, crc, ServiceStatus.Status.GOOD, "server1"))); result = UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(RAW_TABLE_NAME, taskConfigs, candidateSegmentsMap, oneResponded, noMerged, twoReplicas, MinionConstants.ValidDocIdsConsensusMode.EQUAL, null); Assert.assertTrue(result.getSegmentsForDeletion().isEmpty()); } - private static ValidDocIdsMetadataInfo metaWithBitmap(String segmentName, long validDocs, long invalidDocs, - long totalDocs, long crc, ServiceStatus.Status serverStatus, String instanceId, int... validIds) { - RoaringBitmap bitmap = RoaringBitmap.bitmapOf(validIds); + private static ValidDocIdsMetadataInfo meta(String segmentName, long validDocs, long invalidDocs, long totalDocs, + long crc, ServiceStatus.Status serverStatus, String instanceId) { return new ValidDocIdsMetadataInfo(segmentName, validDocs, invalidDocs, totalDocs, String.valueOf(crc), - ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), instanceId, serverStatus, - RoaringBitmapUtils.serialize(bitmap)); - } - - private static int[] range(int fromInclusive, int toExclusive) { - int[] ids = new int[toExclusive - fromInclusive]; - for (int i = 0; i < ids.length; i++) { - ids[i] = fromInclusive + i; - } - return ids; + ValidDocIdsType.SNAPSHOT, 1000, System.currentTimeMillis(), instanceId, serverStatus); } /** diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index ce66e1cf6c5b..013e4a3588ac 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -674,24 +674,15 @@ public String getValidDocIdsMetadata( @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") @PathParam("tableNameWithType") String tableNameWithType, @ApiParam(value = "Valid doc ids type") @QueryParam("validDocIdsType") String validDocIdsType, - @ApiParam(value = "Whether to include the serialized validDocIds bitmap in each per-segment response entry. " - + "When true the response payload grows linearly with the number of valid doc bitmaps, so callers should " - + "request bitmaps only for the set of segments they actually need cross-replica consensus for.") - @QueryParam("includeBitmaps") @DefaultValue("false") boolean includeBitmaps, TableSegments tableSegments, @Context HttpHeaders headers) { tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); List segmentNames = tableSegments.getSegments(); return ResourceUtils.convertToJsonString( - processValidDocIdsMetadata(tableNameWithType, segmentNames, validDocIdsType, includeBitmaps)); + processValidDocIdsMetadata(tableNameWithType, segmentNames, validDocIdsType)); } private List> processValidDocIdsMetadata(String tableNameWithType, List segments, String validDocIdsType) { - return processValidDocIdsMetadata(tableNameWithType, segments, validDocIdsType, false); - } - - private List> processValidDocIdsMetadata(String tableNameWithType, List segments, - String validDocIdsType, boolean includeBitmaps) { TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); List missingSegments = new ArrayList<>(); @@ -764,9 +755,6 @@ private List> processValidDocIdsMetadata(String tableNameWit ((ImmutableSegment) segmentDataManager.getSegment()).getSegmentSizeBytes()); } validDocIdsMetadata.put("segmentCreationTimeMillis", indexSegment.getSegmentMetadata().getIndexCreationTime()); - if (includeBitmaps) { - validDocIdsMetadata.put("bitmap", RoaringBitmapUtils.serialize(validDocIdsSnapshot)); - } allValidDocIdsMetadata.add(validDocIdsMetadata); } if (nonImmutableSegmentCount > 0) { diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java index 8e42909fd3d1..f43c2f52875f 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java @@ -399,9 +399,6 @@ public void testValidDocIdsMetadataPost() ((ImmutableSegmentImpl) segment).getSegmentSizeBytes()); Assert.assertTrue(validDocIdsMetadata.has("segmentCreationTimeMillis")); Assert.assertTrue(validDocIdsMetadata.get("segmentCreationTimeMillis").asLong() > 0); - // bitmap field is omitted by default (includeBitmaps was not requested). - Assert.assertFalse(validDocIdsMetadata.has("bitmap"), - "Bitmap should not be included by default to keep response payloads small"); // Verify server status information Assert.assertTrue(validDocIdsMetadata.has("serverStatus"), "Server status should be included in response"); @@ -410,35 +407,6 @@ public void testValidDocIdsMetadataPost() Assert.assertEquals(serverStatus, "NOT_STARTED", serverStatus); } - @Test - public void testValidDocIdsMetadataPostWithIncludeBitmaps() - throws IOException { - IndexSegment segment = _realtimeIndexSegments.get(0); - - List segments = List.of(segment.getSegmentName()); - TableSegments tableSegments = new TableSegments(segments); - String validDocIdsMetadataPath = "/tables/" + REALTIME_TABLE_NAME + "/validDocIdsMetadata"; - String response = _webTarget.path(validDocIdsMetadataPath) - .queryParam("includeBitmaps", "true") - .request() - .post(Entity.json(tableSegments), String.class); - JsonNode validDocIdsMetadata = JsonUtils.stringToJsonNode(response).get(0); - - // The metadata counts are still emitted alongside the bitmap. - Assert.assertEquals(validDocIdsMetadata.get("totalValidDocs").asInt(), 8); - Assert.assertEquals(validDocIdsMetadata.get("totalDocs").asInt(), 200000); - - // The bitmap should now be present and its cardinality must match totalValidDocs. - Assert.assertTrue(validDocIdsMetadata.has("bitmap"), - "Bitmap field should be included when includeBitmaps=true"); - byte[] bitmapBytes = validDocIdsMetadata.get("bitmap").binaryValue(); - Assert.assertNotNull(bitmapBytes); - org.roaringbitmap.RoaringBitmap bitmap = new org.roaringbitmap.RoaringBitmap(); - bitmap.deserialize(java.nio.ByteBuffer.wrap(bitmapBytes)); - Assert.assertEquals(bitmap.getCardinality(), 8, - "Deserialized bitmap cardinality must equal totalValidDocs"); - } - @Test public void testValidDocIdsMetadataPostForSnapshotWithDelete() throws IOException {