Fix time segment pruning for remote clusters in multi-cluster routing#18855
Fix time segment pruning for remote clusters in multi-cluster routing#18855shauryachats wants to merge 3 commits into
Conversation
When enableMultiClusterRouting=true, the local broker builds a TimeSegmentPruner for each remote cluster. Two bugs caused all remote segments to be mapped to DEFAULT_INTERVAL=[0,MAX], bypassing time pruning entirely and inflating numSegmentsQueried (684 observed vs ~108 expected on a DCA+PHX federated table). Bug 1 — TimeSegmentPruner.onAssignmentChange used computeIfAbsent: REALTIME segments start CONSUMING with startTime=-1, so they map to DEFAULT_INTERVAL. When they commit and get a valid startTime, computeIfAbsent silently skips the update because the key already exists. Fix: replace with a conditional put that re-evaluates any segment currently at DEFAULT_INTERVAL. Bug 2 — SegmentZkMetadataFetcher cached consuming segments immediately: The cache used znRecord != null as the caching condition. Consuming segments have a non-null ZNRecord (just startTime=-1), so they were cached and never re-fetched. For local routing this is masked by per-segment refreshSegment() UDMs sent by servers on commit; for remote routing there is no UDM path so onAssignmentChange() is the only update path — and it permanently skipped the cached consuming segments. Fix: skip segments whose ExternalView state is CONSUMING instead of caching them. When a segment transitions CONSUMING->ONLINE (i.e. commits), the next ExternalView change finds it uncached and non-consuming, fetches its committed ZNRecord, and Bug 1's fix updates the interval. This is O(transitions) per ExternalView change rather than O(consuming segments), avoiding redundant ZK reads on clusters with many small tables. Verified on live nodes: numSegmentsQueried with enableMultiClusterRouting=true dropped from 684 to 108 (= 54+54, matching two local-only queries). Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18855 +/- ##
=============================================
- Coverage 64.81% 37.17% -27.65%
+ Complexity 1322 1321 -1
=============================================
Files 3393 3393
Lines 211246 211315 +69
Branches 33208 33229 +21
=============================================
- Hits 136917 78553 -58364
- Misses 63284 125558 +62274
+ Partials 11045 7204 -3841
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
xiangfu0
left a comment
There was a problem hiding this comment.
Found one high-signal issue; see inline comment.
| } | ||
| for (int i = 0; i < numSegments; i++) { | ||
| if (znRecords.get(i) != null) { | ||
| if (!isConsumingInExternalView(externalView, segments.get(i))) { |
There was a problem hiding this comment.
This init-time cache update no longer checks whether the ZK fetch actually returned metadata. If an ONLINE segment comes back with a null ZNRecord here, TimeSegmentPruner.init() records DEFAULT_INTERVAL and this branch still marks the segment as cached, so later assignment changes will never retry the fetch. That regresses the old behavior and can leave the broker permanently unable to time-prune that segment until restart/refresh; please keep the original znRecords.get(i) != null guard in addition to the CONSUMING check before caching.
There was a problem hiding this comment.
Good catch! Just realised there is actually a deeper issue here beyond just null ZNRecords.
Even when the ZNRecord is non-null, there's a brief race window when a segment commits: the server updates the ExternalView to ONLINE before it finishes writing the committed ZNRecord to ZK. During this window, the segment is not CONSUMING in EV (so it passes the EV check), the ZNRecord exists but still has startTime = -1, and extractIntervalFromSegmentZKMetaZNRecord returns DEFAULT_INTERVAL. If cached at that moment, the segment is stuck with DEFAULT_INTERVAL forever since cached segments are never re-fetched.
Added an isCommittedZNRecord helper (znRecord != null && startTime >= 0) that handles both the null case, applied in both init() and onAssignmentChange().
… state When a segment commits, the server updates ExternalView to ONLINE before writing the ZNRecord to ZK. During SegmentZkMetadataFetcher.init(), if a segment is ONLINE in EV but its ZNRecord still has startTime=-1 (brief inconsistency window), the segment was cached immediately due to the !isConsumingInExternalView check. Since cached segments are never re-fetched in onAssignmentChange, the segment stayed at DEFAULT_INTERVAL permanently. Fix: only cache a segment if its ZNRecord also has startTime >= 0 (committed). Segments in the inconsistency window remain uncached and are re-evaluated on the next onAssignmentChange once the ZNRecord is consistent. This is the same check used for consuming segments but applied to caching decisions rather than fetch-skip decisions, ensuring that EV state and ZNRecord state are both valid before treating a segment as permanently resolved.
Motivation
When
enableMultiClusterRouting=true, a broker (e.g. in DCA) builds a full routing table for each remote cluster (e.g.PHX), including a
TimeSegmentPrunerper table. The pruner is supposed to skip segments whose time range does notoverlap the query's time filter — the same pruning that works correctly for local segments.
In practice, time pruning was silently broken for all remote segments. Every remote segment was mapped to
DEFAULT_INTERVAL = [0, Long.MAX_VALUE], which matches every query, so nothing was pruned.On a federated table spanning two clusters in internal testing,
numSegmentsQueriedwithenableMultiClusterRouting=truewas 6× higher than two equivalent local-only queries combined (684 vs ~108).Root cause
Two bugs in the routing update pipeline conspired to produce this outcome.
Bug 1 —
TimeSegmentPruner.onAssignmentChange:computeIfAbsentprevented committed segments from being updatedTimeSegmentPrunermaintains a map ofsegment → time interval. REALTIME segments first appear in CONSUMING state withstartTime = -1, so they are mapped toDEFAULT_INTERVAL. When the segment later commits and ZooKeeper is updated with a validstartTime/endTime, the pruner should replaceDEFAULT_INTERVALwith the real interval.The old code used
computeIfAbsent, which only writes if the key is absent. Since the CONSUMING entry already exists, the update is silently skipped. The segment stays atDEFAULT_INTERVALpermanently.Bug 2 —
SegmentZkMetadataFetcher.onAssignmentChange: consuming segments were cached and never re-fetchedSegmentZkMetadataFetchermaintains_onlineSegmentsCachedto avoid redundant ZK reads. Once a segment's ZNRecord is fetched, it is added to the cache and skipped on all subsequentonAssignmentChangecalls.The original caching condition was
znRecord != null. A CONSUMING segment does have a non-null ZNRecord (it just hasstartTime = -1), so it was immediately cached. From that point on it was never re-fetched — even after it committed and ZK was updated with a valid time range.TimeSegmentPrunernever received the committed ZNRecord, so Bug 1's fix never had a chance to run.For local routing this is masked: when a server commits a segment, it sends a Helix user-defined message (UDM)
directly to the local broker, which calls
refreshSegment()— bypassing the cache entirely. Remote brokers never receivethese UDMs (because remote brokers are spectators, not participants ), so
onAssignmentChangeis the sole update path for remote segments.Fix
SegmentZkMetadataFetcher— use ExternalView state instead of caching consuming segmentsRather than caching a segment as soon as its ZNRecord is non-null, we consult the ExternalView that is already delivered
with every
onAssignmentChangecall:A segment goes through exactly three states in this logic:
The ExternalView change that fires
onAssignmentChangeis the same event that flips the segment's state from CONSUMING to ONLINE. We use that state, already in memory, to decide what to fetch — no additional ZK reads required to make the decision.TimeSegmentPruner— re-evaluate segments atDEFAULT_INTERVALWith the above fix ensuring the committed ZNRecord now flows through on each transition, the
computeIfAbsentinTimeSegmentPruneris replaced with a conditional put that updates any segment still atDEFAULT_INTERVAL:Why this does not degrade performance
onAssignmentChangeis called on a background thread in response to Helix ExternalView changes — it is never on thequery path.
The original code fetched a segment's ZNRecord once (when first seen as CONSUMING), cached it, and skipped it forever.
The new code does the same for committed segments. The key difference is what happens to CONSUMING segments: instead of fetching and caching them eagerly, we skip them entirely using the ExternalView state already in memory. No ZK read is issued for a CONSUMING segment on any
onAssignmentChangecall.The number of ZK reads per
onAssignmentChangeis now O(segments that just committed in this batch) — typically 1–5 per ExternalView change during active ingestion — compared to O(all currently-consuming segments) that a naive re-fetch approach would require. In steady state between commits, zero extra ZK reads are performed beyond what the original code did.The
IntervalTreerebuild inTimeSegmentPrunerat the end of everyonAssignmentChangeis O(N log N) in totalsegments and was already happening before this change — unaffected.
Verification
Tested on a staging cluster:
numSegmentsQueriedwith multi-cluster routing now equals the sum of both clusters' local-only counts.numSegmentsProcessedis unchanged — correct results, no data regression.