Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand Down Expand Up @@ -82,7 +85,11 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
listener.init(idealState, externalView, segments, znRecords);
}
for (int i = 0; i < numSegments; i++) {
if (znRecords.get(i) != null) {
// Only cache segments that are both non-consuming in EV AND have a committed ZNRecord.
// If a segment is ONLINE in EV but its ZNRecord still has startTime=-1 (brief window between
// server updating EV and writing ZNRecord to ZK), do not cache it so it is re-evaluated on
// the next onAssignmentChange once the ZNRecord is consistent.
if (!isConsumingInExternalView(externalView, segments.get(i)) && isCommittedZNRecord(znRecords.get(i))) {
_onlineSegmentsCached.add(segments.get(i));
}
}
Expand All @@ -98,18 +105,27 @@ public synchronized void onAssignmentChange(IdealState idealState, ExternalView
List<String> segments = new ArrayList<>();
List<String> segmentZKMetadataPaths = new ArrayList<>();
for (String segment : onlineSegments) {
if (!_onlineSegmentsCached.contains(segment)) {
segments.add(segment);
segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
if (_onlineSegmentsCached.contains(segment)) {
continue;
}
// Skip segments still in CONSUMING state — they'll be re-evaluated on the next EV change
// when they transition to ONLINE (i.e., when they commit).
if (isConsumingInExternalView(externalView, segment)) {
continue;
}
segments.add(segment);
segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
}
List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
for (SegmentZkMetadataFetchListener listener : _listeners) {
listener.onAssignmentChange(idealState, externalView, onlineSegments, segments, znRecords);
}
int numSegments = segments.size();
for (int i = 0; i < numSegments; i++) {
if (znRecords.get(i) != null) {
// Cache only if ZNRecord is committed (startTime >= 0). Segments that are ONLINE in EV but still
// have startTime=-1 in ZK (brief inconsistency between EV and ZNRecord updates) are left uncached
// so they are retried on the next onAssignmentChange.
if (isCommittedZNRecord(znRecords.get(i))) {
_onlineSegmentsCached.add(segments.get(i));
}
}
Expand All @@ -130,4 +146,26 @@ public synchronized void refreshSegment(String segment) {
}
}
}

/**
 * Decides whether a segment ZK metadata record is eligible to be remembered as "already fetched".
 *
 * <p>The record qualifies only when it is present and its {@code START_TIME} field, read with a fallback of
 * {@code -1}, is non-negative. A missing record, or one whose start time is still {@code -1} (segment is consuming,
 * or the record has not yet caught up after a commit), does not qualify, so the segment is fetched again on the next
 * {@code onAssignmentChange}.
 *
 * @param znRecord segment ZK metadata record, may be {@code null}
 * @return {@code true} if the record exists and carries a start time {@code >= 0}; {@code false} otherwise
 */
private static boolean isCommittedZNRecord(@Nullable ZNRecord znRecord) {
  if (znRecord == null) {
    return false;
  }
  long startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1L);
  return startTime >= 0L;
}

/**
 * Checks whether the ExternalView reports the given segment as CONSUMING on at least one instance.
 *
 * <p>Segments for which this returns {@code true} are meant to stay out of {@code _onlineSegmentsCached}; they get
 * looked at again on a later ExternalView change, once they are no longer CONSUMING.
 *
 * @param externalView ExternalView to inspect; a {@code null} view is treated as "not consuming"
 * @param segment name of the segment to look up
 * @return {@code true} if the segment's state map exists and contains the CONSUMING state; {@code false} if the
 *         view is {@code null}, the segment has no state map, or no instance reports CONSUMING
 */
private static boolean isConsumingInExternalView(ExternalView externalView, String segment) {
  Map<String, String> instanceStates = externalView != null ? externalView.getStateMap(segment) : null;
  if (instanceStates == null) {
    return false;
  }
  return instanceStates.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,16 @@ private Interval extractIntervalFromSegmentZKMetaZNRecord(String segment, @Nulla
@Override
public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) {
// NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
// ones. The refreshed segment ZK metadata change won't be picked up.
for (int idx = 0; idx < pulledSegments.size(); idx++) {
String segment = pulledSegments.get(idx);
ZNRecord zNrecord = znRecords.get(idx);
_intervalMap.computeIfAbsent(segment, k -> extractIntervalFromSegmentZKMetaZNRecord(k, zNrecord));
// Always update segments that have DEFAULT_INTERVAL, which covers two cases:
// 1. New segments not yet in the map
// 2. Segments that transitioned from CONSUMING (DEFAULT_INTERVAL) to COMMITTED (valid time range)
Interval existing = _intervalMap.get(segment);
if (existing == null || existing == DEFAULT_INTERVAL) {
_intervalMap.put(segment, extractIntervalFromSegmentZKMetaZNRecord(segment, zNrecord));
}
}
_intervalMap.keySet().retainAll(onlineSegments);
_intervalTree = new IntervalTree<>(_intervalMap);
Expand Down
Loading