Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5623fa9
Add includeBitmaps option to validDocIdsMetadata endpoint
deepthi912 Jun 25, 2026
cac3f5e
Add validDocIds replica consensus checks to upsert task generators
Jun 26, 2026
7d7bbcb
Reformat selectValidDocIdsMetadataForConsensus Javadoc for readability
Jun 27, 2026
353fe38
Use plain-text formatting for selectValidDocIdsMetadataForConsensus J…
Jun 27, 2026
bb8e52f
Drop expectedReplicaCount note from selectValidDocIdsMetadataForConse…
Jun 27, 2026
74f8d42
Bound the validDocIds consensus fetch batch in upsert generators
deepthi912 Jun 28, 2026
23658fe
Move shared validDocIds consensus constants to top-level MinionConstants
deepthi912 Jun 28, 2026
018b58d
Use valid doc count for generator EQUAL consensus; drop bitmap fetch
deepthi912 Jun 29, 2026
e58fadf
Match validDocIds CRC by data CRC, consistently in generator and exec…
deepthi912 Jun 29, 2026
e35845f
Keep validDocIds consensus constants in UpsertCompactionTask
deepthi912 Jun 29, 2026
653f7b2
Fix validDocIds endpoints NPE / per-segment ZK read for segmentDataCrc
deepthi912 Jun 29, 2026
347e158
Move segmentDataCrc field next to segmentCrc in the validDocIds respo…
deepthi912 Jun 29, 2026
8266083
Use Markdown (///) Javadoc style for the new validDocIds consensus me…
deepthi912 Jun 30, 2026
6695204
Gather expected replica count during validDocIds metadata read
deepthi912 Jun 30, 2026
8892b65
Address review: tidy validDocIds metadata DTOs and consensus helpers
deepthi912 Jun 30, 2026
136a2ca
Remove unneeded comment on replica-count tally
deepthi912 Jun 30, 2026
7e80b26
Remove verbose per-scenario comments from consensus generator tests
deepthi912 Jun 30, 2026
6e115f8
Address review comments on validDocIds DTOs and server data CRC
deepthi912 Jun 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.ServiceStatus;


Expand All @@ -35,14 +38,29 @@ public class ValidDocIdsMetadataInfo {
private final long _segmentCreationTimeMillis;
private final String _instanceId;
private final ServiceStatus.Status _serverStatus;
// Optional serialized RoaringBitmap of the validDocIds for the segment. Populated only when the caller of the
// batched validDocIdsMetadata endpoint passes includeBitmaps=true. Null when the bitmap was not requested or when
// the responding server is older and doesn't recognise the flag. The getter is annotated with
// @JsonInclude(NON_NULL) so the field is left out of the payload entirely when no bitmap is present.
@Nullable
private final byte[] _bitmap;

public ValidDocIdsMetadataInfo(String segmentName, long totalValidDocs, long totalInvalidDocs, long totalDocs,
Comment thread
deepthi912 marked this conversation as resolved.
Outdated
String segmentCrc, ValidDocIdsType validDocIdsType, long segmentSizeInBytes, long segmentCreationTimeMillis,
String instanceId, ServiceStatus.Status serverStatus) {
this(segmentName, totalValidDocs, totalInvalidDocs, totalDocs, segmentCrc, validDocIdsType, segmentSizeInBytes,
segmentCreationTimeMillis, instanceId, serverStatus, null);
}

@JsonCreator
public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
@JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
@JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc,
@JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
@JsonProperty("segmentSizeInBytes") long segmentSizeInBytes,
@JsonProperty("segmentCreationTimeMillis") long segmentCreationTimeMillis,
@JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
@JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus,
@JsonProperty("bitmap") @Nullable byte[] bitmap) {
_segmentName = segmentName;
_totalValidDocs = totalValidDocs;
_totalInvalidDocs = totalInvalidDocs;
Expand All @@ -53,6 +71,7 @@ public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
_segmentCreationTimeMillis = segmentCreationTimeMillis;
_instanceId = instanceId;
_serverStatus = serverStatus;
_bitmap = bitmap;
}

public String getSegmentName() {
Expand Down Expand Up @@ -94,4 +113,15 @@ public String getInstanceId() {
public ServiceStatus.Status getServerStatus() {
return _serverStatus;
}

/**
* Returns the serialized RoaringBitmap of validDocIds for the segment, or null when not requested by the caller
* (or the responding server is older and doesn't emit it). Callers can deserialize via
* {@code RoaringBitmapUtils.deserialize}.
*
* <p>Because of the {@code JsonInclude.Include.NON_NULL} annotation, a null bitmap is left out of the serialized
* JSON rather than written as an explicit null. The backing array is returned as-is, without a defensive copy, so
* callers should not mutate it.
*
* @return the serialized bitmap bytes, or {@code null} if this entry carries no bitmap
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public byte[] getBitmap() {
return _bitmap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,20 @@ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadat
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest) {
return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints,
segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest, false);
}

/**
* Overload that also lets the caller request the serialized validDocIds bitmap in each per-segment response entry.
* When {@code includeBitmaps} is true, every {@link ValidDocIdsMetadataInfo} returned by the server includes its
* bitmap bytes (see {@link ValidDocIdsMetadataInfo#getBitmap()}). Use sparingly — the response payload grows with
* the total bitmap size across the requested segments.
*/
public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest, boolean includeBitmaps) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
Expand All @@ -301,7 +315,7 @@ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadat
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch,
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()))));
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()), includeBitmaps)));
}

BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
Expand Down Expand Up @@ -482,6 +496,11 @@ private String generateValidDocIdsBitmapURL(String tableNameWithType, String seg

/**
* Convenience overload that builds the validDocIdsMetadata request without asking the server for bitmaps: it
* delegates to the five-argument variant with {@code includeBitmaps} set to {@code false}.
*
* @return the pair produced by the five-argument variant (request URL and JSON request body)
*/
private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWithType, List<String> segmentNames,
String validDocIdsType, String endpoint) {
return generateValidDocIdsMetadataURL(tableNameWithType, segmentNames, validDocIdsType, endpoint, false);
}

private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWithType, List<String> segmentNames,
String validDocIdsType, String endpoint, boolean includeBitmaps) {
tableNameWithType = encode(tableNameWithType);
TableSegments tableSegments = new TableSegments(segmentNames);
String jsonTableSegments;
Expand All @@ -491,11 +510,17 @@ private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWith
LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames);
throw new RuntimeException(e);
}
String url = String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType);
StringBuilder url = new StringBuilder(
String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType));
String separator = "?";
if (validDocIdsType != null) {
url = url + "?validDocIdsType=" + validDocIdsType;
url.append(separator).append("validDocIdsType=").append(validDocIdsType);
separator = "&";
}
if (includeBitmaps) {
url.append(separator).append("includeBitmaps=true");
}
return Pair.of(url, jsonTableSegments);
return Pair.of(url.toString(), jsonTableSegments);
}

private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,42 @@ private MinionConstants() {
*/
public static final String SEGMENT_DOWNLOAD_PARALLELISM = "segmentDownloadParallelism";

/** Valid doc ids consensus mode (executor-only). Kept internal; executors pass config string. */
/**
* Valid doc ids consensus mode enforced by both the task generators (pre-scheduling) and the executors.
*
* <ul>
*   <li>{@code UNSAFE}: use the first replica with a matching CRC and a GOOD server status.</li>
*   <li>{@code EQUAL}: all replicas must have the same valid doc set.</li>
*   <li>{@code MOST_VALID_DOCS}: use the replica with the most valid docs.</li>
* </ul>
*/
public enum ValidDocIdsConsensusMode {
UNSAFE, EQUAL, MOST_VALID_DOCS
}

/**
* Where validDocIds consensus is enforced. STRICT (default) runs the checks in both the task generator
* (pre-scheduling) and the executor; EXECUTOR_ONLY skips the generator-side checks and leaves the executor as the
* sole gate.
*/
public enum ValidDocIdsValidationMode {
STRICT, EXECUTOR_ONLY
}

/**
* Valid doc ids consensus mode used by both the task generators (pre-scheduling) and the executors. UNSAFE = first
* server with matching CRC and GOOD status; EQUAL (default) = all replicas must have the same valid doc set;
* MOST_VALID_DOCS = the replica with the most valid docs.
*/
public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode";
public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";

/**
* Whether the consensus checks run in the generator too. STRICT (default) = generator + executor; EXECUTOR_ONLY =
* executor only (generator skips the checks and the bitmap fetch).
*/
public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = "validDocIdsValidationMode";
public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT";

/**
* Per-server batch size for the validDocIds fetch when generator consensus runs (EQUAL/MOST_VALID_DOCS). Kept small
* because consensus fetches from every replica and EQUAL also carries the serialized bitmap in each entry.
*/
public static final String VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY = "validDocIdsConsensusFetchBatchSize";
public static final int DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE = 10;

// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
Expand Down Expand Up @@ -268,15 +299,14 @@ public static class UpsertCompactionTask {
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
* Valid doc ids consensus mode used by the executor only (generator unchanged). Values: UNSAFE, EQUAL,
* MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY; EQUAL = require all replicas
* to have the same valid doc set (default); MOST_VALID_DOCS = use replica with most valid docs.
*/
public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode";
/** @deprecated moved to {@link MinionConstants#VALID_DOC_IDS_CONSENSUS_MODE_KEY}. */
@Deprecated
public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = MinionConstants.VALID_DOC_IDS_CONSENSUS_MODE_KEY;

/** Default: equal valid doc set consensus across replicas. */
public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";
/** @deprecated moved to {@link MinionConstants#DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE}. */
@Deprecated
public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE =
MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
}

public static class UpsertCompactMergeTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.common.auth.NullAuthProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RetentionUtils;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
Expand Down Expand Up @@ -72,14 +73,49 @@
public class MinionTaskUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class);

