Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.ServiceStatus;


Expand All @@ -35,14 +38,29 @@ public class ValidDocIdsMetadataInfo {
private final long _segmentCreationTimeMillis;
private final String _instanceId;
private final ServiceStatus.Status _serverStatus;
// Optional serialized RoaringBitmap of the validDocIds for the segment. Populated only when the caller of the
// batched validDocIdsMetadata endpoint passes includeBitmaps=true. Null when omitted or when the responding
// server is older and doesn't recognise the flag. Field is JsonInclude.NON_NULL on the getter so payloads stay
// small when bitmaps are not requested.
@Nullable
private final byte[] _bitmap;

public ValidDocIdsMetadataInfo(String segmentName, long totalValidDocs, long totalInvalidDocs, long totalDocs,
String segmentCrc, ValidDocIdsType validDocIdsType, long segmentSizeInBytes, long segmentCreationTimeMillis,
String instanceId, ServiceStatus.Status serverStatus) {
this(segmentName, totalValidDocs, totalInvalidDocs, totalDocs, segmentCrc, validDocIdsType, segmentSizeInBytes,
segmentCreationTimeMillis, instanceId, serverStatus, null);
}

@JsonCreator
public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
@JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
@JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc,
@JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
@JsonProperty("segmentSizeInBytes") long segmentSizeInBytes,
@JsonProperty("segmentCreationTimeMillis") long segmentCreationTimeMillis,
@JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus) {
@JsonProperty("instanceId") String instanceId, @JsonProperty("serverStatus") ServiceStatus.Status serverStatus,
@JsonProperty("bitmap") @Nullable byte[] bitmap) {
_segmentName = segmentName;
_totalValidDocs = totalValidDocs;
_totalInvalidDocs = totalInvalidDocs;
Expand All @@ -53,6 +71,7 @@ public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
_segmentCreationTimeMillis = segmentCreationTimeMillis;
_instanceId = instanceId;
_serverStatus = serverStatus;
_bitmap = bitmap;
}

public String getSegmentName() {
Expand Down Expand Up @@ -94,4 +113,15 @@ public String getInstanceId() {
public ServiceStatus.Status getServerStatus() {
return _serverStatus;
}

/**
* Returns the serialized RoaringBitmap of validDocIds for the segment, or null when not requested by the caller
* (or the responding server is older and doesn't emit it). Callers can deserialize via
* {@code RoaringBitmapUtils.deserialize}.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public byte[] getBitmap() {
return _bitmap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,20 @@ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadat
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest) {
return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints,
segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest, false);
}

/**
* Overload that also lets the caller request the serialized validDocIds bitmap in each per-segment response entry.
* When {@code includeBitmaps} is true, every {@link ValidDocIdsMetadataInfo} returned by the server includes its
* bitmap bytes (see {@link ValidDocIdsMetadataInfo#getBitmap()}). Use sparingly — the response payload grows with
* the total bitmap size across the requested segments.
*/
public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest, boolean includeBitmaps) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
Expand All @@ -301,7 +315,7 @@ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadat
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch,
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()))));
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()), includeBitmaps)));
}

BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
Expand Down Expand Up @@ -482,6 +496,11 @@ private String generateValidDocIdsBitmapURL(String tableNameWithType, String seg

private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWithType, List<String> segmentNames,
String validDocIdsType, String endpoint) {
return generateValidDocIdsMetadataURL(tableNameWithType, segmentNames, validDocIdsType, endpoint, false);
}

private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWithType, List<String> segmentNames,
String validDocIdsType, String endpoint, boolean includeBitmaps) {
tableNameWithType = encode(tableNameWithType);
TableSegments tableSegments = new TableSegments(segmentNames);
String jsonTableSegments;
Expand All @@ -491,11 +510,17 @@ private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWith
LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames);
throw new RuntimeException(e);
}
String url = String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType);
StringBuilder url = new StringBuilder(
String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType));
String separator = "?";
if (validDocIdsType != null) {
url = url + "?validDocIdsType=" + validDocIdsType;
url.append(separator).append("validDocIdsType=").append(validDocIdsType);
separator = "&";
}
if (includeBitmaps) {
url.append(separator).append("includeBitmaps=true");
}
return Pair.of(url, jsonTableSegments);
return Pair.of(url.toString(), jsonTableSegments);
}

