Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand Down Expand Up @@ -82,7 +84,7 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
listener.init(idealState, externalView, segments, znRecords);
}
for (int i = 0; i < numSegments; i++) {
if (znRecords.get(i) != null) {
if (!isConsumingInExternalView(externalView, segments.get(i))) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This init-time cache update no longer checks whether the ZK fetch actually returned metadata. If an ONLINE segment comes back with a null ZNRecord here, TimeSegmentPruner.init() records DEFAULT_INTERVAL and this branch still marks the segment as cached, so later assignment changes will never retry the fetch. That regresses the old behavior and can leave the broker permanently unable to time-prune that segment until restart/refresh; please keep the original znRecords.get(i) != null guard in addition to the CONSUMING check before caching.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Just realised there is actually a deeper issue here beyond just null ZNRecords.

Even when the ZNRecord is non-null, there's a brief race window when a segment commits: the server updates the ExternalView to ONLINE before it finishes writing the committed ZNRecord to ZK. During this window, the segment is not CONSUMING in EV (so it passes the EV check), the ZNRecord exists but still has startTime = -1, and extractIntervalFromSegmentZKMetaZNRecord returns DEFAULT_INTERVAL. If cached at that moment, the segment is stuck with DEFAULT_INTERVAL forever since cached segments are never re-fetched.

Added an isCommittedZNRecord helper (znRecord != null && startTime >= 0) that handles both the null case, applied in both init() and onAssignmentChange().

_onlineSegmentsCached.add(segments.get(i));
}
}
Expand All @@ -98,17 +100,24 @@ public synchronized void onAssignmentChange(IdealState idealState, ExternalView
List<String> segments = new ArrayList<>();
List<String> segmentZKMetadataPaths = new ArrayList<>();
for (String segment : onlineSegments) {
if (!_onlineSegmentsCached.contains(segment)) {
segments.add(segment);
segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
if (_onlineSegmentsCached.contains(segment)) {
continue;
}
// Skip segments still in CONSUMING state — they'll be re-evaluated on the next EV change
// when they transition to ONLINE (i.e., when they commit).
if (isConsumingInExternalView(externalView, segment)) {
continue;
}
segments.add(segment);
segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
}
List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
for (SegmentZkMetadataFetchListener listener : _listeners) {
listener.onAssignmentChange(idealState, externalView, onlineSegments, segments, znRecords);
}
int numSegments = segments.size();
for (int i = 0; i < numSegments; i++) {
// All fetched segments are non-consuming (guaranteed by the EV check above), cache them if metadata exists.
if (znRecords.get(i) != null) {
_onlineSegmentsCached.add(segments.get(i));
}
Expand All @@ -130,4 +139,17 @@ public synchronized void refreshSegment(String segment) {
}
}
}

/**
* Returns true if the segment is in CONSUMING state on any server in the ExternalView.
* Such segments should not be cached in {@code _onlineSegmentsCached} — they will be re-evaluated
* on the next ExternalView change, at which point they will have transitioned to ONLINE (committed).
*/
private static boolean isConsumingInExternalView(ExternalView externalView, String segment) {
if (externalView == null) {
return false;
}
Map<String, String> stateMap = externalView.getStateMap(segment);
return stateMap != null && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,16 @@ private Interval extractIntervalFromSegmentZKMetaZNRecord(String segment, @Nulla
@Override
public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) {
// NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
// ones. The refreshed segment ZK metadata change won't be picked up.
for (int idx = 0; idx < pulledSegments.size(); idx++) {
String segment = pulledSegments.get(idx);
ZNRecord zNrecord = znRecords.get(idx);
_intervalMap.computeIfAbsent(segment, k -> extractIntervalFromSegmentZKMetaZNRecord(k, zNrecord));
// Always update segments that have DEFAULT_INTERVAL, which covers two cases:
// 1. New segments not yet in the map
// 2. Segments that transitioned from CONSUMING (DEFAULT_INTERVAL) to COMMITTED (valid time range)
Interval existing = _intervalMap.get(segment);
if (existing == null || existing == DEFAULT_INTERVAL) {
_intervalMap.put(segment, extractIntervalFromSegmentZKMetaZNRecord(segment, zNrecord));
}
}
_intervalMap.keySet().retainAll(onlineSegments);
_intervalTree = new IntervalTree<>(_intervalMap);
Expand Down
Loading