-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Enforce validDocIds consensus in upsert task generators and add includeBitmaps to validDocIdsMetadata API #18853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5623fa9
cac3f5e
7d7bbcb
353fe38
bb8e52f
74f8d42
23658fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| import org.apache.pinot.common.auth.NullAuthProvider; | ||
| import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; | ||
| import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse; | ||
| import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; | ||
| import org.apache.pinot.common.restlet.resources.ValidDocIdsType; | ||
| import org.apache.pinot.common.utils.RetentionUtils; | ||
| import org.apache.pinot.common.utils.RoaringBitmapUtils; | ||
|
|
@@ -72,14 +73,49 @@ | |
| public class MinionTaskUtils { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class); | ||
|
|
||
| /** Package-private for testing: parses validDocIdsComparisonMode config string. */ | ||
| static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) { | ||
| /** Parses the validDocIdsConsensusMode config string. Blank/null defaults to {@code EQUAL}. */ | ||
| public static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) { | ||
| if (value == null || value.isBlank()) { | ||
| return MinionConstants.ValidDocIdsConsensusMode.EQUAL; | ||
| } | ||
| return MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim()); | ||
| } | ||
|
|
||
| /** Parses the validDocIdsValidationMode config string. Blank/null defaults to {@code STRICT}. */ | ||
| public static MinionConstants.ValidDocIdsValidationMode parseValidDocIdsValidationMode(String value) { | ||
| if (value == null || value.isBlank()) { | ||
| return MinionConstants.ValidDocIdsValidationMode.STRICT; | ||
| } | ||
| return MinionConstants.ValidDocIdsValidationMode.valueOf(value.toUpperCase().trim()); | ||
| } | ||
|
|
||
| /** | ||
| * Resolves the consensus mode the generator should apply, given the configured consensus mode and validation mode. | ||
| * EXECUTOR_ONLY downgrades the generator to UNSAFE (lenient pick, no bitmaps, no cross-replica enforcement) so the | ||
| * executor stays the sole gate; STRICT keeps the configured mode. | ||
| */ | ||
| public static MinionConstants.ValidDocIdsConsensusMode resolveGeneratorConsensusMode( | ||
| MinionConstants.ValidDocIdsConsensusMode consensusMode, | ||
| MinionConstants.ValidDocIdsValidationMode validationMode) { | ||
| return validationMode == MinionConstants.ValidDocIdsValidationMode.EXECUTOR_ONLY | ||
| ? MinionConstants.ValidDocIdsConsensusMode.UNSAFE : consensusMode; | ||
| } | ||
|
|
||
| /** | ||
| * Resolves the per-server batch size for the generator's validDocIds fetch. UNSAFE keeps {@code regularBatchSize} | ||
| * (the no-bitmap fetch). The consensus modes fetch from every replica (and EQUAL also carries a bitmap per entry), | ||
| * so they use the smaller {@code validDocIdsConsensusFetchBatchSize} to keep the payload bounded. | ||
| */ | ||
| public static int resolveValidDocIdsFetchBatchSize(Map<String, String> taskConfigs, | ||
| MinionConstants.ValidDocIdsConsensusMode consensusMode, int regularBatchSize) { | ||
| if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) { | ||
| return regularBatchSize; | ||
| } | ||
| return Integer.parseInt(taskConfigs.getOrDefault( | ||
| MinionConstants.VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE_KEY, | ||
| String.valueOf(MinionConstants.DEFAULT_VALID_DOC_IDS_CONSENSUS_FETCH_BATCH_SIZE))); | ||
| } | ||
|
|
||
| private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; | ||
|
|
||
| public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; | ||
|
|
@@ -428,6 +464,131 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW | |
| return maxCardinalityMap; | ||
| } | ||
|
|
||
| /** | ||
| * Counts how many servers host each segment (its online replica count) from a server-to-segments map, so callers | ||
| * can tell whether every assigned replica responded. | ||
| */ | ||
| public static Map<String, Integer> getSegmentToReplicaCount(Map<String, List<String>> serverToSegments) { | ||
| Map<String, Integer> segmentToReplicaCount = new HashMap<>(); | ||
| for (List<String> segments : serverToSegments.values()) { | ||
| for (String segment : segments) { | ||
| segmentToReplicaCount.merge(segment, 1, Integer::sum); | ||
| } | ||
| } | ||
| return segmentToReplicaCount; | ||
| } | ||
|
|
||
| /** | ||
| * Picks the replica whose validDocIds the generator should use for a segment, or returns {@code null} to skip the | ||
| * segment. Applies the same checks the executor would (CRC match, healthy server, replica consensus) so bad | ||
| * segments are dropped before a task is scheduled. | ||
| * | ||
| * How the replica is chosen depends on {@code consensusMode}: | ||
| * - UNSAFE: the first replica with a matching CRC and a healthy server. | ||
| * - EQUAL: all replicas must match CRC, be healthy, and have an identical bitmap. | ||
| * - MOST_VALID_DOCS: all replicas must match CRC and be healthy; the one with the most valid docs wins. | ||
| */ | ||
| @Nullable | ||
| public static ValidDocIdsMetadataInfo selectValidDocIdsMetadataForConsensus(String taskType, String segmentName, | ||
| long expectedCrc, @Nullable List<ValidDocIdsMetadataInfo> replicas, int expectedReplicaCount, | ||
| MinionConstants.ValidDocIdsConsensusMode consensusMode) { | ||
| if (replicas == null || replicas.isEmpty()) { | ||
| return null; | ||
| } | ||
| boolean unsafe = consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE; | ||
| List<ValidDocIdsMetadataInfo> usableReplicas = new ArrayList<>(); | ||
| for (ValidDocIdsMetadataInfo replica : replicas) { | ||
| // A CRC mismatch usually means the server is still reloading the segment, so its valid doc set can't be | ||
| // trusted. UNSAFE skips just this replica; stricter modes skip the whole segment. | ||
| long replicaCrc; | ||
| try { | ||
| replicaCrc = Long.parseLong(replica.getSegmentCrc()); | ||
| } catch (NumberFormatException e) { | ||
| LOGGER.warn("Unparseable CRC '{}' for segment: {} from server: {}, skipping {} (mode={}) for {}", | ||
| replica.getSegmentCrc(), segmentName, replica.getInstanceId(), unsafe ? "replica" : "segment", | ||
| consensusMode, taskType); | ||
| if (unsafe) { | ||
| continue; | ||
| } | ||
| return null; | ||
| } | ||
| if (expectedCrc != replicaCrc) { | ||
| LOGGER.warn("CRC mismatch for segment: {} (expected={}, server={} reported={}), skipping {} (mode={}) for {}", | ||
| segmentName, expectedCrc, replica.getInstanceId(), replicaCrc, unsafe ? "replica" : "segment", | ||
| consensusMode, taskType); | ||
| if (unsafe) { | ||
| continue; | ||
| } | ||
| return null; | ||
| } | ||
| // A non-GOOD server may still be mutating the segment, so its valid doc set is unreliable. | ||
| if (replica.getServerStatus() != null && replica.getServerStatus() != ServiceStatus.Status.GOOD) { | ||
| LOGGER.warn("Server {} is in {} state for segment: {}, skipping {} (mode={}) for {}", replica.getInstanceId(), | ||
| replica.getServerStatus(), segmentName, unsafe ? "replica" : "segment", consensusMode, taskType); | ||
| if (unsafe) { | ||
| continue; | ||
| } | ||
| return null; | ||
| } | ||
| if (unsafe) { | ||
| return replica; | ||
| } | ||
| usableReplicas.add(replica); | ||
| } | ||
|
|
||
| if (usableReplicas.isEmpty()) { | ||
| return null; | ||
| } | ||
|
|
||
| // Strict modes need every assigned replica to have responded; a short responder list means a server dropped out | ||
| // (network error, parse failure, or it no longer has the segment), so we can't confirm consensus across the full | ||
| // replica set and skip the segment. | ||
| if (usableReplicas.size() < expectedReplicaCount) { | ||
| LOGGER.warn("Only {} of {} replicas responded for segment: {}, cannot confirm consensus, skipping for {}", | ||
| usableReplicas.size(), expectedReplicaCount, segmentName, taskType); | ||
| return null; | ||
| } | ||
|
|
||
| if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) { | ||
| ValidDocIdsMetadataInfo first = usableReplicas.get(0); | ||
| RoaringBitmap consensusBitmap = deserializeBitmapOrNull(first); | ||
| if (consensusBitmap == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This still breaks mixed-version rolling upgrades. With the new defaults ( |
||
| // No bitmap means EQUAL can't be verified - skip rather than risk an inconsistent task. Make sure the | ||
| // metadata was fetched with includeBitmaps=true. | ||
| LOGGER.warn("Missing validDocIds bitmap for segment: {} from server: {}, cannot verify EQUAL consensus, " | ||
| + "skipping segment for {}", segmentName, first.getInstanceId(), taskType); | ||
| return null; | ||
| } | ||
| for (int i = 1; i < usableReplicas.size(); i++) { | ||
| RoaringBitmap bitmap = deserializeBitmapOrNull(usableReplicas.get(i)); | ||
| if (bitmap == null || !bitmap.equals(consensusBitmap)) { | ||
| LOGGER.warn("Replicas disagree on validDocIds for segment: {}, skipping segment for {}", segmentName, | ||
| taskType); | ||
| return null; | ||
| } | ||
| } | ||
| return first; | ||
| } | ||
|
|
||
| // MOST_VALID_DOCS: pick the replica reporting the most valid docs. | ||
| ValidDocIdsMetadataInfo chosen = usableReplicas.get(0); | ||
| for (ValidDocIdsMetadataInfo replica : usableReplicas) { | ||
| if (replica.getTotalValidDocs() > chosen.getTotalValidDocs()) { | ||
| chosen = replica; | ||
| } | ||
| } | ||
| return chosen; | ||
| } | ||
|
|
||
| @Nullable | ||
| private static RoaringBitmap deserializeBitmapOrNull(ValidDocIdsMetadataInfo info) { | ||
| byte[] bitmap = info.getBitmap(); | ||
| if (bitmap == null || bitmap.length == 0) { | ||
| return null; | ||
| } | ||
| return RoaringBitmapUtils.deserialize(bitmap); | ||
| } | ||
|
|
||
| public static String toUTCString(long epochMillis) { | ||
| Date date = new Date(epochMillis); | ||
| SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes controller-first rolling upgrades incompatible under the new default STRICT/EQUAL path. Older servers still answer this endpoint but omit
bitmap, andValidDocIdsMetadataInfoexplicitly treats that as expected for old servers; here we convert that mixed-version response into a hard skip, so upsert compaction/compact-merge task generation stops until every server is upgraded. Please keep the generator default executor-only, or add an old-server fallback before making bitmap-based prescheduling the default.