private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,20 @@ private MinionConstants() {
*/
public static final String SEGMENT_DOWNLOAD_PARALLELISM = "segmentDownloadParallelism";

/** Valid doc ids consensus mode (executor-only). Kept internal; executors pass config string. */
/** Valid doc ids consensus mode enforced by both the task generators (pre-scheduling) and the executors. */
public enum ValidDocIdsConsensusMode {
UNSAFE, EQUAL, MOST_VALID_DOCS
}

/**
* Where validDocIds consensus is enforced. STRICT (default) runs the checks in both the task generator
* (pre-scheduling) and the executor; EXECUTOR_ONLY skips the generator-side checks and leaves the executor as the
* sole gate.
*/
public enum ValidDocIdsValidationMode {
STRICT, EXECUTOR_ONLY
}

// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
Expand Down Expand Up @@ -269,14 +278,25 @@ public static class UpsertCompactionTask {
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
* Valid doc ids consensus mode used by the executor only (generator unchanged). Values: UNSAFE, EQUAL,
* MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY; EQUAL = require all replicas
* to have the same valid doc set (default); MOST_VALID_DOCS = use replica with most valid docs.
* Valid doc ids consensus mode enforced by both the task generators (pre-scheduling) and the executors. Values:
* UNSAFE, EQUAL, MOST_VALID_DOCS. UNSAFE = use the first server with matching CRC and GOOD status; EQUAL =
* require all replicas to have the same valid doc set (default); MOST_VALID_DOCS = use the replica with the most
* valid docs. Shared by UpsertCompactionTask and UpsertCompactMergeTask.
*/
public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode";

/** Default: equal valid doc set consensus across replicas. */
public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";

/**
* Whether the consensus checks run in the generator too. STRICT (default) = generator + executor; EXECUTOR_ONLY =
* executor only (generator skips the checks and the bitmap fetch). Shared by UpsertCompactionTask,
* UpsertCompactMergeTask, and SegmentRefreshTask.
*/
public static final String VALID_DOC_IDS_VALIDATION_MODE_KEY = "validDocIdsValidationMode";

/** Default: enforce in both generator and executor. */
public static final String DEFAULT_VALID_DOC_IDS_VALIDATION_MODE = "STRICT";
}

public static class UpsertCompactMergeTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.common.auth.NullAuthProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RetentionUtils;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
Expand Down Expand Up @@ -72,14 +73,34 @@
public class MinionTaskUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class);

/** Package-private for testing: parses validDocIdsComparisonMode config string. */
static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) {
/** Parses the validDocIdsConsensusMode config string. Blank/null defaults to {@code EQUAL}. */
public static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) {
if (value == null || value.isBlank()) {
return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
}
return MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
}

/** Parses the validDocIdsValidationMode config string. Blank/null defaults to {@code STRICT}. */
public static MinionConstants.ValidDocIdsValidationMode parseValidDocIdsValidationMode(String value) {
if (value == null || value.isBlank()) {
return MinionConstants.ValidDocIdsValidationMode.STRICT;
}
return MinionConstants.ValidDocIdsValidationMode.valueOf(value.toUpperCase().trim());
}

/**
* Resolves the consensus mode the generator should apply, given the configured consensus mode and validation mode.
* EXECUTOR_ONLY downgrades the generator to UNSAFE (lenient pick, no bitmaps, no cross-replica enforcement) so the
* executor stays the sole gate; STRICT keeps the configured mode.
*/
public static MinionConstants.ValidDocIdsConsensusMode resolveGeneratorConsensusMode(
MinionConstants.ValidDocIdsConsensusMode consensusMode,
MinionConstants.ValidDocIdsValidationMode validationMode) {
return validationMode == MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY
? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode;
}

private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";

public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
Expand Down Expand Up @@ -428,6 +449,131 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
return maxCardinalityMap;
}

/**
* Counts how many servers host each segment (its online replica count) from a server-to-segments map, so callers
* can tell whether every assigned replica responded.
*/
public static Map<String, Integer> getSegmentToReplicaCount(Map<String, List<String>> serverToSegments) {
Map<String, Integer> segmentToReplicaCount = new HashMap<>();
for (List<String> segments : serverToSegments.values()) {
for (String segment : segments) {
segmentToReplicaCount.merge(segment, 1, Integer::sum);
}
}
return segmentToReplicaCount;
}