/** Package-private for testing: parses validDocIdsComparisonMode config string. */
static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) {
/** Parses the validDocIdsConsensusMode config string. Blank/null defaults to {@code EQUAL}. */
Comment thread
deepthi912 marked this conversation as resolved.
Outdated
public static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) {
if (value == null || value.isBlank()) {
return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
}
return MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
}

/** Parses the validDocIdsValidationMode config string. Blank/null defaults to {@code STRICT}. */
public static MinionConstants.ValidDocIdsValidationMode parseValidDocIdsValidationMode(String value) {
if (value == null || value.isBlank()) {
Comment thread
deepthi912 marked this conversation as resolved.
Outdated
return MinionConstants.ValidDocIdsValidationMode.STRICT;
}
return MinionConstants.ValidDocIdsValidationMode.valueOf(value.toUpperCase().trim());
}

/**
 * Decides which consensus mode the task generator enforces for the given configuration.
 *
 * Under EXECUTOR_ONLY validation the generator always falls back to UNSAFE (lenient pick, no bitmaps, no
 * cross-replica enforcement), leaving the executor as the only gate. Any other validation mode keeps the
 * configured consensus mode unchanged.
 */
public static MinionConstants.ValidDocIdsConsensusMode resolveGeneratorConsensusMode(
    MinionConstants.ValidDocIdsConsensusMode consensusMode,
    MinionConstants.ValidDocIdsValidationMode validationMode) {
  if (validationMode != MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY) {
    return consensusMode;
  }
  return MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
}

/**
 * Resolves the per-server batch size for the generator's validDocIds fetch. UNSAFE keeps {@code regularBatchSize}
 * (the no-bitmap fetch). The consensus modes fetch from every replica (and EQUAL also carries a bitmap per entry),
 * so they use the smaller {@code validDocIdsConsensusFetchBatchSize} to keep the payload bounded.
 *
 * @param taskConfigs task configs that may carry {@code validDocIdsConsensusFetchBatchSize}
 * @param consensusMode consensus mode the generator will apply
 * @param regularBatchSize batch size returned unchanged for UNSAFE
 * @return the batch size to use for the fetch
 * @throws NumberFormatException if the configured value is not an integer
 * @throws IllegalArgumentException if the configured value is zero or negative
 */
public static int resolveValidDocIdsFetchBatchSize(Map<String, String> taskConfigs,
    MinionConstants.ValidDocIdsConsensusMode consensusMode, int regularBatchSize) {
  if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
    return regularBatchSize;
  }
  String configured = taskConfigs.get(MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY);
  if (configured == null) {
    return MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE;
  }
  int batchSize = Integer.parseInt(configured.trim());
  // Reject non-positive sizes here, where the offending key can be named. The value is presumably handed on as
  // the per-server partition size, where zero or a negative number would only fail later with an opaque error.
  if (batchSize <= 0) {
    throw new IllegalArgumentException(
        "Invalid " + MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY + ": " + batchSize
            + " (must be a positive integer)");
  }
  return batchSize;
}

private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";

public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
Expand Down Expand Up @@ -428,6 +464,131 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
return maxCardinalityMap;
}

/**
* Counts how many servers host each segment (its online replica count) from a server-to-segments map, so callers
* can tell whether every assigned replica responded.
*/
public static Map<String, Integer> getSegmentToReplicaCount(Map<String, List<String>> serverToSegments) {
Comment thread
deepthi912 marked this conversation as resolved.
Outdated
Map<String, Integer> segmentToReplicaCount = new HashMap<>();
for (List<String> segments : serverToSegments.values()) {
for (String segment : segments) {
segmentToReplicaCount.merge(segment, 1, Integer::sum);
}
}
return segmentToReplicaCount;
}

