From 27c12a4cf3268fed0721813eaaae42676dc2c7ab Mon Sep 17 00:00:00 2001 From: Shaurya Chaturvedi Date: Thu, 25 Jun 2026 16:37:56 -0700 Subject: [PATCH 1/3] Fix time segment pruning for remote clusters in multi-cluster routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When enableMultiClusterRouting=true, the local broker builds a TimeSegmentPruner for each remote cluster. Two bugs caused all remote segments to be mapped to DEFAULT_INTERVAL=[0,MAX], bypassing time pruning entirely and inflating numSegmentsQueried (684 observed vs ~108 expected on a DCA+PHX federated table). Bug 1 — TimeSegmentPruner.onAssignmentChange used computeIfAbsent: REALTIME segments start CONSUMING with startTime=-1, so they map to DEFAULT_INTERVAL. When they commit and get a valid startTime, computeIfAbsent silently skips the update because the key already exists. Fix: replace with a conditional put that re-evaluates any segment currently at DEFAULT_INTERVAL. Bug 2 — SegmentZkMetadataFetcher cached consuming segments immediately: The cache used znRecord != null as the caching condition. Consuming segments have a non-null ZNRecord (just startTime=-1), so they were cached and never re-fetched. For local routing this is masked by per-segment refreshSegment() UDMs sent by servers on commit; for remote routing there is no UDM path so onAssignmentChange() is the only update path — and it permanently skipped the cached consuming segments. Fix: skip segments whose ExternalView state is CONSUMING instead of caching them. When a segment transitions CONSUMING->ONLINE (i.e. commits), the next ExternalView change finds it uncached and non-consuming, fetches its committed ZNRecord, and Bug 1's fix updates the interval. This is O(transitions) per ExternalView change rather than O(consuming segments), avoiding redundant ZK reads on clusters with many small tables. Verified on live nodes: numSegmentsQueried with enableMultiClusterRouting=true dropped from 684 to 108 (= 54+54, matching two local-only queries). Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../SegmentZkMetadataFetcher.java | 31 ++++++++++++++++--- .../segmentpruner/TimeSegmentPruner.java | 10 ++++-- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java index b6d996e8f45f..8b4cee477937 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; @@ -28,6 +29,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -82,7 +84,7 @@ public void init(IdealState idealState, ExternalView externalView, Set o listener.init(idealState, externalView, segments, znRecords); } for (int i = 0; i < numSegments; i++) { - if (znRecords.get(i) != null) { + if (!isConsumingInExternalView(externalView, segments.get(i))) { _onlineSegmentsCached.add(segments.get(i)); } } @@ -98,10 +100,16 @@ public synchronized void onAssignmentChange(IdealState idealState, ExternalView List segments = new ArrayList<>(); List segmentZKMetadataPaths = new ArrayList<>(); for (String segment : onlineSegments) { - if (!_onlineSegmentsCached.contains(segment)) { - segments.add(segment); - segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); + if (_onlineSegmentsCached.contains(segment)) { + continue; } + // Skip segments still in CONSUMING state — they'll be re-evaluated on the next EV change + // when they transition to ONLINE (i.e., when they commit). + if (isConsumingInExternalView(externalView, segment)) { + continue; + } + segments.add(segment); + segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); } List znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); for (SegmentZkMetadataFetchListener listener : _listeners) { @@ -109,6 +117,7 @@ public synchronized void onAssignmentChange(IdealState idealState, ExternalView } int numSegments = segments.size(); for (int i = 0; i < numSegments; i++) { + // All fetched segments are non-consuming (guaranteed by the EV check above), cache them if metadata exists. if (znRecords.get(i) != null) { _onlineSegmentsCached.add(segments.get(i)); } @@ -130,4 +139,18 @@ public synchronized void refreshSegment(String segment) { } } } + + /** + * Returns true if the segment is in CONSUMING state on any server in the ExternalView. + * Such segments should not be cached in {@code _onlineSegmentsCached} — they will be re-evaluated + * on the next ExternalView change, at which point they will have transitioned to ONLINE (committed). + */ + private static boolean isConsumingInExternalView(ExternalView externalView, String segment) { + if (externalView == null) { + return false; + } + Map stateMap = externalView.getStateMap(segment); + return stateMap != null && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); + } + } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java index 56c5546a2d68..3ee62491971d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java @@ -107,12 +107,16 @@ private Interval extractIntervalFromSegmentZKMetaZNRecord(String segment, @Nulla @Override public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, Set onlineSegments, List pulledSegments, List znRecords) { - // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed - // ones. The refreshed segment ZK metadata change won't be picked up. for (int idx = 0; idx < pulledSegments.size(); idx++) { String segment = pulledSegments.get(idx); ZNRecord zNrecord = znRecords.get(idx); - _intervalMap.computeIfAbsent(segment, k -> extractIntervalFromSegmentZKMetaZNRecord(k, zNrecord)); + // Always update segments that have DEFAULT_INTERVAL, which covers two cases: + // 1. New segments not yet in the map + // 2. Segments that transitioned from CONSUMING (DEFAULT_INTERVAL) to COMMITTED (valid time range) + Interval existing = _intervalMap.get(segment); + if (existing == null || existing == DEFAULT_INTERVAL) { + _intervalMap.put(segment, extractIntervalFromSegmentZKMetaZNRecord(segment, zNrecord)); + } } _intervalMap.keySet().retainAll(onlineSegments); _intervalTree = new IntervalTree<>(_intervalMap); From 758e3c91da50f11fc16ce87668ce5abb1014c475 Mon Sep 17 00:00:00 2001 From: shauryachats Date: Fri, 26 Jun 2026 00:14:00 +0000 Subject: [PATCH 2/3] Fixed checkstyle --- .../broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java index 8b4cee477937..7c85a82e77f3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java @@ -152,5 +152,4 @@ private static boolean isConsumingInExternalView(ExternalView externalView, Stri Map stateMap = externalView.getStateMap(segment); return stateMap != null && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); } - } From bb7dcef8af0e2bedf724ba92f780f3a8a91aa3ea Mon Sep 17 00:00:00 2001 From: Shaurya Chaturvedi Date: Fri, 26 Jun 2026 09:53:47 -0700 Subject: [PATCH 3/3] Fix stale DEFAULT_INTERVAL for segments with inconsistent EV/ZNRecord state When a segment commits, the server updates ExternalView to ONLINE before writing the ZNRecord to ZK. During SegmentZkMetadataFetcher.init(), if a segment is ONLINE in EV but its ZNRecord still has startTime=-1 (brief inconsistency window), the segment was cached immediately due to the !isConsumingInExternalView check. Since cached segments are never re-fetched in onAssignmentChange, the segment stayed at DEFAULT_INTERVAL permanently. Fix: only cache a segment if its ZNRecord also has startTime >= 0 (committed). Segments in the inconsistency window remain uncached and are re-evaluated on the next onAssignmentChange once the ZNRecord is consistent. This is the same check used for consuming segments but applied to caching decisions rather than fetch-skip decisions, ensuring that EV state and ZNRecord state are both valid before treating a segment as permanently resolved. --- .../SegmentZkMetadataFetcher.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java index 7c85a82e77f3..a73112b09c3f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -84,7 +85,11 @@ public void init(IdealState idealState, ExternalView externalView, Set o listener.init(idealState, externalView, segments, znRecords); } for (int i = 0; i < numSegments; i++) { - if (!isConsumingInExternalView(externalView, segments.get(i))) { + // Only cache segments that are both non-consuming in EV AND have a committed ZNRecord. + // If a segment is ONLINE in EV but its ZNRecord still has startTime=-1 (brief window between + // server updating EV and writing ZNRecord to ZK), do not cache it so it is re-evaluated on + // the next onAssignmentChange once the ZNRecord is consistent. + if (!isConsumingInExternalView(externalView, segments.get(i)) && isCommittedZNRecord(znRecords.get(i))) { _onlineSegmentsCached.add(segments.get(i)); } } @@ -117,8 +122,10 @@ public synchronized void onAssignmentChange(IdealState idealState, ExternalView } int numSegments = segments.size(); for (int i = 0; i < numSegments; i++) { - // All fetched segments are non-consuming (guaranteed by the EV check above), cache them if metadata exists. - if (znRecords.get(i) != null) { + // Cache only if ZNRecord is committed (startTime >= 0). Segments that are ONLINE in EV but still + // have startTime=-1 in ZK (brief inconsistency between EV and ZNRecord updates) are left uncached + // so they are retried on the next onAssignmentChange. + if (isCommittedZNRecord(znRecords.get(i))) { _onlineSegmentsCached.add(segments.get(i)); } } @@ -140,6 +147,15 @@ public synchronized void refreshSegment(String segment) { } } + /** + * Returns true if the ZNRecord represents a committed segment with a valid startTime. + * A null ZNRecord or one with startTime=-1 (consuming, or briefly inconsistent after commit) + * should not be cached — the segment will be re-fetched on the next onAssignmentChange. + */ + private static boolean isCommittedZNRecord(@Nullable ZNRecord znRecord) { + return znRecord != null && znRecord.getLongField(CommonConstants.Segment.START_TIME, -1L) >= 0L; + } + /** * Returns true if the segment is in CONSUMING state on any server in the ExternalView. * Such segments should not be cached in {@code _onlineSegmentsCached} — they will be re-evaluated