/**
* Picks the replica whose validDocIds the generator should use for a segment, or returns {@code null} to skip the
* segment. Applies the same checks the executor would (CRC match, healthy server, replica consensus) so bad
* segments are dropped before a task is scheduled.
*
* How the replica is chosen depends on {@code consensusMode}:
* - UNSAFE: the first replica with a matching CRC and a healthy server.
* - EQUAL: all replicas must match CRC, be healthy, and have an identical bitmap.
* - MOST_VALID_DOCS: all replicas must match CRC and be healthy; the one with the most valid docs wins.
*/
@Nullable
public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName,
long expectedCrc, @Nullable List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount,
MinionConstants.ValidDocIdsConsensusMode consensusMode) {
if (replicas == null || replicas.isEmpty()) {
return null;
}
boolean unsafe = consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE;
List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>();
for (ValidDocIdsMetadataInfo replica : replicas) {
// A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be
// trusted. UNSAFE skips just this replica; stricter modes skip the whole segment.
long replicaCrc;
try {
replicaCrc = Long.parseLong(replica.getSegmentCrc());
} catch (NumberFormatException e) {
LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, skipping {} (mode={}) for {}",
replica.getSegmentCrc(), segmentName, replica.getInstanceId(), unsafe ? "replica" : "segment",
consensusMode, taskType);
if (unsafe) {
continue;
}
return null;
}
if (expectedCrc != replicaCrc) {
LOGGER.warn("CRC mismatch for segment: {} (expected={}, server={} reported={}), skipping {} (mode={}) for {}",
segmentName, expectedCrc, replica.getInstanceId(), replicaCrc, unsafe ? "replica" : "segment",
consensusMode, taskType);
if (unsafe) {
continue;
}
return null;
}
// A non-GOOD server may still be mutating the segment, so its valid doc set is unreliable.
if (replica.getServerStatus() != null && replica.getServerStatus() != ServiceStatus.Status.GOOD) {
LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} (mode={}) for {}", replica.getInstanceId(),
replica.getServerStatus(), segmentName, unsafe ? "replica" : "segment", consensusMode, taskType);
if (unsafe) {
continue;
}
return null;
}
if (unsafe) {
return replica;
}
usableReplicas.add(replica);
}

if (usableReplicas.isEmpty()) {
return null;
}

// Strict modes need every assigned replica to have responded; a short responder list means a server dropped out
// (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full
// replica set and skip the segment.
if (usableReplicas.size() < expectedReplicaCount) {
LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot confirm consensus, skipping for {}",
usableReplicas.size(), expectedReplicaCount, segmentName, taskType);
return null;
}

if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
ValidDocIdsMetadataInfo first = usableReplicas.get(0);
RoaringBitmap consensusBitmap = deserializeBitmapOrNull(first);
if (consensusBitmap == null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes controller-first rolling upgrades incompatible under the new default STRICT/EQUAL path. Older servers still answer this endpoint but omit bitmap, and ValidDocIdsMetadataInfo explicitly treats that as expected for old servers; here we convert that mixed-version response into a hard skip, so upsert compaction/compact-merge task generation stops until every server is upgraded. Please keep the generator default executor-only, or add an old-server fallback before making bitmap-based prescheduling the default.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still breaks mixed-version rolling upgrades. With the new defaults (EQUAL + STRICT), the generator requests includeBitmaps=true. Older servers ignore that query param and return ValidDocIdsMetadataInfo without bitmap, and this branch turns that into a hard skip, so controller-first upgrades stop scheduling upsert compaction / compact-merge tasks until all servers are upgraded or operators manually flip EXECUTOR_ONLY. Pinot normally needs controller/server roll-forward compatibility without per-table config changes, so this needs a fallback for older servers or the generator-side default needs to stay EXECUTOR_ONLY.

// No bitmap means EQUAL can't be verified - skip rather than risk an inconsistent task. Make sure the
// metadata was fetched with includeBitmaps=true.
LOGGER.warn("Missing validDocIds bitmap for segment: {} from server: {}, cannot verify EQUAL consensus, "
+ "skipping segment for {}", segmentName, first.getInstanceId(), taskType);
return null;
}
for (int i = 1; i < usableReplicas.size(); i++) {
RoaringBitmap bitmap = deserializeBitmapOrNull(usableReplicas.get(i));
if (bitmap == null || !bitmap.equals(consensusBitmap)) {
LOGGER.warn("Replicas disagree on validDocIds for segment: {}, skipping segment for {}", segmentName,
taskType);
return null;
}
}
return first;
}

// MOST_VALID_DOCS: pick the replica reporting the most valid docs.
ValidDocIdsMetadataInfo chosen = usableReplicas.get(0);
for (ValidDocIdsMetadataInfo replica : usableReplicas) {
if (replica.getTotalValidDocs() > chosen.getTotalValidDocs()) {
chosen = replica;
}
}
return chosen;
}

@Nullable
private static RoaringBitmap deserializeBitmapOrNull(ValidDocIdsMetadataInfo info) {
byte[] bitmap = info.getBitmap();
if (bitmap == null || bitmap.length == 0) {
return null;
}
return RoaringBitmapUtils.deserialize(bitmap);
}

public static String toUTCString(long epochMillis) {
Date date = new Date(epochMillis);
SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN);
Expand Down
Loading
Loading