/**
* Picks the replica whose validDocIds the generator should use for a segment, or returns {@code null} to skip the
* segment. Applies the same checks the executor would (CRC match, healthy server, replica consensus) so bad
* segments are dropped before a task is scheduled.
*
* How the replica is chosen depends on {@code consensusMode}:
* - UNSAFE: the first replica with a matching CRC and a healthy server.
* - EQUAL: all replicas must match CRC, be healthy, and have an identical bitmap.
* - MOST_VALID_DOCS: all replicas must match CRC and be healthy; the one with the most valid docs wins.
*/
@Nullable
public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName,
long expectedCrc, @Nullable List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount,
MinionConstants.ValidDocIdsConsensusMode consensusMode) {
if (replicas == null || replicas.isEmpty()) {
return null;
}
boolean unsafe = consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>();
for (ValidDocIdsMetadataInfo replica : replicas) {
// A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be
// trusted. UNSAFE skips just this replica; stricter modes skip the whole segment.
long replicaCrc;
try {
replicaCrc = Long.parseLong(replica.getSegmentCrc());
} catch (NumberFormatException e) {
LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, skipping {} (mode={}) for {}",
replica.getSegmentCrc(), segmentName, replica.getInstanceId(), unsafe ? "replica" : "segment",
consensusMode, taskType);
if (unsafe) {
continue;
}
return null;
}
if (expectedCrc != replicaCrc) {
LOGGER.warn("CRC mismatch for segment: {} (expected={}, server={} reported={}), skipping {} (mode={}) for {}",
segmentName, expectedCrc, replica.getInstanceId(), replicaCrc, unsafe ? "replica" : "segment",
consensusMode, taskType);
if (unsafe) {
continue;
}
return null;
}
// A non-GOOD server may still be mutating the segment, so its valid doc set is unreliable.
if (replica.getServerStatus() != null && replica.getServerStatus() != ServiceStatus.Status.GOOD) {
LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} (mode={}) for {}", replica.getInstanceId(),
replica.getServerStatus(), segmentName, unsafe ? "replica" : "segment", consensusMode, taskType);
if (unsafe) {
continue;
}
return null;
}
if (unsafe) {
return replica;
}
usableReplicas.add(replica);
}

if (usableReplicas.isEmpty()) {
return null;
}

// Strict modes need every assigned replica to have responded; a short responder list means a server dropped out
// (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full
// replica set and skip the segment.
if (usableReplicas.size() < expectedReplicaCount) {
LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot confirm consensus, skipping for {}",
usableReplicas.size(), expectedReplicaCount, segmentName, taskType);
return null;
}

if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
ValidDocIdsMetadataInfo first = usableReplicas.get(0);
RoaringBitmap consensusBitmap = deserializeBitmapOrNull(first);
if (consensusBitmap == null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes controller-first rolling upgrades incompatible under the new default STRICT/EQUAL path. Older servers still answer this endpoint but omit bitmap, and ValidDocIdsMetadataInfo explicitly treats that as expected for old servers; here we convert that mixed-version response into a hard skip, so upsert compaction/compact-merge task generation stops until every server is upgraded. Please keep the generator default executor-only, or add an old-server fallback before making bitmap-based prescheduling the default.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be okay, the task failures would be transient until the upgrade finishes. Added a note for the new segmentDataCrc field I added.

Comment thread
deepthi912 marked this conversation as resolved.
Outdated
// No bitmap means EQUAL can't be verified - skip rather than risk an inconsistent task. Make sure the
// metadata was fetched with includeBitmaps=true.
LOGGER.warn("Missing validDocIds bitmap for segment: {} from server: {}, cannot verify EQUAL consensus, "
+ "skipping segment for {}", segmentName, first.getInstanceId(), taskType);
return null;
}
for (int i = 1; i < usableReplicas.size(); i++) {
RoaringBitmap bitmap = deserializeBitmapOrNull(usableReplicas.get(i));
if (bitmap == null || !bitmap.equals(consensusBitmap)) {
LOGGER.warn("Replicas disagree on validDocIds for segment: {}, skipping segment for {}", segmentName,
taskType);
return null;
}
}
return first;
}

// MOST_VALID_DOCS: pick the replica reporting the most valid docs.
ValidDocIdsMetadataInfo chosen = usableReplicas.get(0);
for (ValidDocIdsMetadataInfo replica : usableReplicas) {
if (replica.getTotalValidDocs() > chosen.getTotalValidDocs()) {
chosen = replica;
}
}
return chosen;
}

/**
 * Deserializes the bitmap bytes carried by {@code info}, or returns {@code null} when the entry has no bitmap
 * (the byte array is null or empty).
 */
@Nullable
private static RoaringBitmap deserializeBitmapOrNull(ValidDocIdsMetadataInfo info) {
  byte[] serialized = info.getBitmap();
  boolean hasPayload = serialized != null && serialized.length > 0;
  return hasPayload ? RoaringBitmapUtils.deserialize(serialized) : null;
}

public static String toUTCString(long epochMillis) {
Date date = new Date(epochMillis);
SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN);
Expand Down
Loading
Loading