From 1a0ed0b7d7a8969ddd05f1e68837d788b05ad0c7 Mon Sep 17 00:00:00 2001 From: Andrey Polyakov Date: Thu, 23 Apr 2026 12:19:27 -0700 Subject: [PATCH] Improve perf w/ large Kafka clusters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://github.com/kafbat/kafka-ui/issues/1776 topicStateMap called filterTopic once per topic for each offsets/stats map, each doing an O(P_total) scan — total O(T * P_total) per scrape. On large clusters this was the CPU hotspot behind slow UI. Fix: group each cluster-wide map by topic once (O(P_total) total), then do O(1) lookups in the per-topic loop. Measured speedup (partitions = 10 * topics, median per call): 1K topics: 373ms -> 1ms (~370x) 3K topics: 3.2s -> 4ms (~800x) 10K topics: 61s -> 14ms (~4400x) --- .../metrics/scrape/ScrapedClusterState.java | 26 +++--- .../scrape/ScrapedClusterStateTest.java | 87 +++++++++++++++++++ 2 files changed, 101 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java index 2059a1a56..6fa489c22 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java @@ -18,7 +18,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import lombok.Builder; @@ -150,23 +149,26 @@ public static Mono scrape(ClusterDescription clusterDescrip ))); } - private static Map topicStateMap( + static Map topicStateMap( InternalLogDirStats segmentStats, Map topicDescriptions, Map> topicConfigs, Map latestOffsets, Map earliestOffsets) { + Map> earliestByTopic = groupByTopic(earliestOffsets); + Map> latestByTopic = groupByTopic(latestOffsets); + Map> partitionsStatsByTopic = + 
segmentStats.getPartitionsStats() == null ? null : groupByTopic(segmentStats.getPartitionsStats()); + return topicDescriptions.entrySet().stream().map(entry -> new TopicState( entry.getKey(), entry.getValue(), topicConfigs.getOrDefault(entry.getKey(), List.of()), - filterTopic(entry.getKey(), earliestOffsets), - filterTopic(entry.getKey(), latestOffsets), + earliestByTopic.getOrDefault(entry.getKey(), Map.of()), + latestByTopic.getOrDefault(entry.getKey(), Map.of()), segmentStats.getTopicStats().get(entry.getKey()), - Optional.ofNullable(segmentStats.getPartitionsStats()) - .map(topicForFilter -> filterTopic(entry.getKey(), topicForFilter)) - .orElse(null) + partitionsStatsByTopic == null ? null : partitionsStatsByTopic.getOrDefault(entry.getKey(), Map.of()) )).collect(Collectors.toMap( TopicState::name, Function.identity() @@ -227,11 +229,11 @@ private static TopicsIndex buildTopicIndex(ClustersProperties clustersProperties return new FilterTopicIndex(topics); } - private static Map filterTopic(String topicForFilter, Map tpMap) { - return tpMap.entrySet() - .stream() - .filter(tp -> tp.getKey().topic().equals(topicForFilter)) - .collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue)); + private static Map> groupByTopic(Map tpMap) { + Map> result = new HashMap<>(); + tpMap.forEach((tp, v) -> + result.computeIfAbsent(tp.topic(), k -> new HashMap<>()).put(tp.partition(), v)); + return result; } private static InternalTopic buildInternalTopic(TopicState state, diff --git a/api/src/test/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterStateTest.java b/api/src/test/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterStateTest.java index 61492ced3..19add1869 100644 --- a/api/src/test/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterStateTest.java +++ b/api/src/test/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterStateTest.java @@ -2,6 +2,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import 
io.kafbat.ui.model.InternalLogDirStats; +import io.kafbat.ui.model.InternalLogDirStats.SegmentStats; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; class ScrapedClusterStateTest { @@ -14,4 +22,83 @@ void emptyStateHasNonNullTopicIndex() throws Exception { assertThat(empty.getTopicIndex().find("search", true, true, null)).isEmpty(); } } + + @Test + void topicStateMapGroupsOffsetsAndStatsPerTopic() throws Exception { + SegmentStats partA0 = new SegmentStats(10L, 1); + SegmentStats partA1 = new SegmentStats(20L, 2); + SegmentStats partB0 = new SegmentStats(30L, 3); + SegmentStats topicAStats = new SegmentStats(30L, 3); + SegmentStats topicBStats = new SegmentStats(30L, 3); + + InternalLogDirStats segmentStats = buildLogDirStats( + Map.of( + new TopicPartition("a", 0), partA0, + new TopicPartition("a", 1), partA1, + new TopicPartition("b", 0), partB0 + ), + Map.of("a", topicAStats, "b", topicBStats) + ); + + Map descriptions = Map.of( + "a", topicDescription("a"), + "b", topicDescription("b") + ); + Map> configs = Map.of( + "a", List.of(new ConfigEntry("retention.ms", "1000")) + ); + Map latest = Map.of( + new TopicPartition("a", 0), 100L, + new TopicPartition("a", 1), 200L, + new TopicPartition("b", 0), 300L + ); + Map earliest = Map.of( + new TopicPartition("a", 0), 0L, + new TopicPartition("a", 1), 5L, + new TopicPartition("b", 0), 10L + ); + + Map result = + ScrapedClusterState.topicStateMap(segmentStats, descriptions, configs, latest, earliest); + + assertThat(result).containsOnlyKeys("a", "b"); + + var a = result.get("a"); + assertThat(a.name()).isEqualTo("a"); + assertThat(a.description()).isSameAs(descriptions.get("a")); + assertThat(a.configs()).containsExactly(new ConfigEntry("retention.ms", "1000")); + 
assertThat(a.startOffsets()).containsOnly(Map.entry(0, 0L), Map.entry(1, 5L)); + assertThat(a.endOffsets()).containsOnly(Map.entry(0, 100L), Map.entry(1, 200L)); + assertThat(a.segmentStats()).isEqualTo(topicAStats); + assertThat(a.partitionsSegmentStats()).containsOnly(Map.entry(0, partA0), Map.entry(1, partA1)); + + var b = result.get("b"); + assertThat(b.configs()).isEmpty(); + assertThat(b.startOffsets()).containsOnly(Map.entry(0, 10L)); + assertThat(b.endOffsets()).containsOnly(Map.entry(0, 300L)); + assertThat(b.segmentStats()).isEqualTo(topicBStats); + assertThat(b.partitionsSegmentStats()).containsOnly(Map.entry(0, partB0)); + } + + private static TopicDescription topicDescription(String name) { + return new TopicDescription(name, false, List.of()); + } + + // InternalLogDirStats is @Value with an explicit 1-arg constructor, so Lombok does not + // generate an all-args constructor. Reflection lets tests set the two fields topicStateMap + // actually reads without constructing the full Map argument that constructor requires. + private static InternalLogDirStats buildLogDirStats( + Map partitionsStats, + Map topicStats) throws Exception { + InternalLogDirStats instance = InternalLogDirStats.empty(); + setField(instance, "partitionsStats", partitionsStats); + setField(instance, "topicStats", topicStats); + return instance; + } + + private static void setField(Object target, String name, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } }