From 0fe3b41b850bd46f0eebc4dfb9fe02092b9008fc Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Thu, 7 May 2026 22:02:57 +0800 Subject: [PATCH] test --- .../service/ClustersStatisticsScheduler.java | 5 ++- .../io/kafbat/ui/service/StatisticsCache.java | 36 ++++++++--------- .../io/kafbat/ui/service/TopicsService.java | 4 ++ .../metrics/scrape/ScrapedClusterState.java | 40 ++++++++++++++++--- 4 files changed, 60 insertions(+), 25 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/ClustersStatisticsScheduler.java b/api/src/main/java/io/kafbat/ui/service/ClustersStatisticsScheduler.java index 0d5eb84d7..b2268e31a 100644 --- a/api/src/main/java/io/kafbat/ui/service/ClustersStatisticsScheduler.java +++ b/api/src/main/java/io/kafbat/ui/service/ClustersStatisticsScheduler.java @@ -1,5 +1,6 @@ package io.kafbat.ui.service; +import java.time.Duration; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -12,6 +13,8 @@ @Slf4j public class ClustersStatisticsScheduler { + private static final Duration STATS_UPDATE_TIMEOUT = Duration.ofSeconds(90); + private final ClustersStorage clustersStorage; private final StatisticsService statisticsService; @@ -27,6 +30,6 @@ public void updateStatistics() { .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName())); }) .then() - .block(); + .block(STATS_UPDATE_TIMEOUT); } } diff --git a/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java b/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java index 0b8274959..b14425377 100644 --- a/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java +++ b/api/src/main/java/io/kafbat/ui/service/StatisticsCache.java @@ -25,37 +25,37 @@ public StatisticsCache(ClustersStorage clustersStorage) { clustersStorage.getKafkaClusters().forEach(c -> cache.put(c.getName(), initializing)); } - public synchronized void replace(KafkaCluster c, Statistics stats) { + public void replace(KafkaCluster c, Statistics stats) { cache.put(c.getName(), stats); } - public synchronized void update(KafkaCluster c, - Map descriptions, - Map> configs, - InternalPartitionsOffsets partitionsOffsets, - ClustersProperties clustersProperties) { - var stats = get(c); - replace( - c, - stats.withClusterState(s -> + public void update(KafkaCluster c, + Map descriptions, + Map> configs, + InternalPartitionsOffsets partitionsOffsets, + ClustersProperties clustersProperties) { + var oldStats = get(c); + cache.put(c.getName(), + oldStats.withClusterState(s -> s.updateTopics(descriptions, configs, partitionsOffsets, clustersProperties) ) ); try { - if (!stats.getStatus().equals(ServerStatusDTO.INITIALIZING)) { - stats.close(); + if (!oldStats.getStatus().equals(ServerStatusDTO.INITIALIZING)) { + oldStats.close(); } } catch (Exception e) { log.error("Error closing cluster {} stats", c.getName(), e); } } - public synchronized void onTopicDelete(KafkaCluster c, String topic) { - var stats = get(c); - replace( - c, - stats.withClusterState(s -> s.topicDeleted(topic)) - ); + public void onTopicDelete(KafkaCluster c, String topic) { + cache.compute(c.getName(), (name, stats) -> { + if (stats == null) { + return null; + } + return stats.withClusterState(s -> s.topicDeleted(topic)); + }); } public Statistics get(KafkaCluster c) { diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index d9e800b05..a9599eed7 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -20,6 +20,7 @@ import io.kafbat.ui.model.PartitionsIncreaseDTO; import io.kafbat.ui.model.PartitionsIncreaseResponseDTO; import io.kafbat.ui.model.ReplicationFactorChangeDTO; +import io.kafbat.ui.model.ServerStatusDTO; import io.kafbat.ui.model.ReplicationFactorChangeResponseDTO; import io.kafbat.ui.model.Statistics; import io.kafbat.ui.model.TopicCreationDTO; @@ -469,6 +470,9 @@ public Mono cloneTopic( public Mono> getTopics(KafkaCluster cluster, String search, Boolean showInternal, Boolean fts) { Statistics stats = statisticsCache.get(cluster); + if (stats.getStatus() == ServerStatusDTO.INITIALIZING || stats.getStatus() == ServerStatusDTO.OFFLINE) { + return Mono.just(List.of()); + } ScrapedClusterState clusterState = stats.getClusterState(); boolean useFts = clustersProperties.getFts().use(fts); try { diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java index 2059a1a56..b1fd12ed5 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java @@ -4,6 +4,7 @@ import static io.kafbat.ui.model.InternalLogDirStats.SegmentStats; import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription; +import com.google.common.collect.ImmutableTable; import com.google.common.collect.Table; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.InternalLogDirStats; @@ -14,6 +15,7 @@ import io.kafbat.ui.service.index.LuceneTopicsIndex; import io.kafbat.ui.service.index.TopicsIndex; import jakarta.annotation.Nullable; +import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -125,20 +127,46 @@ public ScrapedClusterState topicDeleted(String topic) { .build(); } + private static final Duration SCRAPE_CALL_TIMEOUT = Duration.ofSeconds(30); + public static Mono scrape(ClusterDescription clusterDescription, ReactiveAdminClient ac, ClustersProperties clustersProperties) { return Mono.zip( ac.describeLogDirs(clusterDescription.getNodes().stream().map(Node::id).toList()) - .map(InternalLogDirStats::new), - ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList()), - ac.describeTopics(), + .map(InternalLogDirStats::new) + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to describe log dirs", e)) + .onErrorReturn(new InternalLogDirStats(Map.of())), + ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList()) + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to list consumer groups", e)) + .onErrorReturn(List.of()), + ac.describeTopics() + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to describe topics", e)) + .onErrorReturn(Map.of()), ac.getTopicsConfig() + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to get topics config", e)) + .onErrorReturn(Map.of()) ).flatMap(phase1 -> Mono.zip( - ac.listOffsets(phase1.getT3().values(), OffsetSpec.latest()), - ac.listOffsets(phase1.getT3().values(), OffsetSpec.earliest()), - ac.describeConsumerGroups(phase1.getT2()), + ac.listOffsets(phase1.getT3().values(), OffsetSpec.latest()) + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to list latest offsets", e)) + .onErrorReturn(Map.of()), + ac.listOffsets(phase1.getT3().values(), OffsetSpec.earliest()) + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to list earliest offsets", e)) + .onErrorReturn(Map.of()), + ac.describeConsumerGroups(phase1.getT2()) + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to describe consumer groups", e)) + .onErrorReturn(Map.of()), ac.listConsumerGroupOffsets(phase1.getT2(), null) + .timeout(SCRAPE_CALL_TIMEOUT) + .doOnError(e -> log.warn("Failed to list consumer group offsets", e)) + .onErrorReturn(ImmutableTable.of()) ).map(phase2 -> create( clusterDescription,