From ef882f48faea4a66a55f48f753ac5e37aa078377 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Mon, 15 Jun 2026 09:20:02 -0700 Subject: [PATCH 01/10] Add stream.config.id support to config model for stable multi-topic IDs Add a stable, immutable config ID mechanism for multi-topic ingestion so that topic deletion no longer shifts partition group ID encoding. - Add STREAM_CONFIG_ID constant to StreamConfigProperties - Add nextStreamConfigId field to StreamIngestionConfig - Add instance methods on StreamIngestionConfig for config ID lookup: getConfigId(), getStreamConfigMapByConfigId(), getConfigIdToStreamConfigMap() - Keep encoding/decoding math as static methods on IngestionConfigUtils: getPinotPartitionIdFromConfigId(), getConfigIdFromPinotPartitionId() - Add unit tests for all new methods and serialization round-trip --- .../ingestion/StreamIngestionConfig.java | 45 ++++++ .../spi/stream/StreamConfigProperties.java | 1 + .../pinot/spi/utils/IngestionConfigUtils.java | 12 ++ .../spi/utils/IngestionConfigUtilsTest.java | 132 ++++++++++++++++++ 4 files changed, 190 insertions(+) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index d65070610603..5c7af3030009 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -19,13 +19,16 @@ package org.apache.pinot.spi.config.table.ingestion; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; import org.apache.pinot.spi.config.table.DisasterRecoveryMode; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.Enablement; @@ -60,6 +63,10 @@ public class StreamIngestionConfig extends BaseJsonConfig { + " completed (immutable) replica on any server in pause-less ingestion") private DisasterRecoveryMode _disasterRecoveryMode = DisasterRecoveryMode.DEFAULT; + @JsonPropertyDescription("Next stream config ID to assign when a new stream config is added. " + + "Each stream config map gets a stable, immutable ID stored as 'stream.config.id' in the map.") + private int _nextStreamConfigId; + @JsonPropertyDescription("Class to handle realtime offset auto reset") private String _realtimeOffsetAutoResetHandlerClass; @@ -162,4 +169,42 @@ public Enablement getOomProtection() { public void setOomProtection(@Nullable Enablement oomProtection) { _oomProtection = oomProtection == null ? Enablement.DEFAULT : oomProtection; } + + public int getNextStreamConfigId() { + return _nextStreamConfigId; + } + + public void setNextStreamConfigId(int nextStreamConfigId) { + _nextStreamConfigId = nextStreamConfigId; + } + + /// Returns the config ID for the stream config at the given list position. + /// If the entry has a stream.config.id key, returns that; otherwise returns the positional index. + public int getConfigId(int listIndex) { + Map configMap = _streamConfigMaps.get(listIndex); + String idStr = configMap.get(StreamConfigProperties.STREAM_CONFIG_ID); + return idStr != null ? Integer.parseInt(idStr) : listIndex; + } + + /// Returns the stream config map for the given config ID. + /// Falls back to positional index when stream.config.id is not set (legacy tables). + /// Throws IllegalStateException if no matching config is found. + public Map getStreamConfigMapByConfigId(int configId) { + Map result = getConfigIdToStreamConfigMap().get(configId); + if (result == null) { + throw new IllegalStateException("Cannot find stream config with config ID: " + configId); + } + return result; + } + + /// Returns a map from config ID to stream config map. + /// For legacy tables without stream.config.id, uses positional index as the key. + @JsonIgnore + public Map> getConfigIdToStreamConfigMap() { + Map> result = new HashMap<>(); + for (int i = 0; i < _streamConfigMaps.size(); i++) { + result.put(getConfigId(i), _streamConfigMaps.get(i)); + } + return result; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index aa00ceb8870a..b66c1f33e8fb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -37,6 +37,7 @@ private StreamConfigProperties() { * Generic properties */ public static final String STREAM_TYPE = "streamType"; + public static final String STREAM_CONFIG_ID = "stream.config.id"; public static final String STREAM_TOPIC_NAME = "topic.name"; public static final String STREAM_CONSUMER_FACTORY_CLASS = "consumer.factory.class.name"; public static final String STREAM_CONSUMER_OFFSET_CRITERIA = "consumer.prop.auto.offset.reset"; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index b0731ad38315..446d44047c27 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -359,4 +359,16 @@ public static Map> getStreamConfigIndexToStreamPartitions( } return streamIndexToPartitions; } + + // --- Stable config ID methods (Phase 1: multi-topic safe deletion) --- + + /// Returns the Pinot segment partition id encoded from a stream partition id and a stable config ID. + public static int getPinotPartitionIdFromConfigId(int streamPartitionId, int configId) { + return configId * PARTITION_PADDING_OFFSET + streamPartitionId; + } + + /// Extracts the stable config ID from an encoded Pinot segment partition id. + public static int getConfigIdFromPinotPartitionId(int partitionId) { + return partitionId / PARTITION_PADDING_OFFSET; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index b55fb22e71c9..e5260bed881a 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.utils; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; import org.testng.annotations.Test; @@ -230,4 +232,134 @@ public void testGetStreamConfigIndexToStreamPartitions() { Assert.assertEquals(streamConfigIndexToStreamPartitions.get(1), new HashSet<>(Arrays.asList(100, 1))); Assert.assertEquals(streamConfigIndexToStreamPartitions.get(3), new HashSet<>(Arrays.asList(400))); } + + @Test + public void testGetPinotPartitionIdFromConfigId() { + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(5, 0), 5); + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(3, 1), 10003); + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(0, 2), 20000); + Assert.assertEquals(IngestionConfigUtils.getPinotPartitionIdFromConfigId(99, 5), 50099); + } + + @Test + public void testGetConfigIdFromPinotPartitionId() { + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(5), 0); + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(10003), 1); + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(20000), 2); + Assert.assertEquals(IngestionConfigUtils.getConfigIdFromPinotPartitionId(50099), 5); + } + + @Test + public void testGetStreamConfigMapByConfigId() { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + config0.put("stream.kafka.topic.name", "topic-A"); + + Map config2 = new HashMap<>(); + config2.put(StreamConfigProperties.STREAM_CONFIG_ID, "2"); + config2.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config2)); + + Assert.assertEquals(sic.getStreamConfigMapByConfigId(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(sic.getStreamConfigMapByConfigId(2).get("stream.kafka.topic.name"), "topic-B"); + + // Config ID 1 was deleted — should throw + try { + sic.getStreamConfigMapByConfigId(1); + Assert.fail("Should fail for missing config ID"); + } catch (IllegalStateException e) { + // expected + } + } + + @Test + public void testGetStreamConfigMapByConfigIdFallback() { + Map config0 = new HashMap<>(); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + + Assert.assertEquals(sic.getStreamConfigMapByConfigId(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(sic.getStreamConfigMapByConfigId(1).get("stream.kafka.topic.name"), "topic-B"); + } + + @Test + public void testGetConfigIdToStreamConfigMap() { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + config0.put("stream.kafka.topic.name", "topic-A"); + + Map config3 = new HashMap<>(); + config3.put(StreamConfigProperties.STREAM_CONFIG_ID, "3"); + config3.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config3)); + Map> idMap = sic.getConfigIdToStreamConfigMap(); + + Assert.assertEquals(idMap.size(), 2); + Assert.assertEquals(idMap.get(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(idMap.get(3).get("stream.kafka.topic.name"), "topic-B"); + Assert.assertNull(idMap.get(1)); + } + + @Test + public void testGetConfigIdToStreamConfigMapFallback() { + Map config0 = new HashMap<>(); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + Map> idMap = sic.getConfigIdToStreamConfigMap(); + + Assert.assertEquals(idMap.size(), 2); + Assert.assertEquals(idMap.get(0).get("stream.kafka.topic.name"), "topic-A"); + Assert.assertEquals(idMap.get(1).get("stream.kafka.topic.name"), "topic-B"); + } + + @Test + public void testGetConfigId() { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "5"); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + Assert.assertEquals(sic.getConfigId(0), 5); + Assert.assertEquals(sic.getConfigId(1), 1); + } + + @Test + public void testStreamIngestionConfigSerDe() + throws Exception { + Map config0 = new HashMap<>(); + config0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + + Map config1 = new HashMap<>(); + config1.put(StreamConfigProperties.STREAM_CONFIG_ID, "1"); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig original = new StreamIngestionConfig(Arrays.asList(config0, config1)); + original.setNextStreamConfigId(2); + + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(original); + StreamIngestionConfig deserialized = mapper.readValue(json, StreamIngestionConfig.class); + + Assert.assertEquals(deserialized.getNextStreamConfigId(), 2); + Assert.assertEquals(deserialized.getStreamConfigMaps().size(), 2); + Assert.assertEquals( + deserialized.getStreamConfigMaps().get(0).get(StreamConfigProperties.STREAM_CONFIG_ID), "0"); + Assert.assertEquals( + deserialized.getStreamConfigMaps().get(1).get(StreamConfigProperties.STREAM_CONFIG_ID), "1"); + Assert.assertEquals( + deserialized.getStreamConfigMaps().get(0).get("stream.kafka.topic.name"), "topic-A"); + } } From 6dd86a515861aaeee3b61a327b740f475e5a3cd5 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Tue, 16 Jun 2026 12:03:59 -0700 Subject: [PATCH 02/10] Auto-assign stream.config.id on table config validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ensureStreamConfigIds() to TableConfigUtils, called during validateStreamConfigMaps() on both add and update table config paths. Edge cases: - Legacy tables with no IDs → auto-assign from 0 - Partial IDs → assign only missing entries, skip used IDs - Gaps from deletion (e.g. IDs 0, 3) → new entry gets next available - Stale nextStreamConfigId → fails validation (user must fix) - Stale nextStreamConfigId with STREAM_CONFIG_ID skip → corrects with warning - Duplicate config IDs → rejected - Negative config IDs → rejected - Idempotent — running twice produces the same result Add STREAM_CONFIG_ID to ValidationType enum so users can skip the nextStreamConfigId validation via validationTypesToSkip query param. --- .../segment/local/utils/TableConfigUtils.java | 92 +++++- .../local/utils/TableConfigUtilsTest.java | 279 ++++++++++++++++++ 2 files changed, 368 insertions(+), 3 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 9106f7fe56ba..7d5f66caa27b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -200,7 +201,7 @@ private static void validateEffectiveTableConfig(TableConfig tableConfig, Schema validateSegmentAssignmentConfig(tableConfig); validateIngestionConfig(tableConfig, schema); if (tableConfig.getTableType() == TableType.REALTIME) { - validateStreamConfigMaps(tableConfig); + validateStreamConfigMaps(tableConfig, skipTypes); } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfigAndFieldConfigList(tableConfig, schema); @@ -764,7 +765,92 @@ public static void validateIngestionConfig(TableConfig tableConfig, Schema schem } } - private static void validateStreamConfigMaps(TableConfig tableConfig) { + /** + * Ensures every stream config map has a stable {@code stream.config.id} assigned. + *

Edge cases handled: + *

    + *
  • No entries have stream.config.id (legacy/new table) → auto-assign starting from 0
  • + *
  • Some entries have stream.config.id, some don't → assign only missing ones, skipping used IDs
  • + *
  • nextStreamConfigId is stale (less than max existing ID + 1) → corrected automatically
  • + *
  • Duplicate config IDs → rejected
  • + *
  • Negative config IDs → rejected
  • + *
+ */ + @VisibleForTesting + static void ensureStreamConfigIds(TableConfig tableConfig) { + ensureStreamConfigIds(tableConfig, Collections.emptySet()); + } + + @VisibleForTesting + static void ensureStreamConfigIds(TableConfig tableConfig, Set skipTypes) { + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) { + return; + } + StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig(); + List> streamConfigMaps = streamIngestionConfig.getStreamConfigMaps(); + if (streamConfigMaps == null || streamConfigMaps.isEmpty()) { + return; + } + + // First pass: collect existing IDs and validate them + Set usedIds = new HashSet<>(); + int maxExistingId = -1; + for (Map configMap : streamConfigMaps) { + String idStr = configMap.get(StreamConfigProperties.STREAM_CONFIG_ID); + if (idStr != null) { + int configId; + try { + configId = Integer.parseInt(idStr); + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid stream.config.id value: " + idStr); + } + Preconditions.checkState(configId >= 0, + "stream.config.id must be non-negative, got: %s", configId); + Preconditions.checkState(usedIds.add(configId), + "Duplicate stream.config.id found: %s", configId); + maxExistingId = Math.max(maxExistingId, configId); + } + } + + // Determine the starting point for new IDs + int nextId = streamIngestionConfig.getNextStreamConfigId(); + if (maxExistingId >= 0 && nextId > 0 && nextId <= maxExistingId) { + if (!skipTypes.contains(ValidationType.STREAM_CONFIG_ID)) { + throw new IllegalStateException(String.format( + "nextStreamConfigId (%d) must be greater than the max existing stream.config.id (%d). " + + "Update nextStreamConfigId to at least %d, or skip validation type STREAM_CONFIG_ID.", + nextId, maxExistingId, maxExistingId + 1)); + } + LOGGER.warn("nextStreamConfigId ({}) is not greater than max existing stream.config.id ({}). " + + "Overriding to {} because STREAM_CONFIG_ID validation is skipped.", nextId, maxExistingId, + maxExistingId + 1); + nextId = maxExistingId + 1; + } + + // Second pass: assign IDs to entries that don't have one + int numAssigned = 0; + for (Map configMap : streamConfigMaps) { + if (!configMap.containsKey(StreamConfigProperties.STREAM_CONFIG_ID)) { + while (usedIds.contains(nextId)) { + nextId++; + } + configMap.put(StreamConfigProperties.STREAM_CONFIG_ID, String.valueOf(nextId)); + usedIds.add(nextId); + nextId++; + numAssigned++; + } + } + + streamIngestionConfig.setNextStreamConfigId(nextId); + if (numAssigned > 0) { + LOGGER.info("Auto-assigned stream.config.id to {} stream config(s). nextStreamConfigId is now {}.", + numAssigned, nextId); + } + } + + private static void validateStreamConfigMaps(TableConfig tableConfig, Set skipTypes) { + ensureStreamConfigIds(tableConfig, skipTypes); List> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig); int numStreamConfigs = streamConfigMaps.size(); List streamConfigs = new ArrayList<>(numStreamConfigs); @@ -2039,7 +2125,7 @@ public static boolean isTableTypeInconsistentDuringConsumption(@Nullable TableCo // enum of all the skip-able validation types. public enum ValidationType { - ALL, TASK, UPSERT, TENANT, MINION_INSTANCES, ACTIVE_TASKS + ALL, TASK, UPSERT, TENANT, MINION_INSTANCES, ACTIVE_TASKS, STREAM_CONFIG_ID } /** diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index e847de4d2633..fc747069b189 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -1019,6 +1019,285 @@ public void ingestionServerIngestionOomProtectionTest() { TableConfigUtils.validateIngestionConfig(tableConfig, schema); } + @Test + public void testEnsureStreamConfigIdsNewTableNoIds() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + assertEquals(sic.getNextStreamConfigId(), 0); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + + @Test + public void testEnsureStreamConfigIdsSingleStream() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Collections.singletonList(config0)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(sic.getNextStreamConfigId(), 1); + } + + @Test + public void testEnsureStreamConfigIdsPartialIds() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-C"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, configNew)); + sic.setNextStreamConfigId(1); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(configNew.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + + @Test + public void testEnsureStreamConfigIdsPartialIdsWithGap() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map config3 = new HashMap<>(); + config3.put("streamType", "kafka"); + config3.put("stream.kafka.topic.name", "topic-D"); + config3.put("stream.config.id", "3"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-E"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config3, configNew)); + sic.setNextStreamConfigId(4); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config3.get("stream.config.id"), "3"); + assertEquals(configNew.get("stream.config.id"), "4"); + assertEquals(sic.getNextStreamConfigId(), 5); + } + + @Test + public void testEnsureStreamConfigIdsStaleNextId() { + // nextStreamConfigId explicitly set but below max existing ID — should fail + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map config5 = new HashMap<>(); + config5.put("streamType", "kafka"); + config5.put("stream.kafka.topic.name", "topic-F"); + config5.put("stream.config.id", "5"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config5)); + sic.setNextStreamConfigId(2); // Stale — max existing is 5 + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail when nextStreamConfigId is below max existing stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("nextStreamConfigId")); + assertTrue(e.getMessage().contains("must be greater than")); + } + } + + @Test + public void testEnsureStreamConfigIdsStaleNextIdWithNewEntry() { + Map config5 = new HashMap<>(); + config5.put("streamType", "kafka"); + config5.put("stream.kafka.topic.name", "topic-F"); + config5.put("stream.config.id", "5"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-G"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config5, configNew)); + sic.setNextStreamConfigId(1); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail when nextStreamConfigId is below max existing stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("nextStreamConfigId")); + } + } + + @Test + public void testEnsureStreamConfigIdsStaleNextIdSkipValidation() { + // Same stale nextId scenario, but with STREAM_CONFIG_ID validation skipped — should correct silently + Map config5 = new HashMap<>(); + config5.put("streamType", "kafka"); + config5.put("stream.kafka.topic.name", "topic-F"); + config5.put("stream.config.id", "5"); + + Map configNew = new HashMap<>(); + configNew.put("streamType", "kafka"); + configNew.put("stream.kafka.topic.name", "topic-G"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config5, configNew)); + sic.setNextStreamConfigId(1); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig, + Set.of(TableConfigUtils.ValidationType.STREAM_CONFIG_ID)); + + assertEquals(config5.get("stream.config.id"), "5"); + assertEquals(configNew.get("stream.config.id"), "6"); + assertEquals(sic.getNextStreamConfigId(), 7); + } + + @Test + public void testEnsureStreamConfigIdsDuplicateIds() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "1"); + + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + config1.put("stream.config.id", "1"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail for duplicate stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("Duplicate stream.config.id")); + } + } + + @Test + public void testEnsureStreamConfigIdsNegativeId() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "-1"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Collections.singletonList(config0)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.ensureStreamConfigIds(tableConfig); + fail("Should fail for negative stream.config.id"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("non-negative")); + } + } + + @Test + public void testEnsureStreamConfigIdsAllIdsPresent() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + config0.put("stream.config.id", "0"); + + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + config1.put("stream.config.id", "1"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + sic.setNextStreamConfigId(2); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + + @Test + public void testEnsureStreamConfigIdsIdempotent() { + Map config0 = new HashMap<>(); + config0.put("streamType", "kafka"); + config0.put("stream.kafka.topic.name", "topic-A"); + Map config1 = new HashMap<>(); + config1.put("streamType", "kafka"); + config1.put("stream.kafka.topic.name", "topic-B"); + + StreamIngestionConfig sic = new StreamIngestionConfig(Arrays.asList(config0, config1)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(sic); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn").setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + + TableConfigUtils.ensureStreamConfigIds(tableConfig); + assertEquals(config0.get("stream.config.id"), "0"); + assertEquals(config1.get("stream.config.id"), "1"); + assertEquals(sic.getNextStreamConfigId(), 2); + } + @Test public void ingestionBatchConfigsTest() { Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); From 47fbd6821fc3a0a8a1585d6cceebbe75f917d9dc Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Tue, 16 Jun 2026 15:01:43 -0700 Subject: [PATCH 03/10] Add MultiTopicLLCSegmentName with 5-part segment name format New segment name format for multi-topic tables: tableName__configId__streamPartitionId__sequenceNumber__creationTime This stores config ID and stream partition ID as separate fields rather than encoding them as configId * 10000 + partitionId, eliminating overflow and collision risks. - MultiTopicLLCSegmentName: 5-part format with of()/isMultiTopicLLCSegment() - getPartitionGroupId() computes encoded integer for backward compat - toLLCSegmentName() converts to 4-part format for legacy callers - LLCSegmentName.isLLCSegment() updated to recognize both formats - SegmentUtils.getPartitionIdFromSegmentName() tries multi-topic format - Disambiguates from UploadedRealtimeSegmentName (also 5-part) by checking that parts[1], parts[2], parts[3] are all integers --- .../pinot/common/utils/LLCSegmentName.java | 11 +- .../utils/MultiTopicLLCSegmentName.java | 183 ++++++++++++++++++ .../pinot/common/utils/SegmentUtils.java | 4 + .../utils/MultiTopicLLCSegmentNameTest.java | 146 ++++++++++++++ 4 files changed, 342 insertions(+), 2 deletions(-) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java create mode 100644 pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java index fdff73c0318d..3acf86acc4ea 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java @@ -73,7 +73,8 @@ public static LLCSegmentName of(String segmentName) { } /** - * Returns whether the given segment name represents an LLC segment. + * Returns whether the given segment name represents an LLC segment (either 4-part single-topic + * or 5-part multi-topic format). */ public static boolean isLLCSegment(String segmentName) { int numSeparators = 0; @@ -82,7 +83,13 @@ public static boolean isLLCSegment(String segmentName) { numSeparators++; index += 2; // SEPARATOR.length() } - return numSeparators == 3; + if (numSeparators == 3) { + return true; + } + if (numSeparators == 4) { + return MultiTopicLLCSegmentName.isMultiTopicLLCSegment(segmentName); + } + return false; } @Deprecated diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java new file mode 100644 index 000000000000..803269ca9363 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentName.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.base.Preconditions; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + + +/** + * Segment name for multi-topic LLC realtime segments with 5-part format: + * {@code tableName__configId__streamPartitionId__sequenceNumber__creationTime} + * + *

This format stores the stream config ID and stream partition ID as separate fields + * rather than encoding them into a single partition group ID ({@code configId * 10000 + partitionId}). + * This eliminates collision and overflow risks from the encoding scheme. + * + *

Existing 4-part segments ({@link LLCSegmentName}) are not affected. Both formats can + * coexist in the same table. Use {@link #of(String)} to parse a segment name that may be + * in either format. + */ +public class MultiTopicLLCSegmentName implements Comparable { + private static final String SEPARATOR = "__"; + private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'"; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC(); + + private final String _tableName; + private final int _configId; + private final int _streamPartitionId; + private final int _sequenceNumber; + private final String _creationTime; + private final String _segmentName; + + public MultiTopicLLCSegmentName(String segmentName) { + String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); + Preconditions.checkArgument(parts.length == 5, "Invalid multi-topic LLC segment name: %s", segmentName); + _tableName = parts[0]; + _configId = Integer.parseInt(parts[1]); + _streamPartitionId = Integer.parseInt(parts[2]); + _sequenceNumber = Integer.parseInt(parts[3]); + _creationTime = parts[4]; + _segmentName = segmentName; + } + + public MultiTopicLLCSegmentName(String tableName, int configId, int streamPartitionId, int sequenceNumber, + long msSinceEpoch) { + Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName); + _tableName = tableName; + _configId = configId; + _streamPartitionId = streamPartitionId; + _sequenceNumber = sequenceNumber; + _creationTime = DATE_FORMATTER.print(msSinceEpoch); + _segmentName = tableName + SEPARATOR + configId + SEPARATOR + streamPartitionId + SEPARATOR + sequenceNumber + + SEPARATOR + _creationTime; + } + + @Nullable + public static MultiTopicLLCSegmentName of(String segmentName) { + try { + return new MultiTopicLLCSegmentName(segmentName); + } catch (Exception e) { + return null; + } + } + + /** + * Returns whether the given segment name is a multi-topic LLC segment (5-part format where + * parts[1] and parts[3] are integers, distinguishing it from UploadedRealtimeSegmentName). + */ + public static boolean isMultiTopicLLCSegment(String segmentName) { + String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); + if (parts.length != 5) { + return false; + } + try { + Integer.parseInt(parts[1]); // configId + Integer.parseInt(parts[2]); // streamPartitionId + Integer.parseInt(parts[3]); // sequenceNumber + return true; + } catch (NumberFormatException e) { + return false; + } + } + + public String getTableName() { + return _tableName; + } + + public int getConfigId() { + return _configId; + } + + public int getStreamPartitionId() { + return _streamPartitionId; + } + + /// Returns the encoded partition group ID for backward compatibility with code + /// that expects the single-integer encoding ({@code configId * 10000 + streamPartitionId}). + public int getPartitionGroupId() { + return IngestionConfigUtils.getPinotPartitionIdFromConfigId(_streamPartitionId, _configId); + } + + public int getSequenceNumber() { + return _sequenceNumber; + } + + public String getCreationTime() { + return _creationTime; + } + + public long getCreationTimeMs() { + DateTime dateTime = DATE_FORMATTER.parseDateTime(_creationTime); + return dateTime.getMillis(); + } + + @JsonValue + public String getSegmentName() { + return _segmentName; + } + + /// Converts this multi-topic segment name to an equivalent {@link LLCSegmentName} using the + /// encoded partition group ID. Useful for interacting with code that only understands the 4-part format. + public LLCSegmentName toLLCSegmentName() { + return new LLCSegmentName(_tableName, getPartitionGroupId(), _sequenceNumber, getCreationTimeMs()); + } + + @Override + public int compareTo(MultiTopicLLCSegmentName other) { + Preconditions.checkArgument(_tableName.equals(other._tableName), + "Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName()); + if (_configId != other._configId) { + return Integer.compare(_configId, other._configId); + } + if (_streamPartitionId != other._streamPartitionId) { + return Integer.compare(_streamPartitionId, other._streamPartitionId); + } + return Integer.compare(_sequenceNumber, other._sequenceNumber); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MultiTopicLLCSegmentName)) { + return false; + } + MultiTopicLLCSegmentName that = (MultiTopicLLCSegmentName) o; + return _segmentName.equals(that._segmentName); + } + + @Override + public int hashCode() { + return Objects.hash(_segmentName); + } + + @Override + public String toString() { + return _segmentName; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java index 80e735fb2f80..762191325a30 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java @@ -74,6 +74,10 @@ public static Integer getPartitionIdFromSegmentName(String segmentName) { if (llcSegmentName != null) { return llcSegmentName.getPartitionGroupId(); } + MultiTopicLLCSegmentName multiTopicLLCSegmentName = MultiTopicLLCSegmentName.of(segmentName); + if (multiTopicLLCSegmentName != null) { + return multiTopicLLCSegmentName.getPartitionGroupId(); + } UploadedRealtimeSegmentName uploadedRealtimeSegmentName = UploadedRealtimeSegmentName.of(segmentName); if (uploadedRealtimeSegmentName != null) { return uploadedRealtimeSegmentName.getPartitionId(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java new file mode 100644 index 000000000000..1d04f7b6c62d --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/MultiTopicLLCSegmentNameTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class MultiTopicLLCSegmentNameTest { + + @Test + public void testConstructFromComponents() { + MultiTopicLLCSegmentName name = new MultiTopicLLCSegmentName("myTable", 2, 5, 3, 1700000000000L); + assertEquals(name.getTableName(), "myTable"); + assertEquals(name.getConfigId(), 2); + assertEquals(name.getStreamPartitionId(), 5); + assertEquals(name.getSequenceNumber(), 3); + assertEquals(name.getPartitionGroupId(), 20005); + assertEquals(name.getSegmentName(), "myTable__2__5__3__20231114T2213Z"); + } + + @Test + public void testConstructFromString() { + MultiTopicLLCSegmentName name = new MultiTopicLLCSegmentName("myTable__2__5__3__20231114T2213Z"); + assertEquals(name.getTableName(), "myTable"); + assertEquals(name.getConfigId(), 2); + assertEquals(name.getStreamPartitionId(), 5); + assertEquals(name.getSequenceNumber(), 3); + assertEquals(name.getPartitionGroupId(), 20005); + } + + @Test + public void testOfValidName() { + MultiTopicLLCSegmentName name = MultiTopicLLCSegmentName.of("myTable__0__63__10__20231215T0000Z"); + assertNotNull(name); + assertEquals(name.getConfigId(), 0); + assertEquals(name.getStreamPartitionId(), 63); + assertEquals(name.getSequenceNumber(), 10); + assertEquals(name.getPartitionGroupId(), 63); + } + + @Test + public void testOfInvalidName() { + // 4-part LLC format — should return null + assertNull(MultiTopicLLCSegmentName.of("myTable__10005__3__20231215T0000Z")); + // Uploaded realtime format — should return null (parts[3] is a date, not integer) + assertNull(MultiTopicLLCSegmentName.of("uploaded__myTable__5__20231215T0000Z__suffix")); + // Garbage + assertNull(MultiTopicLLCSegmentName.of("not_a_segment")); + } + + @Test + public void testIsMultiTopicLLCSegment() { + assertTrue(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("myTable__2__5__3__20231215T0000Z")); + assertTrue(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("myTable__0__0__0__20231215T0000Z")); + // 4-part LLC + assertFalse(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("myTable__10005__3__20231215T0000Z")); + // Uploaded realtime — parts[3] is a date string, not parseable as integer + assertFalse(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("uploaded__myTable__5__20231215T0000Z__suffix")); + // 3 parts + assertFalse(MultiTopicLLCSegmentName.isMultiTopicLLCSegment("a__b__c")); + } + + @Test + public void testPartitionGroupIdEncoding() { + // configId=0, partition=5 -> 5 + MultiTopicLLCSegmentName name0 = new MultiTopicLLCSegmentName("myTable", 0, 5, 0, 1700000000000L); + assertEquals(name0.getPartitionGroupId(), 5); + + // configId=1, partition=3 -> 10003 + MultiTopicLLCSegmentName name1 = new MultiTopicLLCSegmentName("myTable", 1, 3, 0, 1700000000000L); + assertEquals(name1.getPartitionGroupId(), 10003); + + // configId=5, partition=99 -> 50099 + MultiTopicLLCSegmentName name5 = new MultiTopicLLCSegmentName("myTable", 5, 99, 0, 1700000000000L); + assertEquals(name5.getPartitionGroupId(), 50099); + } + + @Test + public void testToLLCSegmentName() { + MultiTopicLLCSegmentName multiTopic = new MultiTopicLLCSegmentName("myTable", 1, 5, 3, 1700000000000L); + LLCSegmentName llc = multiTopic.toLLCSegmentName(); + assertEquals(llc.getTableName(), "myTable"); + assertEquals(llc.getPartitionGroupId(), 10005); + assertEquals(llc.getSequenceNumber(), 3); + } + + @Test + public void testCompareTo() { + MultiTopicLLCSegmentName a = new MultiTopicLLCSegmentName("myTable", 0, 5, 1, 1700000000000L); + MultiTopicLLCSegmentName b = new MultiTopicLLCSegmentName("myTable", 0, 5, 2, 1700000000000L); + MultiTopicLLCSegmentName c = new MultiTopicLLCSegmentName("myTable", 1, 0, 0, 1700000000000L); + + assertTrue(a.compareTo(b) < 0); + assertTrue(b.compareTo(a) > 0); + assertTrue(a.compareTo(c) < 0); // configId 0 < configId 1 + } + + @Test + public void testEqualsAndHashCode() { + MultiTopicLLCSegmentName a = new MultiTopicLLCSegmentName("myTable", 1, 5, 3, 1700000000000L); + MultiTopicLLCSegmentName b = new MultiTopicLLCSegmentName(a.getSegmentName()); + assertEquals(a, b); + assertEquals(a.hashCode(), b.hashCode()); + + MultiTopicLLCSegmentName c = new MultiTopicLLCSegmentName("myTable", 1, 5, 4, 1700000000000L); + assertNotEquals(a, c); + } + + @Test + public void testLLCSegmentNameIsLLCSegmentIncludesMultiTopic() { + // 4-part format + assertTrue(LLCSegmentName.isLLCSegment("myTable__10005__3__20231215T0000Z")); + // 5-part multi-topic format + assertTrue(LLCSegmentName.isLLCSegment("myTable__1__5__3__20231215T0000Z")); + // Uploaded realtime — should NOT be detected as LLC + assertFalse(LLCSegmentName.isLLCSegment("uploaded__myTable__5__20231215T0000Z__suffix")); + } + + @Test + public void testSegmentUtilsPartitionId() { + // Multi-topic segment returns encoded partition group ID + assertEquals(SegmentUtils.getPartitionIdFromSegmentName("myTable__1__5__3__20231215T0000Z"), + Integer.valueOf(10005)); + // Standard LLC segment + assertEquals(SegmentUtils.getPartitionIdFromSegmentName("myTable__10005__3__20231215T0000Z"), + Integer.valueOf(10005)); + } +} From 20ac41b1c62d711172a47c6d2bea4494e13fd740 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Tue, 16 Jun 2026 15:47:10 -0700 Subject: [PATCH 04/10] Create new segments with 5-part multi-topic format Multi-topic tables now create segments with the 5-part format: tableName__configId__streamPartitionId__sequenceNumber__creationTime Changes: - LLCSegmentName string constructor parses both 4-part and 5-part formats, computing encoded partitionGroupId for 5-part - getNextLLCSegmentName() takes isMultiTopic flag to produce 5-part - setupNewPartitionGroup() creates 5-part for multi-topic tables - createNewSegmentMetadata() uses getNextLLCSegmentName() instead of direct LLCSegmentName construction All existing code that takes LLCSegmentName continues to work because getPartitionGroupId() returns the encoded value for both formats, and getSegmentName() returns the original string. --- .../pinot/common/utils/LLCSegmentName.java | 21 +++++++--- .../PinotLLCRealtimeSegmentManager.java | 39 ++++++++++++++----- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java index 3acf86acc4ea..c4e0aa5f8a53 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java @@ -23,6 +23,7 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -41,11 +42,21 @@ public class LLCSegmentName implements Comparable { public LLCSegmentName(String segmentName) { String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); - Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName); - _tableName = parts[0]; - _partitionGroupId = Integer.parseInt(parts[1]); - _sequenceNumber = Integer.parseInt(parts[2]); - _creationTime = parts[3]; + if (parts.length == 5) { + // Multi-topic format: tableName__configId__streamPartitionId__sequenceNumber__creationTime + _tableName = parts[0]; + int configId = Integer.parseInt(parts[1]); + int streamPartitionId = Integer.parseInt(parts[2]); + _partitionGroupId = configId * IngestionConfigUtils.PARTITION_PADDING_OFFSET + streamPartitionId; + _sequenceNumber = Integer.parseInt(parts[3]); + _creationTime = parts[4]; + } else { + Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName); + _tableName = parts[0]; + _partitionGroupId = Integer.parseInt(parts[1]); + _sequenceNumber = Integer.parseInt(parts[2]); + _creationTime = parts[3]; + } _segmentName = segmentName; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 0e27d1565290..b51c4242ab11 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -85,6 +85,7 @@ import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.MultiTopicLLCSegmentName; import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; @@ -858,8 +859,8 @@ private String createNewSegmentMetadata(TableConfig tableConfig, } String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, - committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); + LLCSegmentName newLLCSegment = getNextLLCSegmentName(committingLLCSegment, newSegmentCreationTimeMs, + streamConfigs.size() > 1); StreamConfig streamConfig = IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId); @@ -1848,7 +1849,8 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List instancePartitionsMap, StreamPartitionMsgOffset startOffset) { int numReplicas = getNumReplicas(tableConfig, instancePartitions); LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName()); - LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); + LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs, + IngestionConfigUtils.hasMultipleStreams(tableConfig)); CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, @@ -2104,9 +2107,19 @@ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria } } - private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) { - return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(), - lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs); + private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs, + boolean isMultiTopic) { + String tableName = lastLLCSegmentName.getTableName(); + int nextSeq = lastLLCSegmentName.getSequenceNumber() + 1; + if (isMultiTopic) { + int partitionGroupId = lastLLCSegmentName.getPartitionGroupId(); + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionGroupId); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + String segmentName = new MultiTopicLLCSegmentName(tableName, configId, streamPartitionId, + nextSeq, creationTimeMs).getSegmentName(); + return new LLCSegmentName(segmentName); + } + return new LLCSegmentName(tableName, lastLLCSegmentName.getPartitionGroupId(), nextSeq, creationTimeMs); } /** @@ -2131,8 +2144,16 @@ private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig stre partitionGroupId, realtimeTableName, sequence, startOffset); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); - LLCSegmentName newLLCSegmentName = - new LLCSegmentName(rawTableName, partitionGroupId, sequence, creationTimeMs); + LLCSegmentName newLLCSegmentName; + if (IngestionConfigUtils.hasMultipleStreams(tableConfig)) { + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionGroupId); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + String segmentName = new MultiTopicLLCSegmentName(rawTableName, configId, streamPartitionId, + sequence, creationTimeMs).getSegmentName(); + newLLCSegmentName = new LLCSegmentName(segmentName); + } else { + newLLCSegmentName = new LLCSegmentName(rawTableName, partitionGroupId, sequence, creationTimeMs); + } String newSegmentName = newLLCSegmentName.getSegmentName(); CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, From 9f8cec9e5171265575b15d4bb0d8081aa2c6d922 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Thu, 18 Jun 2026 08:04:29 -0700 Subject: [PATCH 05/10] Add integration tests for stable topic ID assignment and segment naming Tests covering: - stream.config.id assignment and uniqueness across stream configs - 5-part multi-topic segment name format with correct config ID / partition encoding - Config ID lookup methods (getConfigId, getStreamConfigMapByConfigId, getConfigIdToStreamConfigMap) - Backward compatibility: LLCSegmentName.of() parses 5-part format and produces same partition group ID --- ...tiTopicRealtimeClusterIntegrationTest.java | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java index d2013f22399f..7bb5d537963a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiTopicRealtimeClusterIntegrationTest.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import org.apache.helix.model.IdealState; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.MultiTopicLLCSegmentName; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder; import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties; @@ -406,4 +407,136 @@ public void testDistinctSources() assertEquals(rows.get(i).get(0).asText(), sourceName(i)); } } + + @Test + public void testStreamConfigIdsAssigned() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + TableConfig tableConfig = getSharedHelixResourceManager().getTableConfig(realtimeTableName); + assertNotNull(tableConfig, "Table config should exist"); + + StreamIngestionConfig streamIngestionConfig = + tableConfig.getIngestionConfig().getStreamIngestionConfig(); + assertNotNull(streamIngestionConfig, "Stream ingestion config should exist"); + + List> streamConfigMaps = streamIngestionConfig.getStreamConfigMaps(); + int numTopics = getNumTopics(); + assertEquals(streamConfigMaps.size(), numTopics, "Should have " + numTopics + " stream configs"); + + Set configIds = new HashSet<>(); + for (int i = 0; i < numTopics; i++) { + Map configMap = streamConfigMaps.get(i); + String configIdStr = configMap.get(StreamConfigProperties.STREAM_CONFIG_ID); + assertNotNull(configIdStr, "stream.config.id should be set for stream config at index " + i); + + int configId = Integer.parseInt(configIdStr); + assertTrue(configId >= 0, "Config ID should be non-negative, got: " + configId); + assertTrue(configIds.add(configId), "Config IDs should be unique, duplicate: " + configId); + } + + assertEquals(streamIngestionConfig.getNextStreamConfigId(), numTopics, + "nextStreamConfigId should equal the number of topics"); + } + + @Test + public void testMultiTopicSegmentNameFormat() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + IdealState idealState = getSharedHelixResourceManager().getTableIdealState(realtimeTableName); + assertNotNull(idealState); + + int numTopics = getNumTopics(); + if (numTopics <= 1) { + return; + } + + int multiTopicSegmentCount = 0; + Set configIdsSeen = new HashSet<>(); + + for (String segmentName : idealState.getPartitionSet()) { + MultiTopicLLCSegmentName multiTopicName = MultiTopicLLCSegmentName.of(segmentName); + if (multiTopicName != null) { + multiTopicSegmentCount++; + configIdsSeen.add(multiTopicName.getConfigId()); + + assertTrue(multiTopicName.getConfigId() >= 0, + "Config ID should be non-negative: " + segmentName); + assertTrue(multiTopicName.getStreamPartitionId() >= 0, + "Stream partition ID should be non-negative: " + segmentName); + assertTrue(multiTopicName.getStreamPartitionId() < NUM_PARTITIONS_PER_TOPIC, + "Stream partition ID should be < " + NUM_PARTITIONS_PER_TOPIC + ": " + segmentName); + + int expectedPartitionGroupId = + IngestionConfigUtils.getPinotPartitionIdFromConfigId( + multiTopicName.getStreamPartitionId(), multiTopicName.getConfigId()); + assertEquals(multiTopicName.getPartitionGroupId(), expectedPartitionGroupId, + "Partition group ID encoding mismatch for segment: " + segmentName); + + LLCSegmentName llcParsed = new LLCSegmentName(segmentName); + assertEquals(llcParsed.getPartitionGroupId(), expectedPartitionGroupId, + "LLCSegmentName should parse 5-part format and produce same partition group ID"); + } + } + + assertTrue(multiTopicSegmentCount > 0, + "Should have at least one segment in 5-part multi-topic format"); + + for (int i = 0; i < numTopics; i++) { + assertTrue(configIdsSeen.contains(i), + "Should have segments with config ID " + i); + } + } + + @Test + public void testConfigIdLookupMethods() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + TableConfig tableConfig = getSharedHelixResourceManager().getTableConfig(realtimeTableName); + assertNotNull(tableConfig); + + StreamIngestionConfig streamIngestionConfig = + tableConfig.getIngestionConfig().getStreamIngestionConfig(); + int numTopics = getNumTopics(); + + for (int i = 0; i < numTopics; i++) { + int configId = streamIngestionConfig.getConfigId(i); + assertEquals(configId, i, "Config ID at index " + i + " should be " + i); + + Map configMap = streamIngestionConfig.getStreamConfigMapByConfigId(configId); + assertNotNull(configMap, "Should find stream config for config ID " + configId); + + String topicName = configMap.get( + StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_TOPIC_NAME)); + assertEquals(topicName, topicName(i), + "Topic name for config ID " + configId + " should match"); + } + + Map> configIdMap = streamIngestionConfig.getConfigIdToStreamConfigMap(); + assertEquals(configIdMap.size(), numTopics, "Config ID map should have " + numTopics + " entries"); + for (int i = 0; i < numTopics; i++) { + assertTrue(configIdMap.containsKey(i), "Config ID map should contain key " + i); + } + } + + @Test + public void testSegmentNameBackwardCompatibility() { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + IdealState idealState = getSharedHelixResourceManager().getTableIdealState(realtimeTableName); + assertNotNull(idealState); + + for (String segmentName : idealState.getPartitionSet()) { + if (!LLCSegmentName.isLLCSegment(segmentName)) { + continue; + } + + LLCSegmentName llcName = LLCSegmentName.of(segmentName); + assertNotNull(llcName, "LLCSegmentName.of() should parse segment: " + segmentName); + + MultiTopicLLCSegmentName multiTopicName = MultiTopicLLCSegmentName.of(segmentName); + if (multiTopicName != null) { + assertEquals(llcName.getPartitionGroupId(), multiTopicName.getPartitionGroupId(), + "Partition group ID should match between LLCSegmentName and MultiTopicLLCSegmentName " + + "for segment: " + segmentName); + assertEquals(llcName.getTableName(), multiTopicName.getTableName()); + assertEquals(llcName.getSequenceNumber(), multiTopicName.getSequenceNumber()); + } + } + } } From e1d612bf07c1033ae4de3fa9deb1698b1f2ca370 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Thu, 18 Jun 2026 08:06:57 -0700 Subject: [PATCH 06/10] Add getStreamConfigId() to LLCSegmentName Stores the stream config ID directly on LLCSegmentName at parse time instead of requiring callers to derive it via IngestionConfigUtils each time. For 4-part single-topic segments this is always 0; for 5-part multi-topic segments it is the configId field parsed from the name. --- .../org/apache/pinot/common/utils/LLCSegmentName.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java index c4e0aa5f8a53..3c31d5d19702 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java @@ -36,6 +36,7 @@ public class LLCSegmentName implements Comparable { private final String _tableName; private final int _partitionGroupId; + private final int _streamConfigId; private final int _sequenceNumber; private final String _creationTime; private final String _segmentName; @@ -47,6 +48,7 @@ public LLCSegmentName(String segmentName) { _tableName = parts[0]; int configId = Integer.parseInt(parts[1]); int streamPartitionId = Integer.parseInt(parts[2]); + _streamConfigId = configId; _partitionGroupId = configId * IngestionConfigUtils.PARTITION_PADDING_OFFSET + streamPartitionId; _sequenceNumber = Integer.parseInt(parts[3]); _creationTime = parts[4]; @@ -54,6 +56,7 @@ public LLCSegmentName(String segmentName) { Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName); _tableName = parts[0]; _partitionGroupId = Integer.parseInt(parts[1]); + _streamConfigId = 0; _sequenceNumber = Integer.parseInt(parts[2]); _creationTime = parts[3]; } @@ -64,6 +67,7 @@ public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName); _tableName = tableName; _partitionGroupId = partitionGroupId; + _streamConfigId = 0; _sequenceNumber = sequenceNumber; // ISO8601 date: 20160120T1234Z _creationTime = DATE_FORMATTER.print(msSinceEpoch); @@ -123,6 +127,10 @@ public int getPartitionGroupId() { return _partitionGroupId; } + public int getStreamConfigId() { + return _streamConfigId; + } + public int getSequenceNumber() { return _sequenceNumber; } From 599f70f8dde18f7696f8c6a5d81451c6a8703d54 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Thu, 18 Jun 2026 08:56:11 -0700 Subject: [PATCH 07/10] Migrate all decode sites to use stable stream config ID All sites that previously derived stream config by integer-dividing partitionGroupId / 10000 (treating the quotient as a positional list index) now look up the stream config by its stable stream.config.id instead. This makes the encoding position-independent so topics can be deleted and re-added without corrupting existing segment-to-topic mappings. Changes: - IngestionConfigUtils: add getConfigIdFromStreamConfig(), update getStreamConfigFromPinotPartitionId() to do stable-ID lookup instead of list.get(index), update getStreamConfigMap() likewise - PinotLLCRealtimeSegmentManager: replace all StreamPartitionMsgOffset[] arrays keyed by index with Map; use LLCSegmentName. getStreamConfigId() where available; use getStreamConfigFromPinotPartitionId at all fetch/repair sites; encode new partitionGroupIds using configId not list index - RealtimeSegmentDataManager: look up stream config by configId from LLCSegmentName instead of list index - IngestionDelayTracker: key _streamConfigIndexToStreamMetadataProvider by stable configId; use getPinotPartitionIdFromConfigId for encoding - PinotTableRestletResource: group watermarks by configId not index - PartitionGroupMetadataFetcher: filter and encode using stable configId --- .../resources/PinotTableRestletResource.java | 27 +++++------ .../PinotLLCRealtimeSegmentManager.java | 48 +++++++++---------- .../realtime/IngestionDelayTracker.java | 33 +++++++------ .../realtime/RealtimeSegmentDataManager.java | 8 ++-- .../stream/PartitionGroupMetadataFetcher.java | 9 ++-- .../pinot/spi/utils/IngestionConfigUtils.java | 38 ++++++++++----- 6 files changed, 84 insertions(+), 79 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 91985f5997ea..2bbec915bd06 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -392,31 +392,28 @@ List getStreamMetadataList(List streamConfigs, throws Exception { Map streamPartitionCountMap = _pinotHelixResourceManager.getRealtimeSegmentManager().getPartitionCountMap(streamConfigs); - Map> partitionGroupMetadataByStreamConfigIndex = new HashMap<>(); + Map> partitionGroupMetadataByConfigId = new HashMap<>(); for (WatermarkInductionResult.Watermark watermark : watermarkInductionResult.getWatermarks()) { - int streamConfigIndex = - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId()); - Preconditions.checkArgument(streamConfigIndex >= 0 && streamConfigIndex < streamConfigs.size(), - "Invalid stream config index %s from watermark partition ID %s. Expected index in range [0, %s)", - streamConfigIndex, watermark.getPartitionGroupId(), streamConfigs.size()); - partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex, ignored -> new ArrayList<>()).add( + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(watermark.getPartitionGroupId()); + partitionGroupMetadataByConfigId.computeIfAbsent(configId, ignored -> new ArrayList<>()).add( new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset()), watermark.getSequenceNumber())); } - // Iterate in order by streamConfigIndex to ensure deterministic ordering - List streamMetadataList = new ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size()); - for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size(); streamConfigIndex++) { + // Iterate over stream configs in order to ensure deterministic ordering + List streamMetadataList = new ArrayList<>(partitionGroupMetadataByConfigId.size()); + for (int i = 0; i < streamConfigs.size(); i++) { + int configId = IngestionConfigUtils.getConfigIdFromStreamConfig(streamConfigs.get(i)); List partitionGroupMetadataList = - partitionGroupMetadataByStreamConfigIndex.get(streamConfigIndex); + partitionGroupMetadataByConfigId.get(configId); if (partitionGroupMetadataList == null) { - // No watermarks for this stream config index, skip it + // No watermarks for this stream config, skip it continue; } - Integer partitionCount = streamPartitionCountMap.get(streamConfigIndex); + Integer partitionCount = streamPartitionCountMap.get(configId); Preconditions.checkState(partitionCount != null, - "Cannot find partition count for stream config index: %s", streamConfigIndex); - streamMetadataList.add(new StreamMetadata(streamConfigs.get(streamConfigIndex), + "Cannot find partition count for stream config ID: %s", configId); + streamMetadataList.add(new StreamMetadata(streamConfigs.get(i), partitionCount, partitionGroupMetadataList)); } return streamMetadataList; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index b51c4242ab11..805c75b16e94 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -316,18 +316,17 @@ public List getPartitionGroupConsumptionStatusL } } else { // Multiple streams - StreamPartitionMsgOffsetFactory[] offsetFactories = new StreamPartitionMsgOffsetFactory[numStreams]; + Map offsetFactoriesByConfigId = new HashMap<>(); for (Map.Entry entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); - int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId); - int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); LLCSegmentName llcSegmentName = entry.getValue(); + int configId = llcSegmentName.getStreamConfigId(); + int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName()); - StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index]; - if (offsetFactory == null) { - offsetFactory = StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory(); - offsetFactories[index] = offsetFactory; - } + StreamPartitionMsgOffsetFactory offsetFactory = + offsetFactoriesByConfigId.computeIfAbsent(configId, k -> StreamConsumerFactoryProvider.create( + IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, partitionGroupId)) + .createStreamMsgOffsetFactory()); PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), offsetFactory.create(segmentZKMetadata.getStartOffset()), @@ -1273,10 +1272,10 @@ private PartitionIdsWithIdealState getPartitionIdsWithIdealState(List IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index)) + .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromConfigId(partitionId, configId)) .collect(Collectors.toSet())); } catch (UnsupportedOperationException ignored) { allPartitionIdsFetched = false; @@ -1582,8 +1581,7 @@ public static boolean isTopicPaused(IdealState idealState, int topicIndex) { public static boolean isTopicPaused(IdealState idealState, String segmentName) { LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); if (llcSegmentName != null) { - return isTopicPaused(idealState, - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId())); + return isTopicPaused(idealState, llcSegmentName.getStreamConfigId()); } return false; } @@ -1830,7 +1828,8 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List instanceStateMap = instanceStatesMap.get(latestSegmentName); if (instanceStateMap != null) { @@ -1855,7 +1854,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List buildPartitionGroupConsumptionStatusFromZK zkMetadata.getStatus().toString())); } } else { - StreamPartitionMsgOffsetFactory[] offsetFactories = new StreamPartitionMsgOffsetFactory[numStreams]; + Map offsetFactoriesByConfigId = new HashMap<>(); for (Map.Entry entry : latestSegmentZKMetadataMap.entrySet()) { int partitionGroupId = entry.getKey(); - int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId); int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); SegmentZKMetadata zkMetadata = entry.getValue(); LLCSegmentName llcSegmentName = new LLCSegmentName(zkMetadata.getSegmentName()); - StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index]; - if (offsetFactory == null) { - offsetFactory = - StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory(); - offsetFactories[index] = offsetFactory; - } + int configId = llcSegmentName.getStreamConfigId(); + StreamPartitionMsgOffsetFactory offsetFactory = + offsetFactoriesByConfigId.computeIfAbsent(configId, k -> StreamConsumerFactoryProvider.create( + IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, partitionGroupId)) + .createStreamMsgOffsetFactory()); result.add(new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), offsetFactory.create(zkMetadata.getStartOffset()), @@ -2914,8 +2911,7 @@ private Set findConsumingSegmentsOfTopics(IdealState idealState, List consumingSegments = new TreeSet<>(); idealState.getRecord().getMapFields().forEach((segmentName, instanceToStateMap) -> { LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); - if (llcSegmentName != null && !topicIndices.contains( - IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId()))) { + if (llcSegmentName != null && !topicIndices.contains(llcSegmentName.getStreamConfigId())) { return; } for (String state : instanceToStateMap.values()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index e26390b9e4fb..724bed0ea868 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -198,14 +198,13 @@ private void createOrUpdateStreamMetadataProvider() { List streamConfigs = IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft()); - for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size(); streamConfigIndex++) { - if (_streamConfigIndexToStreamMetadataProvider.containsKey(streamConfigIndex)) { + for (StreamConfig streamConfig : streamConfigs) { + int configId = IngestionConfigUtils.getConfigIdFromStreamConfig(streamConfig); + if (_streamConfigIndexToStreamMetadataProvider.containsKey(configId)) { continue; } - StreamConfig streamConfig = null; StreamMetadataProvider streamMetadataProvider; try { - streamConfig = streamConfigs.get(streamConfigIndex); StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); String clientId = IngestionConfigUtils.getTableTopicUniqueClientId(IngestionDelayTracker.class.getSimpleName(), streamConfig); @@ -216,7 +215,7 @@ private void createOrUpdateStreamMetadataProvider() { } assert streamMetadataProvider != null; - _streamConfigIndexToStreamMetadataProvider.put(streamConfigIndex, streamMetadataProvider); + _streamConfigIndexToStreamMetadataProvider.put(configId, streamMetadataProvider); if ((streamMetadataProvider.supportsOffsetLag()) && (_partitionIdToLatestOffset == null)) { _partitionIdToLatestOffset = new ConcurrentHashMap<>(); @@ -268,37 +267,37 @@ private void trackIngestionDelay() { @VisibleForTesting void updateLatestStreamOffset(Set partitionsHosted) { - Map> streamIndexToStreamPartitionIds = + Map> configIdToStreamPartitionIds = IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(partitionsHosted); - if (streamIndexToStreamPartitionIds.size() > _streamConfigIndexToStreamMetadataProvider.size()) { + if (configIdToStreamPartitionIds.size() > _streamConfigIndexToStreamMetadataProvider.size()) { // There might be a new stream config added or need to retry creation of streamMetadataProvider for a stream // which might have failed before. createOrUpdateStreamMetadataProvider(); } for (Map.Entry entry : _streamConfigIndexToStreamMetadataProvider.entrySet()) { - int streamIndex = entry.getKey(); + int configId = entry.getKey(); StreamMetadataProvider streamMetadataProvider = entry.getValue(); - if (!streamIndexToStreamPartitionIds.containsKey(streamIndex)) { + if (!configIdToStreamPartitionIds.containsKey(configId)) { // Server is not hosting any partitions of this stream. continue; } if (streamMetadataProvider.supportsOffsetLag()) { - Set streamPartitionIds = streamIndexToStreamPartitionIds.get(streamIndex); + Set streamPartitionIds = configIdToStreamPartitionIds.get(configId); try { Map streamPartitionIdToLatestOffset = streamMetadataProvider.fetchLatestStreamOffset(streamPartitionIds, LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS); - if (streamIndex > 0) { + if (configId > 0) { // Need to convert stream partition Ids back to pinot partition Ids. for (Map.Entry latestOffsetEntry : streamPartitionIdToLatestOffset.entrySet()) { _partitionIdToLatestOffset.put( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(latestOffsetEntry.getKey(), - streamIndex), latestOffsetEntry.getValue()); + IngestionConfigUtils.getPinotPartitionIdFromConfigId(latestOffsetEntry.getKey(), configId), + latestOffsetEntry.getValue()); } } else { _partitionIdToLatestOffset.putAll(streamPartitionIdToLatestOffset); @@ -371,8 +370,8 @@ void setClock(Clock clock) { @VisibleForTesting void createMetrics(int partitionId) { - int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); - StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId); + StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(configId); if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, @@ -395,9 +394,9 @@ void createMetrics(int partitionId) { } private void removeMetrics(int partitionId) { - int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); + int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId); StreamMetadataProvider streamMetadataProvider = - _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + _streamConfigIndexToStreamMetadataProvider.get(configId); // Remove all metrics associated with this partition if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 6246ff58570a..9db1e00226e2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1797,10 +1797,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf } else { // Multiple streams _streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId); - int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId); - Preconditions.checkState(numStreams > index, "Cannot find stream config of index: %s for table: %s", index, - _tableNameWithType); - _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMaps.get(index)); + int configId = llcSegmentName.getStreamConfigId(); + Map streamConfigMap = tableConfig.getIngestionConfig().getStreamIngestionConfig() + .getStreamConfigMapByConfigId(configId); + _streamConfig = new StreamConfig(_tableNameWithType, streamConfigMap); } _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _streamPartitionMsgOffsetFactory = _streamConsumerFactory.createStreamMsgOffsetFactory(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index edf6d6ccdd46..46492342e266 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -127,11 +127,10 @@ private Boolean fetchMultipleStreams() PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" + topicName; StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - int index = i; + int configId = IngestionConfigUtils.getConfigIdFromStreamConfig(streamConfig); List topicPartitionGroupConsumptionStatusList = _partitionGroupConsumptionStatusList.stream() - .filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId( - partitionGroupConsumptionStatus.getPartitionGroupId()) == index) + .filter(s -> IngestionConfigUtils.getConfigIdFromPinotPartitionId(s.getPartitionGroupId()) == configId) .collect(Collectors.toList()); try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( StreamConsumerFactory.getUniqueClientId(clientId))) { @@ -142,8 +141,8 @@ private Boolean fetchMultipleStreams() _forceGetOffsetFromStream) .stream() .map(metadata -> new PartitionGroupMetadata( - IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(), - index), metadata.getStartOffset(), metadata.getSequenceNumber())) + IngestionConfigUtils.getPinotPartitionIdFromConfigId(metadata.getPartitionGroupId(), configId), + metadata.getStartOffset(), metadata.getSequenceNumber())) .collect(Collectors.toList()); _streamMetadataList.add( new StreamMetadata(streamConfig, partitionGroupMetadataList.size(), partitionGroupMetadataList)); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 446d44047c27..ed9a5dab2dca 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -33,9 +33,11 @@ import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamConsumerFactory; @@ -137,9 +139,23 @@ public static int getStreamPartitionIdFromPinotPartitionId(int partitionId) { } /// Returns the StreamConfig for the given Pinot segment partition id. + /// For multi-topic tables, uses the stable config ID stored in the stream config rather than positional index. public static StreamConfig getStreamConfigFromPinotPartitionId(List streamConfigs, int partitionId) { - return streamConfigs.size() > 1 ? streamConfigs.get(getStreamConfigIndexFromPinotPartitionId(partitionId)) - : streamConfigs.get(0); + if (streamConfigs.size() == 1) { + return streamConfigs.get(0); + } + int configId = getConfigIdFromPinotPartitionId(partitionId); + return streamConfigs.stream() + .filter(sc -> configId == getConfigIdFromStreamConfig(sc)) + .findFirst() + .orElseThrow(() -> new IllegalStateException( + "Cannot find stream config with config ID: " + configId + " for partition: " + partitionId)); + } + + /// Returns the config ID stored in a StreamConfig, or 0 if not set (single-topic backward compat). + public static int getConfigIdFromStreamConfig(StreamConfig streamConfig) { + String idStr = streamConfig.getStreamConfigsMap().get(StreamConfigProperties.STREAM_CONFIG_ID); + return idStr != null ? Integer.parseInt(idStr) : 0; } /// Returns the index of the StreamConfigs from the Pinot segment partition id. @@ -150,23 +166,21 @@ public static int getStreamConfigIndexFromPinotPartitionId(int partitionId) { } /** - * Fetches the streamConfig from the list of streamConfigs according to the partition id. + * Fetches the streamConfig map from the table config for the given Pinot partition id. + * For multi-topic tables, uses the stable config ID to look up the correct stream config. */ public static Map getStreamConfigMap(TableConfig tableConfig, int partitionId) { List> streamConfigMaps = getStreamConfigMaps(tableConfig); int numStreams = streamConfigMaps.size(); if (numStreams == 1) { - // Single stream - // NOTE: We skip partition id translation logic to handle cases where custom stream might return partition id - // larger than 10000. + // Single stream — skip partition id translation to handle custom streams with large partition ids. return streamConfigMaps.get(0); - } else { - // Multiple streams - int index = getStreamConfigIndexFromPinotPartitionId(partitionId); - Preconditions.checkState(numStreams > index, "Cannot find stream config of index: %s for table: %s", index, - tableConfig.getTableName()); - return streamConfigMaps.get(index); } + // Multi-stream: look up by stable config ID rather than positional index. + int configId = getConfigIdFromPinotPartitionId(partitionId); + StreamIngestionConfig streamIngestionConfig = + tableConfig.getIngestionConfig().getStreamIngestionConfig(); + return streamIngestionConfig.getStreamConfigMapByConfigId(configId); } public static List getAggregationConfigs(TableConfig tableConfig) { From 29fe99318b8a8ee58bb9d3079285dd50329fbd96 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Thu, 18 Jun 2026 10:27:29 -0700 Subject: [PATCH 08/10] Use segment name directly for stream config lookup instead of table config scan Now that LLCSegmentName.getStreamConfigId() provides the stable config ID directly from the segment name, callers no longer need to linear-scan stream configs or call through the table config. Add IngestionConfigUtils.buildConfigIdToStreamConfigMap() to build a O(1) lookup map once per call site, then use it with the config ID from the segment name. Eliminates all remaining calls to getStreamConfigFromPinotPartitionId() in hot paths. --- .../PinotLLCRealtimeSegmentManager.java | 18 ++++++++++++------ .../pinot/spi/utils/IngestionConfigUtils.java | 10 ++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 805c75b16e94..b8780b7577bf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -316,6 +316,8 @@ public List getPartitionGroupConsumptionStatusL } } else { // Multiple streams + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); Map offsetFactoriesByConfigId = new HashMap<>(); for (Map.Entry entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); @@ -325,7 +327,7 @@ public List getPartitionGroupConsumptionStatusL SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName()); StreamPartitionMsgOffsetFactory offsetFactory = offsetFactoriesByConfigId.computeIfAbsent(configId, k -> StreamConsumerFactoryProvider.create( - IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, partitionGroupId)) + configIdToStreamConfig.get(k)) .createStreamMsgOffsetFactory()); PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), @@ -861,8 +863,9 @@ private String createNewSegmentMetadata(TableConfig tableConfig, LLCSegmentName newLLCSegment = getNextLLCSegmentName(committingLLCSegment, newSegmentCreationTimeMs, streamConfigs.size() > 1); - StreamConfig streamConfig = - IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId); + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); + StreamConfig streamConfig = configIdToStreamConfig.get(committingLLCSegment.getStreamConfigId()); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas); @@ -1778,6 +1781,8 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List> instanceStatesMap = idealState.getRecord().getMapFields(); StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory(); + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); // Get the latest segment ZK metadata for each partition Map latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); @@ -1828,8 +1833,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List instanceStateMap = instanceStatesMap.get(latestSegmentName); if (instanceStateMap != null) { @@ -2061,6 +2065,8 @@ List buildPartitionGroupConsumptionStatusFromZK zkMetadata.getStatus().toString())); } } else { + Map configIdToStreamConfig = + IngestionConfigUtils.buildConfigIdToStreamConfigMap(streamConfigs); Map offsetFactoriesByConfigId = new HashMap<>(); for (Map.Entry entry : latestSegmentZKMetadataMap.entrySet()) { int partitionGroupId = entry.getKey(); @@ -2070,7 +2076,7 @@ List buildPartitionGroupConsumptionStatusFromZK int configId = llcSegmentName.getStreamConfigId(); StreamPartitionMsgOffsetFactory offsetFactory = offsetFactoriesByConfigId.computeIfAbsent(configId, k -> StreamConsumerFactoryProvider.create( - IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, partitionGroupId)) + configIdToStreamConfig.get(k)) .createStreamMsgOffsetFactory()); result.add(new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId, llcSegmentName.getSequenceNumber(), diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index ed9a5dab2dca..f69ea9ebe125 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -385,4 +385,14 @@ public static int getPinotPartitionIdFromConfigId(int streamPartitionId, int con public static int getConfigIdFromPinotPartitionId(int partitionId) { return partitionId / PARTITION_PADDING_OFFSET; } + + /// Builds a map from stable config ID to StreamConfig for O(1) lookup by config ID. + /// For single-topic tables, returns a singleton map with key 0. + public static Map buildConfigIdToStreamConfigMap(List streamConfigs) { + Map result = new HashMap<>(streamConfigs.size()); + for (StreamConfig streamConfig : streamConfigs) { + result.put(getConfigIdFromStreamConfig(streamConfig), streamConfig); + } + return result; + } } From 1d946cd03e83da5195e5cdc9d127a544e7ade1a3 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Thu, 18 Jun 2026 10:47:29 -0700 Subject: [PATCH 09/10] Read config ID from segment name in IngestionDelayTracker, not from arithmetic getHostedPartitionsGroupIds() now returns Map populated from LLCSegmentName.getStreamConfigId(), eliminating the partitionId / 10000 arithmetic in createMetrics() and removeMetrics(). _partitionsHostedByThisServer changed from Map to Map to store the config ID alongside the partition ID. --- .../realtime/IngestionDelayTracker.java | 19 +++++++++++-------- .../realtime/RealtimeTableDataManager.java | 10 +++++----- .../realtime/IngestionDelayTrackerTest.java | 4 ++-- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index 724bed0ea868..8eabddd24e58 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -154,7 +154,8 @@ void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffs private Clock _clock = Clock.systemUTC(); protected volatile Map _partitionIdToLatestOffset; - protected volatile ConcurrentHashMap _partitionsHostedByThisServer = new ConcurrentHashMap<>(); + // Maps partitionGroupId -> streamConfigId, populated from LLCSegmentName so no arithmetic needed. + protected volatile ConcurrentHashMap _partitionsHostedByThisServer = new ConcurrentHashMap<>(); // Map of StreamMetadataProvider to fetch upstream latest stream offset (Table can have multiple upstream topics) // This map is accessed by: // 1. _ingestionDelayTrackingScheduler thread. @@ -370,7 +371,8 @@ void setClock(Clock clock) { @VisibleForTesting void createMetrics(int partitionId) { - int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId); + int configId = _partitionsHostedByThisServer.getOrDefault(partitionId, + IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId)); StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(configId); if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { @@ -394,7 +396,8 @@ void createMetrics(int partitionId) { } private void removeMetrics(int partitionId) { - int configId = IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId); + int configId = _partitionsHostedByThisServer.getOrDefault(partitionId, + IngestionConfigUtils.getConfigIdFromPinotPartitionId(partitionId)); StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(configId); // Remove all metrics associated with this partition @@ -507,7 +510,11 @@ public void timeoutInactivePartitions() { } Set partitionsHostedByThisServer; try { - partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds(); + Map partitionGroupIdToConfigId = + _realTimeTableDataManager.getHostedPartitionsGroupIds(); + partitionsHostedByThisServer = partitionGroupIdToConfigId.keySet(); + ConcurrentHashMap newMap = new ConcurrentHashMap<>(partitionGroupIdToConfigId); + _partitionsHostedByThisServer = newMap; } catch (Exception e) { LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType, e.getClass(), e.getMessage()); @@ -519,10 +526,6 @@ public void timeoutInactivePartitions() { removePartitionId(partitionId); } } - - ConcurrentHashMap newMap = new ConcurrentHashMap<>(); - partitionsHostedByThisServer.forEach(p -> newMap.put(p, true)); - _partitionsHostedByThisServer = newMap; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 6cdfd08b2e19..0f27a21334b3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -29,7 +29,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -408,15 +408,15 @@ public StreamMetadataProvider getStreamMetadataProvider(RealtimeSegmentDataManag * Returns all partitionGroupIds for the partitions hosted by this server for current table. * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns. */ - public Set getHostedPartitionsGroupIds() { - Set partitionsHostedByThisServer = new HashSet<>(); + public Map getHostedPartitionsGroupIds() { + Map partitionGroupIdToConfigId = new HashMap<>(); List segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); for (String segmentNameStr : segments) { LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); - partitionsHostedByThisServer.add(segmentName.getPartitionGroupId()); + partitionGroupIdToConfigId.put(segmentName.getPartitionGroupId(), segmentName.getStreamConfigId()); } - return partitionsHostedByThisServer; + return partitionGroupIdToConfigId; } public RealtimeSegmentStatsHistory getStatsHistory() { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 80d5444e3ed9..564485ebf9bc 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -501,8 +501,8 @@ public void testIngestionDelay() { final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName(); final int partition1 = 1; - ingestionDelayTracker._partitionsHostedByThisServer.put(partition0, true); - ingestionDelayTracker._partitionsHostedByThisServer.put(partition1, true); + ingestionDelayTracker._partitionsHostedByThisServer.put(partition0, 0); + ingestionDelayTracker._partitionsHostedByThisServer.put(partition1, 0); ingestionDelayTracker.updateMetrics(segment0, partition0, System.currentTimeMillis(), System.currentTimeMillis(), new LongMsgOffset(50)); From d602e2659d19d8ccbf5476b992473e33f4419618 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Mon, 22 Jun 2026 10:41:17 -0700 Subject: [PATCH 10/10] Fix test failures from ensureStreamConfigIds mutation of shared map references IngestionDelayTrackerTest: set distinct stream.config.id on each stream config so createOrUpdateStreamMetadataProvider creates separate providers. TableConfigUtilsTest: use independent map copies for multi-stream-config tests to prevent ensureStreamConfigIds from poisoning subsequent test cases via shared mutable map references. --- .../realtime/IngestionDelayTrackerTest.java | 9 ++++++-- .../local/utils/TableConfigUtilsTest.java | 22 ++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 564485ebf9bc..f096e83755e2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -42,6 +42,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; @@ -468,8 +469,12 @@ public void testUpdateLatestStreamOffset() { IngestionConfig ingestionConfig = new IngestionConfig(); List> streamConfigMaps = new ArrayList<>(); - streamConfigMaps.add(getStreamConfigs()); - streamConfigMaps.add(getStreamConfigs()); + Map streamConfig0 = getStreamConfigs(); + streamConfig0.put(StreamConfigProperties.STREAM_CONFIG_ID, "0"); + Map streamConfig1 = getStreamConfigs(); + streamConfig1.put(StreamConfigProperties.STREAM_CONFIG_ID, "1"); + streamConfigMaps.add(streamConfig0); + streamConfigMaps.add(streamConfig1); StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(streamConfigMaps); ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index fc747069b189..4a27f17616ba 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -929,8 +929,12 @@ public void ingestionStreamConfigsTest() { TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); // Test for multiple stream configs with pauseless consumption enabled - should fail + Map pauselessStreamConfigs1 = new HashMap<>(streamConfigs); + pauselessStreamConfigs1.remove(StreamConfigProperties.STREAM_CONFIG_ID); + Map pauselessStreamConfigs2 = new HashMap<>(streamConfigs); + pauselessStreamConfigs2.remove(StreamConfigProperties.STREAM_CONFIG_ID); StreamIngestionConfig streamIngestionConfigWithPauseless = - new StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)); + new StreamIngestionConfig(Arrays.asList(pauselessStreamConfigs1, pauselessStreamConfigs2)); streamIngestionConfigWithPauseless.setPauselessConsumptionEnabled(true); IngestionConfig ingestionConfigWithPauseless = new IngestionConfig(); @@ -951,8 +955,13 @@ public void ingestionStreamConfigsTest() { } // Test for multiple stream configs with pauseless consumption disabled - should pass + // Use fresh copies because ensureStreamConfigIds mutates the map (adds stream.config.id) + Map dupStreamConfigs1 = new HashMap<>(streamConfigs); + dupStreamConfigs1.remove(StreamConfigProperties.STREAM_CONFIG_ID); + Map dupStreamConfigs2 = new HashMap<>(streamConfigs); + dupStreamConfigs2.remove(StreamConfigProperties.STREAM_CONFIG_ID); StreamIngestionConfig streamIngestionConfigWithoutPauseless = - new StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)); + new StreamIngestionConfig(Arrays.asList(dupStreamConfigs1, dupStreamConfigs2)); streamIngestionConfigWithoutPauseless.setPauselessConsumptionEnabled(false); IngestionConfig ingestionConfigWithoutPauseless = new IngestionConfig(); @@ -973,10 +982,13 @@ public void ingestionStreamConfigsTest() { } // Test for multiple stream configs with pauseless consumption disabled and unique topic names - should pass - Map anotherStreamConfig = new HashMap<>(streamConfigs); - anotherStreamConfig.put("stream.kafka.topic.name", "myTopic2"); + Map uniqueStreamConfig1 = new HashMap<>(streamConfigs); + uniqueStreamConfig1.remove(StreamConfigProperties.STREAM_CONFIG_ID); + Map uniqueStreamConfig2 = new HashMap<>(streamConfigs); + uniqueStreamConfig2.remove(StreamConfigProperties.STREAM_CONFIG_ID); + uniqueStreamConfig2.put("stream.kafka.topic.name", "myTopic2"); StreamIngestionConfig streamIngestionConfigWithoutPauselessUniqueTopics = - new StreamIngestionConfig(Arrays.asList(streamConfigs, anotherStreamConfig)); + new StreamIngestionConfig(Arrays.asList(uniqueStreamConfig1, uniqueStreamConfig2)); streamIngestionConfigWithoutPauselessUniqueTopics.setPauselessConsumptionEnabled(false); IngestionConfig ingestionConfigWithoutPauselessUniqueTopics = new IngestionConfig();