diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java index fdff73c0318d..3c31d5d19702 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java @@ -23,6 +23,7 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -35,17 +36,30 @@ public class LLCSegmentName implements Comparable { private final String _tableName; private final int _partitionGroupId; + private final int _streamConfigId; private final int _sequenceNumber; private final String _creationTime; private final String _segmentName; public LLCSegmentName(String segmentName) { String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); - Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName); - _tableName = parts[0]; - _partitionGroupId = Integer.parseInt(parts[1]); - _sequenceNumber = Integer.parseInt(parts[2]); - _creationTime = parts[3]; + if (parts.length == 5) { + // Multi-topic format: tableName__configId__streamPartitionId__sequenceNumber__creationTime + _tableName = parts[0]; + int configId = Integer.parseInt(parts[1]); + int streamPartitionId = Integer.parseInt(parts[2]); + _streamConfigId = configId; + _partitionGroupId = configId * IngestionConfigUtils.PARTITION_PADDING_OFFSET + streamPartitionId; + _sequenceNumber = Integer.parseInt(parts[3]); + _creationTime = parts[4]; + } else { + Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName); + _tableName = parts[0]; + _partitionGroupId = Integer.parseInt(parts[1]); + _streamConfigId = 0; + _sequenceNumber = Integer.parseInt(parts[2]); + _creationTime = parts[3]; + } _segmentName = segmentName; } @@ -53,6 +67,7 @@ public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName); _tableName = tableName; _partitionGroupId = partitionGroupId; + _streamConfigId = 0; _sequenceNumber = sequenceNumber; // ISO8601 date: 20160120T1234Z _creationTime = DATE_FORMATTER.print(msSinceEpoch); @@ -73,7 +88,8 @@ public static LLCSegmentName of(String segmentName) { } /** - * Returns whether the given segment name represents an LLC segment. + * Returns whether the given segment name represents an LLC segment (either 4-part single-topic + * or 5-part multi-topic format). */ public static boolean isLLCSegment(String segmentName) { int numSeparators = 0; @@ -82,7 +98,13 @@ public static boolean isLLCSegment(String segmentName) { numSeparators++; index += 2; // SEPARATOR.length() } - return numSeparators == 3; + if (numSeparators == 3) { + return true; + } + if (numSeparators == 4) { + return MultiTopicLLCSegmentName.isMultiTopicLLCSegment(segmentName); + } + return false; } @Deprecated @@ -105,6 +127,10 @@ public int getPartitionGroupId() { return _partitionGroupId; } + public int getStreamConfigId() { + return _streamConfigId; + } + public int getSequenceNumber() { return _sequenceNumber; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java new file mode 100644 index 000000000000..803269ca9363 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.base.Preconditions; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + + +/** + * Segment name for multi-topic LLC realtime segments with 5-part format: + * {@code tableName__configId__streamPartitionId__sequenceNumber__creationTime} + * + *

This format stores the stream config ID and stream partition ID as separate fields + * rather than encoding them into a single partition group ID ({@code configId * 10000 + partitionId}). + * This eliminates collision and overflow risks from the encoding scheme. + * + *

Existing 4-part segments ({@link LLCSegmentName}) are not affected. Both formats can + * coexist in the same table. Use {@link #of(String)} to parse a segment name that may be + * in either format. + */ +public class MultiTopicLLCSegmentName implements Comparable { + private static final String SEPARATOR = "__"; + private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'"; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC(); + + private final String _tableName; + private final int _configId; + private final int _streamPartitionId; + private final int _sequenceNumber; + private final String _creationTime; + private final String _segmentName; + + public MultiTopicLLCSegmentName(String segmentName) { + String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); + Preconditions.checkArgument(parts.length == 5, "Invalid multi-topic LLC segment name: %s", segmentName); + _tableName = parts[0]; + _configId = Integer.parseInt(parts[1]); + _streamPartitionId = Integer.parseInt(parts[2]); + _sequenceNumber = Integer.parseInt(parts[3]); + _creationTime = parts[4]; + _segmentName = segmentName; + } + + public MultiTopicLLCSegmentName(String tableName, int configId, int streamPartitionId, int sequenceNumber, + long msSinceEpoch) { + Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName); + _tableName = tableName; + _configId = configId; + _streamPartitionId = streamPartitionId; + _sequenceNumber = sequenceNumber; + _creationTime = DATE_FORMATTER.print(msSinceEpoch); + _segmentName = tableName + SEPARATOR + configId + SEPARATOR + streamPartitionId + SEPARATOR + sequenceNumber + + SEPARATOR + _creationTime; + } + + @Nullable + public static MultiTopicLLCSegmentName of(String segmentName) { + try { + return new MultiTopicLLCSegmentName(segmentName); + } catch (Exception e) { + return null; + } + } + + /** + * Returns whether the given segment name is a multi-topic LLC segment (5-part format where + * parts[1] and parts[3] are integers, distinguishing it from UploadedRealtimeSegmentName). + */ + public static boolean isMultiTopicLLCSegment(String segmentName) { + String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); + if (parts.length != 5) { + return false; + } + try { + Integer.parseInt(parts[1]); // configId + Integer.parseInt(parts[2]); // streamPartitionId + Integer.parseInt(parts[3]); // sequenceNumber + return true; + } catch (NumberFormatException e) { + return false; + } + } + + public String getTableName() { + return _tableName; + } + + public int getConfigId() { + return _configId; + } + + public int getStreamPartitionId() { + return _streamPartitionId; + } + + /// Returns the encoded partition group ID for backward compatibility with code + /// that expects the single-integer encoding ({@code configId * 10000 + streamPartitionId}). + public int getPartitionGroupId() { + return IngestionConfigUtils.getPinotPartitionIdFromConfigId(_streamPartitionId, _configId); + } + + public int getSequenceNumber() { + return _sequenceNumber; + } + + public String getCreationTime() { + return _creationTime; + } + + public long getCreationTimeMs() { + DateTime dateTime = DATE_FORMATTER.parseDateTime(_creationTime); + return dateTime.getMillis(); + } + + @JsonValue + public String getSegmentName() { + return _segmentName; + } + + /// Converts this multi-topic segment name to an equivalent {@link LLCSegmentName} using the + /// encoded partition group ID. Useful for interacting with code that only understands the 4-part format. + public LLCSegmentName toLLCSegmentName() { + return new LLCSegmentName(_tableName, getPartitionGroupId(), _sequenceNumber, getCreationTimeMs()); + } + + @Override + public int compareTo(MultiTopicLLCSegmentName other) { + Preconditions.checkArgument(_tableName.equals(other._tableName), + "Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName()); + if (_configId != other._configId) { + return Integer.compare(_configId, other._configId); + } + if (_streamPartitionId != other._streamPartitionId) { + return Integer.compare(_streamPartitionId, other._streamPartitionId); + } + return Integer.compare(_sequenceNumber, other._sequenceNumber); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MultiTopicLLCSegmentName)) { + return false; + } + MultiTopicLLCSegmentName that = (MultiTopicLLCSegmentName) o; + return _segmentName.equals(that._segmentName); + } + + @Override + public int hashCode() { + return Objects.hash(_segmentName); + } + + @Override + public String toString() { + return _segmentName; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java index 80e735fb2f80..762191325a30 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java @@ -74,6 +74,10 @@ public static Integer getPartitionIdFromSegmentName(String segmentName) { if (llcSegmentName != null) { return llcSegmentName.getPartitionGroupId(); } + MultiTopicLLCSegmentName multiTopicLLCSegmentName = MultiTopicLLCSegmentName.of(segmentName); + if (multiTopicLLCSegmentName != null) { + return multiTopicLLCSegmentName.getPartitionGroupId(); + } UploadedRealtimeSegmentName uploadedRealtimeSegmentName = UploadedRealtimeSegmentName.of(segmentName); if (uploadedRealtimeSegmentName != null) { return uploadedRealtimeSegmentName.getPartitionId(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java new file mode 100644 index 000000000000..1d04f7b6c62d --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class MultiTopicLLCSegmentNameTest { + + @Test + public void testConstructFromComponents() { + MultiTopicLLCSegmentName name = new MultiTopicLLCSegmentName("myTable", 2, 5, 3, 1700000000000L); + assertEquals(name.getTableName(), "myTable"); + assertEquals(name.getConfigId(), 2); + assertEquals(name.getStreamPartitionId(), 5); + assertEquals(name.getSequenceNumber(), 3); + assertEquals(name.getPartitionGroupId(), 20005); + assertEquals(name.getSegmentName(), "myTable__2__5__3__20231114T2213Z"); + } + + @Test + public void testConstructFromString() { + MultiTopicLLCSegmentName name = new MultiTopicLLCSegmentName("myTable__2__5__3__20231114T2213Z"); + assertEquals(name.getTableName(), "myTable"); + assertEquals(name.getConfigId(), 2); + assertEquals(name.getStreamPartitionId(), 5); + assertEquals(name.getSequenceNumber(), 3); + assertEquals(name.getPartitionGroupId(), 20005); + } + + @Test + public void testOfValidName() { + MultiTopicLLCSegmentName name = MultiTopicLLCSegmentName.of("myTable__0__63__10__20231215T0000Z"); + assertNotNull(name); + assertEquals(name.getConfigId(), 0); + assertEquals(name.getStreamPartitionId(), 63); + assertEquals(name.getSequenceNumber(), 10); + assertEquals(name.getPartitionGroupId(), 63); + } + + @Test + public void testOfInvalidName() { + // 4-part LLC format — should return null + assertNull(MultiTopicLLCSegmentName.of("myTable__10005__3__20231215T0000Z")); + // Uploaded realtime format — should return null (parts[3] is a date, not integer) + assertNull(MultiTopicLLCSegmentName.of("uploaded__myTable__5__20231215T0000Z__suffix")); + // Garbage + assertNull(MultiTopicLLCSegmentName.of("not_a_segment")); + } + + @Test + public void testIsMultiTopicLLCSegment() { + assertTrue(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("myTable__2__5__3__20231215T0000Z")); + assertTrue(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("myTable__0__0__0__20231215T0000Z")); + // 4-part LLC + assertFalse(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("myTable__10005__3__20231215T0000Z")); + // Uploaded realtime — parts[3] is a date string, not parseable as integer + assertFalse(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("uploaded__myTable__5__20231215T0000Z__suffix")); + // 3 parts + assertFalse(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("a__b__c")); + } + + @Test + public void testPartitionGroupIdEncoding() { + // configId=0, partition=5 -> 5 + MultiTopicLLCSegmentName name0 = new MultiTopicLLCSegmentName("myTable", 0, 5, 0, 1700000000000L); + assertEquals(name0.getPartitionGroupId(), 5); + + // configId=1, partition=3 -> 10003 + MultiTopicLLCSegmentName name1 = new MultiTopicLLCSegmentName("myTable", 1, 3, 0, 1700000000000L); + assertEquals(name1.getPartitionGroupId(), 10003); + + // configId=5, partition=99 -> 50099 + MultiTopicLLCSegmentName name5 = new MultiTopicLLCSegmentName("myTable", 5, 99, 0, 1700000000000L); + assertEquals(name5.getPartitionGroupId(), 50099); + } + + @Test + public void testToLLCSegmentName() { + MultiTopicLLCSegmentName multiTopic = new MultiTopicLLCSegmentName("myTable", 1, 5, 3, 1700000000000L); + LLCSegmentName llc = multiTopic.toLLCSegmentName(); + assertEquals(llc.getTableName(), "myTable"); + assertEquals(llc.getPartitionGroupId(), 10005); + assertEquals(llc.getSequenceNumber(), 3); + } + + @Test + public void testCompareTo() { + MultiTopicLLCSegmentName a = new MultiTopicLLCSegmentName("myTable", 0, 5, 1, 1700000000000L); + MultiTopicLLCSegmentName b = new MultiTopicLLCSegmentName("myTable", 0, 5, 2, 1700000000000L); + MultiTopicLLCSegmentName c = new MultiTopicLLCSegmentName("myTable", 1, 0, 0, 1700000000000L); + + assertTrue(a.compareTo(b) < 0); + assertTrue(b.compareTo(a) > 0); + assertTrue(a.compareTo(c) < 0); // configId 0 < configId 1 + } + + @Test + public void testEqualsAndHashCode() { + MultiTopicLLCSegmentName a = new MultiTopicLLCSegmentName("myTable", 1, 5, 3, 1700000000000L); + MultiTopicLLCSegmentName b = new MultiTopicLLCSegmentName(a.getSegmentName()); + assertEquals(a, b); + assertEquals(a.hashCode(), b.hashCode()); + + MultiTopicLLCSegmentName c = new MultiTopicLLCSegmentName("myTable", 1, 5, 4, 1700000000000L); + assertNotEquals(a, c); + } + + @Test + public void testLLCSegmentNameIsLLCSegmentIncludesMultiTopic() { + // 4-part format + assertTrue(LLCSegmentName.isLLCSegment("myTable__10005__3__20231215T0000Z")); + // 5-part multi-topic format + assertTrue(LLCSegmentName.isLLCSegment("myTable__1__5__3__20231215T0000Z")); + // Uploaded realtime — should NOT be detected as LLC + assertFalse(LLCSegmentName.isLLCSegment("uploaded__myTable__5__20231215T0000Z__suffix")); + } + + @Test + public void testSegmentUtilsPartitionId() { + // Multi-topic segment returns encoded partition group ID + assertEquals(SegmentUtils.getPartitionIdFromSegmentName("myTable__1__5__3__20231215T0000Z"), + Integer.valueOf(10005)); + // Standard LLC segment + assertEquals(SegmentUtils.getPartitionIdFromSegmentName("myTable__10005__3__20231215T0000Z"), + Integer.valueOf(10005)); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 91985f5997ea..2bbec915bd06 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -392,31 +392,28 @@ List getStreamMetadataList(List streamConfigs, throws Exception { Map streamPartitionCountMap = _pinotHelixResourceManager.getRealtimeSegmentManager().getPartitionCountMap(streamConfigs); - Map> partitionGroupMetadataByStreamConfigIndex = new HashMap<>(); + Map> partitionGroupMetadataByConfigId = new HashMap<>(); for (WatermarkInductionResult.Watermark watermark : watermarkInductionResult.getWatermarks()) { - int streamConfigIndex = - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId()); - Preconditions.checkArgument(streamConfigIndex >= 0 && streamConfigIndex < streamConfigs.size(), - "Invalid stream config index %s from watermark partition ID %s. Expected index in range [0, %s)", - streamConfigIndex, watermark.getPartitionGroupId(), streamConfigs.size()); - partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex, ignored -> new ArrayList<>()).add( + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(watermark.getPartitionGroupId()); + partitionGroupMetadataByConfigId.computeIfAbsent(configId, ignored -> new ArrayList<>()).add( new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset()), watermark.getSequenceNumber())); } - // Iterate in order by streamConfigIndex to ensure deterministic ordering - List streamMetadataList = new ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size()); - for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size(); streamConfigIndex++) { + // Iterate over stream configs in order to ensure deterministic ordering + List streamMetadataList = new ArrayList<>(partitionGroupMetadataByConfigId.size()); + for (int i = 0; i < streamConfigs.size(); i++) { + int configId = IngestionConfigUtils.getConfigIdFromStreamConfig(streamConfigs.get(i)); List partitionGroupMetadataList = - partitionGroupMetadataByStreamConfigIndex.get(streamConfigIndex); + partitionGroupMetadataByConfigId.get(configId); if (partitionGroupMetadataList == null) { - // No watermarks for this stream config index, skip it + // No watermarks for this stream config, skip it continue; } - Integer partitionCount = streamPartitionCountMap.get(streamConfigIndex); + Integer partitionCount = streamPartitionCountMap.get(configId); Preconditions.checkState(partitionCount != null, - "Cannot find partition count for stream config index: %s", streamConfigIndex); - streamMetadataList.add(new StreamMetadata(streamConfigs.get(streamConfigIndex), + "Cannot find partition count for stream config ID: %s", configId); + streamMetadataList.add(new StreamMetadata(streamConfigs.get(i), partitionCount, partitionGroupMetadataList)); } return streamMetadataList; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 0e27d1565290..b8780b7577bf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -85,6 +85,7 @@ import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.MultiTopicLLCSegmentName; import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; @@ -315,18 +316,19 @@ public List getPartitionGroupConsumptionStatusL } } else { // Multiple streams - StreamPartitionMsgOffsetFactory[] offsetFactories = new StreamPartitionMsgOffsetFactory[numStreams]; + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); + Map offsetFactoriesByConfigId = new HashMap<>(); for (Map.Entry entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); - int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId); - int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); LLCSegmentName llcSegmentName = entry.getValue(); + int configId = llcSegmentName.getStreamConfigId(); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName()); - StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index]; - if (offsetFactory == null) { - offsetFactory = StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory(); - offsetFactories[index] = offsetFactory; - } + StreamPartitionMsgOffsetFactory offsetFactory = + offsetFactoriesByConfigId.computeIfAbsent(configId, k -> StreamConsumerFactoryProvider.create( + configIdToStreamConfig.get(k)) + .createStreamMsgOffsetFactory()); PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), offsetFactory.create(segmentZKMetadata.getStartOffset()), @@ -858,11 +860,12 @@ private String createNewSegmentMetadata(TableConfig tableConfig, } String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, - committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); + LLCSegmentName newLLCSegment = getNextLLCSegmentName(committingLLCSegment, newSegmentCreationTimeMs, + streamConfigs.size() > 1); - StreamConfig streamConfig = - IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId); + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); + StreamConfig streamConfig = configIdToStreamConfig.get(committingLLCSegment.getStreamConfigId()); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas); @@ -1272,10 +1275,10 @@ private PartitionIdsWithIdealState getPartitionIdsWithIdealState(List IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) + .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromConfigId(partitionId, configId)) .collect(Collectors.toSet())); } catch (UnsupportedOperationException ignored) { allPartitionIdsFetched = false; @@ -1581,8 +1584,7 @@ public static boolean isTopicPaused(IdealState idealState, int topicIndex) { public static boolean isTopicPaused(IdealState idealState, String segmentName) { LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); if (llcSegmentName != null) { - return isTopicPaused(idealState, - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId())); + return isTopicPaused(idealState, llcSegmentName.getStreamConfigId()); } return false; } @@ -1779,6 +1781,8 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List> instanceStatesMap = idealState.getRecord().getMapFields(); StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); // Get the latest segment ZK metadata for each partition Map latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); @@ -1829,7 +1833,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List instanceStateMap = instanceStatesMap.get(latestSegmentName); if (instanceStateMap != null) { @@ -1848,12 +1852,13 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List instancePartitionsMap, StreamPartitionMsgOffset startOffset) { int numReplicas = getNumReplicas(tableConfig, instancePartitions); LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName()); - LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); + LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs, + IngestionConfigUtils.hasMultipleStreams(tableConfig)); CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, @@ -2059,19 +2065,19 @@ List buildPartitionGroupConsumptionStatusFromZK zkMetadata.getStatus().toString())); } } else { - StreamPartitionMsgOffsetFactory[] offsetFactories = new StreamPartitionMsgOffsetFactory[numStreams]; + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); + Map offsetFactoriesByConfigId = new HashMap<>(); for (Map.Entry entry : latestSegmentZKMetadataMap.entrySet()) { int partitionGroupId = entry.getKey(); - int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId); int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); SegmentZKMetadata zkMetadata = entry.getValue(); LLCSegmentName llcSegmentName = new LLCSegmentName(zkMetadata.getSegmentName()); - StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index]; - if (offsetFactory == null) { - offsetFactory = - StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory(); - offsetFactories[index] = offsetFactory; - } + int configId = llcSegmentName.getStreamConfigId(); + StreamPartitionMsgOffsetFactory offsetFactory = + offsetFactoriesByConfigId.computeIfAbsent(configId, k -> StreamConsumerFactoryProvider.create( + configIdToStreamConfig.get(k)) + .createStreamMsgOffsetFactory()); result.add(new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), offsetFactory.create(zkMetadata.getStartOffset()), @@ -2104,9 +2110,19 @@ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria } } - private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) { - return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(), - lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs); + private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs, + boolean isMultiTopic) { + String tableName = lastLLCSegmentName.getTableName(); + int nextSeq = lastLLCSegmentName.getSequenceNumber() + 1; + if (isMultiTopic) { + int partitionGroupId = lastLLCSegmentName.getPartitionGroupId(); + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionGroupId); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + String segmentName = new MultiTopicLLCSegmentName(tableName, configId, streamPartitionId, + nextSeq, creationTimeMs).getSegmentName(); + return new LLCSegmentName(segmentName); + } + return new LLCSegmentName(tableName, lastLLCSegmentName.getPartitionGroupId(), nextSeq, creationTimeMs); } /** @@ -2131,8 +2147,16 @@ private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig stre partitionGroupId, realtimeTableName, sequence, startOffset); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); - LLCSegmentName newLLCSegmentName = - new LLCSegmentName(rawTableName, partitionGroupId, sequence, creationTimeMs); + LLCSegmentName newLLCSegmentName; + if (IngestionConfigUtils.hasMultipleStreams(tableConfig)) { + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionGroupId); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + String segmentName = new MultiTopicLLCSegmentName(rawTableName, configId, streamPartitionId, + sequence, creationTimeMs).getSegmentName(); + newLLCSegmentName = new LLCSegmentName(segmentName); + } else { + newLLCSegmentName = new LLCSegmentName(rawTableName, partitionGroupId, sequence, creationTimeMs); + } String newSegmentName = newLLCSegmentName.getSegmentName(); CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, @@ -2893,8 +2917,7 @@ private Set findConsumingSegmentsOfTopics(IdealState idealState, List consumingSegments = new TreeSet<>(); idealState.getRecord().getMapFields().forEach((segmentName, instanceToStateMap) -> { LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); - if (llcSegmentName != null && !topicIndices.contains( - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId()))) { + if (llcSegmentName != null && !topicIndices.contains(llcSegmentName.getStreamConfigId())) { return; } for (String state : instanceToStateMap.values()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index e26390b9e4fb..8eabddd24e58 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -154,7 +154,8 @@ void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffs private Clock _clock = Clock.systemUTC(); protected volatile Map _partitionIdToLatestOffset; - protected volatile ConcurrentHashMap _partitionsHostedByThisServer = new ConcurrentHashMap<>(); + // Maps partitionGroupId -> streamConfigId, populated from LLCSegmentName so no arithmetic needed. + protected volatile ConcurrentHashMap _partitionsHostedByThisServer = new ConcurrentHashMap<>(); // Map of StreamMetadataProvider to fetch upstream latest stream offset (Table can have multiple upstream topics) // This map is accessed by: // 1. _ingestionDelayTrackingScheduler thread. @@ -198,14 +199,13 @@ private void createOrUpdateStreamMetadataProvider() { List streamConfigs = IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft()); - for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size(); streamConfigIndex++) { - if (_streamConfigIndexToStreamMetadataProvider.containsKey(streamConfigIndex)) { + for (StreamConfig streamConfig : streamConfigs) { + int configId = IngestionConfigUtils.getConfigIdFromStreamConfig(streamConfig); + if (_streamConfigIndexToStreamMetadataProvider.containsKey(configId)) { continue; } - StreamConfig streamConfig = null; StreamMetadataProvider streamMetadataProvider; try { - streamConfig = streamConfigs.get(streamConfigIndex); StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); String clientId = IngestionConfigUtils.getTableTopicUniqueClientId(IngestionDelayTracker.class.getSimpleName(), streamConfig); @@ -216,7 +216,7 @@ private void createOrUpdateStreamMetadataProvider() { } assert streamMetadataProvider != null; - _streamConfigIndexToStreamMetadataProvider.put(streamConfigIndex, streamMetadataProvider); + _streamConfigIndexToStreamMetadataProvider.put(configId, streamMetadataProvider); if ((streamMetadataProvider.supportsOffsetLag()) && (_partitionIdToLatestOffset == null)) { _partitionIdToLatestOffset = new ConcurrentHashMap<>(); @@ -268,37 +268,37 @@ private void trackIngestionDelay() { @VisibleForTesting void updateLatestStreamOffset(Set partitionsHosted) { - Map> streamIndexToStreamPartitionIds = + Map> configIdToStreamPartitionIds = IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(partitionsHosted); - if (streamIndexToStreamPartitionIds.size() > _streamConfigIndexToStreamMetadataProvider.size()) { + if (configIdToStreamPartitionIds.size() > _streamConfigIndexToStreamMetadataProvider.size()) { // There might be a new stream config added or need to retry creation of streamMetadataProvider for a stream // which might have failed before. createOrUpdateStreamMetadataProvider(); } for (Map.Entry entry : _streamConfigIndexToStreamMetadataProvider.entrySet()) { - int streamIndex = entry.getKey(); + int configId = entry.getKey(); StreamMetadataProvider streamMetadataProvider = entry.getValue(); - if (!streamIndexToStreamPartitionIds.containsKey(streamIndex)) { + if (!configIdToStreamPartitionIds.containsKey(configId)) { // Server is not hosting any partitions of this stream. continue; } if (streamMetadataProvider.supportsOffsetLag()) { - Set streamPartitionIds = streamIndexToStreamPartitionIds.get(streamIndex); + Set streamPartitionIds = configIdToStreamPartitionIds.get(configId); try { Map streamPartitionIdToLatestOffset = streamMetadataProvider.fetchLatestStreamOffset(streamPartitionIds, LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS); - if (streamIndex > 0) { + if (configId > 0) { // Need to convert stream partition Ids back to pinot partition Ids. for (Map.Entry latestOffsetEntry : streamPartitionIdToLatestOffset.entrySet()) { _partitionIdToLatestOffset.put( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(latestOffsetEntry.getKey(), - streamIndex), latestOffsetEntry.getValue()); + IngestionConfigUtils.getPinotPartitionIdFromConfigId(latestOffsetEntry.getKey(), configId), + latestOffsetEntry.getValue()); } } else { _partitionIdToLatestOffset.putAll(streamPartitionIdToLatestOffset); @@ -371,8 +371,9 @@ void setClock(Clock clock) { @VisibleForTesting void createMetrics(int partitionId) { - int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); - StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + int configId = _partitionsHostedByThisServer.getOrDefault(partitionId, + IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId)); + StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(configId); if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, @@ -395,9 +396,10 @@ void createMetrics(int partitionId) { } private void removeMetrics(int partitionId) { - int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); + int configId = _partitionsHostedByThisServer.getOrDefault(partitionId, + IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId)); StreamMetadataProvider streamMetadataProvider = - _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + _streamConfigIndexToStreamMetadataProvider.get(configId); // Remove all metrics associated with this partition if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); @@ -508,7 +510,11 @@ public void timeoutInactivePartitions() { } Set partitionsHostedByThisServer; try { - partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds(); + Map partitionGroupIdToConfigId = + _realTimeTableDataManager.getHostedPartitionsGroupIds(); + partitionsHostedByThisServer = partitionGroupIdToConfigId.keySet(); + ConcurrentHashMap newMap = new ConcurrentHashMap<>(partitionGroupIdToConfigId); + _partitionsHostedByThisServer = newMap; } catch (Exception e) { LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType, e.getClass(), e.getMessage()); @@ -520,10 +526,6 @@ public void timeoutInactivePartitions() { removePartitionId(partitionId); } } - - ConcurrentHashMap newMap = new ConcurrentHashMap<>(); - partitionsHostedByThisServer.forEach(p -> newMap.put(p, true)); - _partitionsHostedByThisServer = newMap; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 6246ff58570a..9db1e00226e2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1797,10 +1797,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf } else { // Multiple streams _streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId); - int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId); - Preconditions.checkState(numStreams > index, "Cannot find stream config of index: %s for table: %s", index, - _tableNameWithType); - _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMaps.get(index)); + int configId = llcSegmentName.getStreamConfigId(); + Map streamConfigMap = tableConfig.getIngestionConfig().getStreamIngestionConfig() + .getStreamConfigMapByConfigId(configId); + _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMap); } _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 6cdfd08b2e19..0f27a21334b3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -29,7 +29,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -408,15 +408,15 @@ public StreamMetadataProvider getStreamMetadataProvider(RealtimeSegmentDataManag * Returns all partitionGroupIds for the partitions hosted by this server for current table. * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns. */ - public Set getHostedPartitionsGroupIds() { - Set partitionsHostedByThisServer = new HashSet<>(); + public Map getHostedPartitionsGroupIds() { + Map partitionGroupIdToConfigId = new HashMap<>(); List segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); for (String segmentNameStr : segments) { LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); - partitionsHostedByThisServer.add(segmentName.getPartitionGroupId()); + partitionGroupIdToConfigId.put(segmentName.getPartitionGroupId(), segmentName.getStreamConfigId()); } - return partitionsHostedByThisServer; + return partitionGroupIdToConfigId; } public RealtimeSegmentStatsHistory getStatsHistory() { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 80d5444e3ed9..f096e83755e2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -42,6 +42,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; @@ -468,8 +469,12 @@ public void testUpdateLatestStreamOffset() { IngestionConfig ingestionConfig = new IngestionConfig(); List> streamConfigMaps = new ArrayList<>(); - streamConfigMaps.add(getStreamConfigs()); - streamConfigMaps.add(getStreamConfigs()); + Map streamConfig0 = getStreamConfigs(); + streamConfig0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + Map streamConfig1 = getStreamConfigs(); + streamConfig1.put(StreamConfigProperties.STREAM_CONFIG_ID, "1"); + streamConfigMaps.add(streamConfig0); + streamConfigMaps.add(streamConfig1); StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(streamConfigMaps); ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) @@ -501,8 +506,8 @@ public void testIngestionDelay() { final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName(); final int partition1 = 1; - ingestionDelayTracker._partitionsHostedByThisServer.put(partition0, true); - ingestionDelayTracker._partitionsHostedByThisServer.put(partition1, true); + ingestionDelayTracker._partitionsHostedByThisServer.put(partition0, 0); + ingestionDelayTracker._partitionsHostedByThisServer.put(partition1, 0); ingestionDelayTracker.updateMetrics(segment0, partition0, System.currentTimeMillis(), System.currentTimeMillis(), new LongMsgOffset(50)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java index d2013f22399f..7bb5d537963a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import org.apache.helix.model.IdealState; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.MultiTopicLLCSegmentName; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder; import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties; @@ -406,4 +407,136 @@ public void testDistinctSources() assertEquals(rows.get(i).get(0).asText(), sourceName(i)); } } + + @Test + public void testStreamConfigIdsAssigned() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + TableConfig tableConfig = getSharedHelixResourceManager().getTableConfig(realtimeTableName); + assertNotNull(tableConfig, "Table config should exist"); + + StreamIngestionConfig streamIngestionConfig = + tableConfig.getIngestionConfig().getStreamIngestionConfig(); + assertNotNull(streamIngestionConfig, "Stream ingestion config should exist"); + + List> streamConfigMaps = streamIngestionConfig.getStreamConfigMaps(); + int numTopics = getNumTopics(); + assertEquals(streamConfigMaps.size(), numTopics, "Should have " + numTopics + " stream configs"); + + Set configIds = new HashSet<>(); + for (int i = 0; i < numTopics; i++) { + Map configMap = streamConfigMaps.get(i); + String configIdStr = configMap.get(StreamConfigProperties.STREAM_CONFIG_ID); + assertNotNull(configIdStr, "stream.config.id should be set for stream config at index " + i); + + int configId = Integer.parseInt(configIdStr); + assertTrue(configId >= 0, "Config ID should be non-negative, got: " + configId); + assertTrue(configIds.add(configId), "Config IDs should be unique, duplicate: " + configId); + } + + assertEquals(streamIngestionConfig.getNextStreamConfigId(), numTopics, + "nextStreamConfigId should equal the number of topics"); + } + + @Test + public void testMultiTopicSegmentNameFormat() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + IdealState idealState = getSharedHelixResourceManager().getTableIdealState(realtimeTableName); + assertNotNull(idealState); + + int numTopics = getNumTopics(); + if (numTopics <= 1) { + return; + } + + int multiTopicSegmentCount = 0; + Set configIdsSeen = new HashSet<>(); + + for (String segmentName : idealState.getPartitionSet()) { + MultiTopicLLCSegmentName multiTopicName = MultiTopicLLCSegmentName.of(segmentName); + if (multiTopicName != null) { + multiTopicSegmentCount++; + configIdsSeen.add(multiTopicName.getConfigId()); + + assertTrue(multiTopicName.getConfigId() >= 0, + "Config ID should be non-negative: " + segmentName); + assertTrue(multiTopicName.getStreamPartitionId() >= 0, + "Stream partition ID should be non-negative: " + segmentName); + assertTrue(multiTopicName.getStreamPartitionId() < NUM_PARTITIONS_PER_TOPIC, + "Stream partition ID should be < " + NUM_PARTITIONS_PER_TOPIC + ": " + segmentName); + + int expectedPartitionGroupId = + IngestionConfigUtils.getPinotPartitionIdFromConfigId( + multiTopicName.getStreamPartitionId(), multiTopicName.getConfigId()); + assertEquals(multiTopicName.getPartitionGroupId(), expectedPartitionGroupId, + "Partition group ID encoding mismatch for segment: " + segmentName); + + LLCSegmentName llcParsed = new LLCSegmentName(segmentName); + assertEquals(llcParsed.getPartitionGroupId(), expectedPartitionGroupId, + "LLCSegmentName should parse 5-part format and produce same partition group ID"); + } + } + + assertTrue(multiTopicSegmentCount > 0, + "Should have at least one segment in 5-part multi-topic format"); + + for (int i = 0; i < numTopics; i++) { + assertTrue(configIdsSeen.contains(i), + "Should have segments with config ID " + i); + } + } + + @Test + public void testConfigIdLookupMethods() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + TableConfig tableConfig = getSharedHelixResourceManager().getTableConfig(realtimeTableName); + assertNotNull(tableConfig); + + StreamIngestionConfig streamIngestionConfig = + tableConfig.getIngestionConfig().getStreamIngestionConfig(); + int numTopics = getNumTopics(); + + for (int i = 0; i < numTopics; i++) { + int configId = streamIngestionConfig.getConfigId(i); + assertEquals(configId, i, "Config ID at index " + i + " should be " + i); + + Map configMap = streamIngestionConfig.getStreamConfigMapByConfigId(configId); + assertNotNull(configMap, "Should find stream config for config ID " + configId); + + String topicName = configMap.get( + StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_TOPIC_NAME)); + assertEquals(topicName, topicName(i), + "Topic name for config ID " + configId + " should match"); + } + + Map> configIdMap = streamIngestionConfig.getConfigIdToStreamConfigMap(); + assertEquals(configIdMap.size(), numTopics, "Config ID map should have " + numTopics + " entries"); + for (int i = 0; i < numTopics; i++) { + assertTrue(configIdMap.containsKey(i), "Config ID map should contain key " + i); + } + } + + @Test + public void testSegmentNameBackwardCompatibility() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + IdealState idealState = getSharedHelixResourceManager().getTableIdealState(realtimeTableName); + assertNotNull(idealState); + + for (String segmentName : idealState.getPartitionSet()) { + if (!LLCSegmentName.isLLCSegment(segmentName)) { + continue; + } + + LLCSegmentName llcName = LLCSegmentName.of(segmentName); + assertNotNull(llcName, "LLCSegmentName.of() should parse segment: " + segmentName); + + MultiTopicLLCSegmentName multiTopicName = MultiTopicLLCSegmentName.of(segmentName); + if (multiTopicName != null) { + assertEquals(llcName.getPartitionGroupId(), multiTopicName.getPartitionGroupId(), + "Partition group ID should match between LLCSegmentName and MultiTopicLLCSegmentName " + + "for segment: " + segmentName); + assertEquals(llcName.getTableName(), multiTopicName.getTableName()); + assertEquals(llcName.getSequenceNumber(), multiTopicName.getSequenceNumber()); + } + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 9106f7fe56ba..7d5f66caa27b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -200,7 +201,7 @@ private static void validateEffectiveTableConfig(TableConfig tableConfig, Schema validateSegmentAssignmentConfig(tableConfig); validateIngestionConfig(tableConfig, schema); if (tableConfig.getTableType() == TableType.REALTIME) { - validateStreamConfigMaps(tableConfig); + validateStreamConfigMaps(tableConfig, skipTypes); } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfigAndFieldConfigList(tableConfig, schema); @@ -764,7 +765,92 @@ public static void validateIngestionConfig(TableConfig tableConfig, Schema schem } } - private static void validateStreamConfigMaps(TableConfig tableConfig) { + /** + * Ensures every stream config map has a stable {@code stream.config.id} assigned. + *

Edge cases handled: + *

+ */ + @VisibleForTesting + static void ensureStreamConfigIds(TableConfig tableConfig) { + ensureStreamConfigIds(tableConfig, Collections.emptySet()); + } + + @VisibleForTesting + static void ensureStreamConfigIds(TableConfig tableConfig, Set skipTypes) { + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) { + return; + } + StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig(); + List> streamConfigMaps = streamIngestionConfig.getStreamConfigMaps(); + if (streamConfigMaps == null || streamConfigMaps.isEmpty()) { + return; + } + + // First pass: collect existing IDs and validate them + Set usedIds = new HashSet<>(); + int maxExistingId = -1; + for (Map configMap : streamConfigMaps) { + String idStr = configMap.get(StreamConfigProperties.STREAM_CONFIG_ID); + if (idStr != null) { + int configId; + try { + configId = Integer.parseInt(idStr); + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid stream.config.id value: " + idStr); + } + Preconditions.checkState(configId >= 0, + "stream.config.id must be non-negative, got: %s", configId); + Preconditions.checkState(usedIds.add(configId), + "Duplicate stream.config.id found: %s", configId); + maxExistingId = Math.max(maxExistingId, configId); + } + } + + // Determine the starting point for new IDs + int nextId = streamIngestionConfig.getNextStreamConfigId(); + if (maxExistingId >= 0 && nextId > 0 && nextId <= maxExistingId) { + if (!skipTypes.contains(ValidationType.STREAM_CONFIG_ID)) { + throw new IllegalStateException(String.format( + "nextStreamConfigId (%d) must be greater than the max existing stream.config.id (%d). " + + "Update nextStreamConfigId to at least %d, or skip validation type STREAM_CONFIG_ID.", + nextId, maxExistingId, maxExistingId + 1)); + } + LOGGER.warn("nextStreamConfigId ({}) is not greater than max existing stream.config.id ({}). " + + "Overriding to {} because STREAM_CONFIG_ID validation is skipped.", nextId, maxExistingId, + maxExistingId + 1); + nextId = maxExistingId + 1; + } + + // Second pass: assign IDs to entries that don't have one + int numAssigned = 0; + for (Map configMap : streamConfigMaps) { + if (!configMap.containsKey(StreamConfigProperties.STREAM_CONFIG_ID)) { + while (usedIds.contains(nextId)) { + nextId++; + } + configMap.put(StreamConfigProperties.STREAM_CONFIG_ID, String.valueOf(nextId)); + usedIds.add(nextId); + nextId++; + numAssigned++; + } + } + + streamIngestionConfig.setNextStreamConfigId(nextId); + if (numAssigned > 0) { + LOGGER.info("Auto-assigned stream.config.id to {} stream config(s). nextStreamConfigId is now {}.", + numAssigned, nextId); + } + } + + private static void validateStreamConfigMaps(TableConfig tableConfig, Set skipTypes) { + ensureStreamConfigIds(tableConfig, skipTypes); List> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); int numStreamConfigs = streamConfigMaps.size(); List streamConfigs = new ArrayList<>(numStreamConfigs); @@ -2039,7 +2125,7 @@ public static boolean isTableTypeInconsistentDuringConsumption(@Nullable TableCo // enum of all the skip-able validation types. public enum ValidationType { - ALL, TASK, UPSERT, TENANT, MINION_INSTANCES, ACTIVE_TASKS + ALL, TASK, UPSERT, TENANT, MINION_INSTANCES, ACTIVE_TASKS, STREAM_CONFIG_ID } /** diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index e847de4d2633..4a27f17616ba 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -929,8 +929,12 @@ public void ingestionStreamConfigsTest() { TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); // Test for multiple stream configs with pauseless consumption enabled - should fail + Map pauselessStreamConfigs1 = new HashMap<>(streamConfigs); + pauselessStreamConfigs1.remove(StreamConfigProperties.STREAM_CONFIG_ID); + Map pauselessStreamConfigs2 = new HashMap<>(streamConfigs); + pauselessStreamConfigs2.remove(StreamConfigProperties.STREAM_CONFIG_ID); StreamIngestionConfig streamIngestionConfigWithPauseless = - new StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)); + new StreamIngestionConfig(Arrays.asList(pauselessStreamConfigs1, pauselessStreamConfigs2)); streamIngestionConfigWithPauseless.setPauselessConsumptionEnabled(true); IngestionConfig ingestionConfigWithPauseless = new IngestionConfig(); @@ -951,8 +955,13 @@ public void ingestionStreamConfigsTest() { } // Test for multiple stream configs with pauseless consumption disabled - should pass + // Use fresh copies because ensureStreamConfigIds mutates the map (adds stream.config.id) + Map dupStreamConfigs1 = new HashMap<>(streamConfigs); + dupStreamConfigs1.remove(StreamConfigProperties.STREAM_CONFIG_ID); + Map dupStreamConfigs2 = new HashMap<>(streamConfigs); + dupStreamConfigs2.remove(StreamConfigProperties.STREAM_CONFIG_ID); StreamIngestionConfig streamIngestionConfigWithoutPauseless = - new StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)); + new StreamIngestionConfig(Arrays.asList(dupStreamConfigs1, dupStreamConfigs2)); streamIngestionConfigWithoutPauseless.setPauselessConsumptionEnabled(false); IngestionConfig ingestionConfigWithoutPauseless = new IngestionConfig(); @@ -973,10 +982,13 @@ public void ingestionStreamConfigsTest() { } // Test for multiple stream configs with pauseless consumption disabled and unique topic names - should pass - Map anotherStreamConfig = new HashMap<>(streamConfigs); - anotherStreamConfig.put("stream.kafka.topic.name", "myTopic2"); + Map uniqueStreamConfig1 = new HashMap<>(streamConfigs); + uniqueStreamConfig1.remove(StreamConfigProperties.STREAM_CONFIG_ID); + Map uniqueStreamConfig2 = new HashMap<>(streamConfigs); + uniqueStreamConfig2.remove(StreamConfigProperties.STREAM_CONFIG_ID); + uniqueStreamConfig2.put("stream.kafka.topic.name", "myTopic2"); StreamIngestionConfig streamIngestionConfigWithoutPauselessUniqueTopics = - new StreamIngestionConfig(Arrays.asList(streamConfigs, anotherStreamConfig)); + new StreamIngestionConfig(Arrays.asList(uniqueStreamConfig1, uniqueStreamConfig2)); streamIngestionConfigWithoutPauselessUniqueTopics.setPauselessConsumptionEnabled(false); IngestionConfig ingestionConfigWithoutPauselessUniqueTopics = new IngestionConfig(); @@ -1019,6 +1031,285 @@ public void ingestionServerIngestionOomProtectionTest() { TableConfigUtils.validateIngestionConfig(tableConfig, schema); } + @Test + public void testEnsureStreamConfigIdsNewTableNoIds() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + assertEquals(sic.getNextStreamConfigId(), 0); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + + @Test + public void testEnsureStreamConfigIdsSingleStream() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Collections.singletonList(config0)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(sic.getNextStreamConfigId(), 1); + } + + @Test + public void testEnsureStreamConfigIdsPartialIds() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-C"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, configNew)); + sic.setNextStreamConfigId(1); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(configNew.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + + @Test + public void testEnsureStreamConfigIdsPartialIdsWithGap() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map config3 = new HashMap<>(); + config3.put("streamType", "kafka"); + config3.put("stream.kafka.topic.name", "topic-D"); + config3.put("stream.config.id", "3"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-E"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config3, configNew)); + sic.setNextStreamConfigId(4); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config3.get("stream.config.id"), "3"); + assertEquals(configNew.get("stream.config.id"), "4"); + assertEquals(sic.getNextStreamConfigId(), 5); + } + + @Test + public void testEnsureStreamConfigIdsStaleNextId() { + // nextStreamConfigId explicitly set but below max existing ID — should fail + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map config5 = new HashMap<>(); + config5.put("streamType", "kafka"); + config5.put("stream.kafka.topic.name", "topic-F"); + config5.put("stream.config.id", "5"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config5)); + sic.setNextStreamConfigId(2); // Stale — max existing is 5 + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail when nextStreamConfigId is below max existing stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("nextStreamConfigId")); + assertTrue(e.getMessage().contains("must be greater than")); + } + } + + @Test + public void testEnsureStreamConfigIdsStaleNextIdWithNewEntry() { + Map config5 = new HashMap<>(); + config5.put("streamType", "kafka"); + config5.put("stream.kafka.topic.name", "topic-F"); + config5.put("stream.config.id", "5"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-G"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config5, configNew)); + sic.setNextStreamConfigId(1); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail when nextStreamConfigId is below max existing stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("nextStreamConfigId")); + } + } + + @Test + public void testEnsureStreamConfigIdsStaleNextIdSkipValidation() { + // Same stale nextId scenario, but with STREAM_CONFIG_ID validation skipped — should correct silently + Map config5 = new HashMap<>(); + config5.put("streamType", "kafka"); + config5.put("stream.kafka.topic.name", "topic-F"); + config5.put("stream.config.id", "5"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-G"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config5, configNew)); + sic.setNextStreamConfigId(1); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig, + Set.of(TableConfigUtils.ValidationType.STREAM_CONFIG_ID)); + + assertEquals(config5.get("stream.config.id"), "5"); + assertEquals(configNew.get("stream.config.id"), "6"); + assertEquals(sic.getNextStreamConfigId(), 7); + } + + @Test + public void testEnsureStreamConfigIdsDuplicateIds() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "1"); + + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + config1.put("stream.config.id", "1"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail for duplicate stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("Duplicate stream.config.id")); + } + } + + @Test + public void testEnsureStreamConfigIdsNegativeId() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "-1"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Collections.singletonList(config0)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail for negative stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("non-negative")); + } + } + + @Test + public void testEnsureStreamConfigIdsAllIdsPresent() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + config1.put("stream.config.id", "1"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + sic.setNextStreamConfigId(2); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + + @Test + public void testEnsureStreamConfigIdsIdempotent() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + @Test public void ingestionBatchConfigsTest() { Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index d65070610603..5c7af3030009 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -19,13 +19,16 @@ package org.apache.pinot.spi.config.table.ingestion; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; import org.apache.pinot.spi.config.table.DisasterRecoveryMode; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.Enablement; @@ -60,6 +63,10 @@ public class StreamIngestionConfig extends BaseJsonConfig { + " completed (immutable) replica on any server in pause-less ingestion") private DisasterRecoveryMode _disasterRecoveryMode = DisasterRecoveryMode.DEFAULT; + @JsonPropertyDescription("Next stream config ID to assign when a new stream config is added. " + + "Each stream config map gets a stable, immutable ID stored as 'stream.config.id' in the map.") + private int _nextStreamConfigId; + @JsonPropertyDescription("Class to handle realtime offset auto reset") private String _realtimeOffsetAutoResetHandlerClass; @@ -162,4 +169,42 @@ public Enablement getOomProtection() { public void setOomProtection(@Nullable Enablement oomProtection) { _oomProtection = oomProtection == null ? Enablement.DEFAULT : oomProtection; } + + public int getNextStreamConfigId() { + return _nextStreamConfigId; + } + + public void setNextStreamConfigId(int nextStreamConfigId) { + _nextStreamConfigId = nextStreamConfigId; + } + + /// Returns the config ID for the stream config at the given list position. + /// If the entry has a stream.config.id key, returns that; otherwise returns the positional index. + public int getConfigId(int listIndex) { + Map configMap = _streamConfigMaps.get(listIndex); + String idStr = configMap.get(StreamConfigProperties.STREAM_CONFIG_ID); + return idStr != null ? Integer.parseInt(idStr) : listIndex; + } + + /// Returns the stream config map for the given config ID. + /// Falls back to positional index when stream.config.id is not set (legacy tables). + /// Throws IllegalStateException if no matching config is found. + public Map getStreamConfigMapByConfigId(int configId) { + Map result = getConfigIdToStreamConfigMap().get(configId); + if (result == null) { + throw new IllegalStateException("Cannot find stream config with config ID: " + configId); + } + return result; + } + + /// Returns a map from config ID to stream config map. + /// For legacy tables without stream.config.id, uses positional index as the key. + @JsonIgnore + public Map> getConfigIdToStreamConfigMap() { + Map> result = new HashMap<>(); + for (int i = 0; i < _streamConfigMaps.size(); i++) { + result.put(getConfigId(i), _streamConfigMaps.get(i)); + } + return result; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index edf6d6ccdd46..46492342e266 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -127,11 +127,10 @@ private Boolean fetchMultipleStreams() PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" + topicName; StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - int index = i; + int configId = IngestionConfigUtils.getConfigIdFromStreamConfig(streamConfig); List topicPartitionGroupConsumptionStatusList = _partitionGroupConsumptionStatusList.stream() - .filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( - partitionGroupConsumptionStatus.getPartitionGroupId()) == index) + .filter(s -> IngestionConfigUtils.getConfigIdFromPinotPartitionId(s.getPartitionGroupId()) == configId) .collect(Collectors.toList()); try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( StreamConsumerFactory.getUniqueClientId(clientId))) { @@ -142,8 +141,8 @@ private Boolean fetchMultipleStreams() _forceGetOffsetFromStream) .stream() .map(metadata -> new PartitionGroupMetadata( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(), - index), metadata.getStartOffset(), metadata.getSequenceNumber())) + IngestionConfigUtils.getPinotPartitionIdFromConfigId(metadata.getPartitionGroupId(), configId), + metadata.getStartOffset(), metadata.getSequenceNumber())) .collect(Collectors.toList()); _streamMetadataList.add( new StreamMetadata(streamConfig, partitionGroupMetadataList.size(), partitionGroupMetadataList)); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index aa00ceb8870a..b66c1f33e8fb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -37,6 +37,7 @@ private StreamConfigProperties() { * Generic properties */ public static final String STREAM_TYPE = "streamType"; + public static final String STREAM_CONFIG_ID = "stream.config.id"; public static final String STREAM_TOPIC_NAME = "topic.name"; public static final String STREAM_CONSUMER_FACTORY_CLASS = "consumer.factory.class.name"; public static final String STREAM_CONSUMER_OFFSET_CRITERIA = "consumer.prop.auto.offset.reset"; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index b0731ad38315..f69ea9ebe125 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -33,9 +33,11 @@ import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamConsumerFactory; @@ -137,9 +139,23 @@ public static int getStreamPartitionIdFromPinotPartitionId(int partitionId) { } /// Returns the StreamConfig for the given Pinot segment partition id. + /// For multi-topic tables, uses the stable config ID stored in the stream config rather than positional index. public static StreamConfig getStreamConfigFromPinotPartitionId(List streamConfigs, int partitionId) { - return streamConfigs.size() > 1 ? streamConfigs.get(getStreamConfigIndexFromPinotPartitionId(partitionId)) - : streamConfigs.get(0); + if (streamConfigs.size() == 1) { + return streamConfigs.get(0); + } + int configId = getConfigIdFromPinotPartitionId(partitionId); + return streamConfigs.stream() + .filter(sc -> configId == getConfigIdFromStreamConfig(sc)) + .findFirst() + .orElseThrow(() -> new IllegalStateException( + "Cannot find stream config with config ID: " + configId + " for partition: " + partitionId)); + } + + /// Returns the config ID stored in a StreamConfig, or 0 if not set (single-topic backward compat). + public static int getConfigIdFromStreamConfig(StreamConfig streamConfig) { + String idStr = streamConfig.getStreamConfigsMap().get(StreamConfigProperties.STREAM_CONFIG_ID); + return idStr != null ? Integer.parseInt(idStr) : 0; } /// Returns the index of the StreamConfigs from the Pinot segment partition id. @@ -150,23 +166,21 @@ public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) { } /** - * Fetches the streamConfig from the list of streamConfigs according to the partition id. + * Fetches the streamConfig map from the table config for the given Pinot partition id. + * For multi-topic tables, uses the stable config ID to look up the correct stream config. */ public static Map getStreamConfigMap(TableConfig tableConfig, int partitionId) { List> streamConfigMaps = getStreamConfigMaps(tableConfig); int numStreams = streamConfigMaps.size(); if (numStreams == 1) { - // Single stream - // NOTE: We skip partition id translation logic to handle cases where custom stream might return partition id - // larger than 10000. + // Single stream — skip partition id translation to handle custom streams with large partition ids. return streamConfigMaps.get(0); - } else { - // Multiple streams - int index = getStreamConfigIndexFromPinotPartitionId(partitionId); - Preconditions.checkState(numStreams > index, "Cannot find stream config of index: %s for table: %s", index, - tableConfig.getTableName()); - return streamConfigMaps.get(index); } + // Multi-stream: look up by stable config ID rather than positional index. + int configId = getConfigIdFromPinotPartitionId(partitionId); + StreamIngestionConfig streamIngestionConfig = + tableConfig.getIngestionConfig().getStreamIngestionConfig(); + return streamIngestionConfig.getStreamConfigMapByConfigId(configId); } public static List getAggregationConfigs(TableConfig tableConfig) { @@ -359,4 +373,26 @@ public static Map> getStreamConfigIndexToStreamPartitions( } return streamIndexToPartitions; } + + // --- Stable config ID methods (Phase 1: multi-topic safe deletion) --- + + /// Returns the Pinot segment partition id encoded from a stream partition id and a stable config ID. + public static int getPinotPartitionIdFromConfigId(int streamPartitionId, int configId) { + return configId * PARTITION_PADDING_OFFSET + streamPartitionId; + } + + /// Extracts the stable config ID from an encoded Pinot segment partition id. + public static int getConfigIdFromPinotPartitionId(int partitionId) { + return partitionId / PARTITION_PADDING_OFFSET; + } + + /// Builds a map from stable config ID to StreamConfig for O(1) lookup by config ID. + /// For single-topic tables, returns a singleton map with key 0. + public static Map buildConfigIdToStreamConfigMap(List streamConfigs) { + Map result = new HashMap<>(streamConfigs.size()); + for (StreamConfig streamConfig : streamConfigs) { + result.put(getConfigIdFromStreamConfig(streamConfig), streamConfig); + } + return result; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index b55fb22e71c9..e5260bed881a 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.utils; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; import org.testng.annotations.Test; @@ -230,4 +232,134 @@ public void testGetStreamConfigIndexToStreamPartitions() { Assert.assertEquals(streamConfigIndexToStreamPartitions.get(1), new HashSet<>(Arrays.asList(100, 1))); Assert.assertEquals(streamConfigIndexToStreamPartitions.get(3), new HashSet<>(Arrays.asList(400))); } + + @Test + public void testGetPinotPartitionIdFromConfigId() { + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(5, 0), 5); + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(3, 1), 10003); + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(0, 2), 20000); + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(99, 5), 50099); + } + + @Test + public void testGetConfigIdFromPinotPartitionId() { + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(5), 0); + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(10003), 1); + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(20000), 2); + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(50099), 5); + } + + @Test + public void testGetStreamConfigMapByConfigId() { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + config0.put("stream.kafka.topic.name", "topic-A"); + + Map config2 = new HashMap<>(); + config2.put(StreamConfigProperties.STREAM_CONFIG_ID, "2"); + config2.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config2)); + + Assert.assertEquals(sic.getStreamConfigMapByConfigId(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(sic.getStreamConfigMapByConfigId(2).get("stream.kafka.topic.name"), "topic-B"); + + // Config ID 1 was deleted — should throw + try { + sic.getStreamConfigMapByConfigId(1); + Assert.fail("Should fail for missing config ID"); + } catch (IllegalStateException e) { + // expected + } + } + + @Test + public void testGetStreamConfigMapByConfigIdFallback() { + Map config0 = new HashMap<>(); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + + Assert.assertEquals(sic.getStreamConfigMapByConfigId(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(sic.getStreamConfigMapByConfigId(1).get("stream.kafka.topic.name"), "topic-B"); + } + + @Test + public void testGetConfigIdToStreamConfigMap() { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + config0.put("stream.kafka.topic.name", "topic-A"); + + Map config3 = new HashMap<>(); + config3.put(StreamConfigProperties.STREAM_CONFIG_ID, "3"); + config3.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config3)); + Map> idMap = sic.getConfigIdToStreamConfigMap(); + + Assert.assertEquals(idMap.size(), 2); + Assert.assertEquals(idMap.get(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(idMap.get(3).get("stream.kafka.topic.name"), "topic-B"); + Assert.assertNull(idMap.get(1)); + } + + @Test + public void testGetConfigIdToStreamConfigMapFallback() { + Map config0 = new HashMap<>(); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + Map> idMap = sic.getConfigIdToStreamConfigMap(); + + Assert.assertEquals(idMap.size(), 2); + Assert.assertEquals(idMap.get(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(idMap.get(1).get("stream.kafka.topic.name"), "topic-B"); + } + + @Test + public void testGetConfigId() { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "5"); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + Assert.assertEquals(sic.getConfigId(0), 5); + Assert.assertEquals(sic.getConfigId(1), 1); + } + + @Test + public void testStreamIngestionConfigSerDe() + throws Exception { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + + Map config1 = new HashMap<>(); + config1.put(StreamConfigProperties.STREAM_CONFIG_ID, "1"); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig original = new StreamIngestionConfig(Arrays.asList(config0, config1)); + original.setNextStreamConfigId(2); + + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(original); + StreamIngestionConfig deserialized = mapper.readValue(json, StreamIngestionConfig.class); + + Assert.assertEquals(deserialized.getNextStreamConfigId(), 2); + Assert.assertEquals(deserialized.getStreamConfigMaps().size(), 2); + Assert.assertEquals( + deserialized.getStreamConfigMaps().get(0).get(StreamConfigProperties.STREAM_CONFIG_ID), "0"); + Assert.assertEquals( + deserialized.getStreamConfigMaps().get(1).get(StreamConfigProperties.STREAM_CONFIG_ID), "1"); + Assert.assertEquals( + deserialized.getStreamConfigMaps().get(0).get("stream.kafka.topic.name"), "topic-A"); + } }