Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kafbat.ui.service;

import java.time.Duration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -12,6 +13,8 @@
@Slf4j
public class ClustersStatisticsScheduler {

private static final Duration STATS_UPDATE_TIMEOUT = Duration.ofSeconds(90);

private final ClustersStorage clustersStorage;

private final StatisticsService statisticsService;
Expand All @@ -27,6 +30,6 @@ public void updateStatistics() {
.doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
})
.then()
.block();
.block(STATS_UPDATE_TIMEOUT);
}
}
36 changes: 18 additions & 18 deletions api/src/main/java/io/kafbat/ui/service/StatisticsCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,37 @@ public StatisticsCache(ClustersStorage clustersStorage) {
clustersStorage.getKafkaClusters().forEach(c -> cache.put(c.getName(), initializing));
}

public synchronized void replace(KafkaCluster c, Statistics stats) {
public void replace(KafkaCluster c, Statistics stats) {
cache.put(c.getName(), stats);
}

public synchronized void update(KafkaCluster c,
Map<String, TopicDescription> descriptions,
Map<String, List<ConfigEntry>> configs,
InternalPartitionsOffsets partitionsOffsets,
ClustersProperties clustersProperties) {
var stats = get(c);
replace(
c,
stats.withClusterState(s ->
public void update(KafkaCluster c,
Map<String, TopicDescription> descriptions,
Map<String, List<ConfigEntry>> configs,
InternalPartitionsOffsets partitionsOffsets,
ClustersProperties clustersProperties) {
var oldStats = get(c);
cache.put(c.getName(),
oldStats.withClusterState(s ->
s.updateTopics(descriptions, configs, partitionsOffsets, clustersProperties)
)
);
try {
if (!stats.getStatus().equals(ServerStatusDTO.INITIALIZING)) {
stats.close();
if (!oldStats.getStatus().equals(ServerStatusDTO.INITIALIZING)) {
oldStats.close();
}
} catch (Exception e) {
log.error("Error closing cluster {} stats", c.getName(), e);
}
}

public synchronized void onTopicDelete(KafkaCluster c, String topic) {
var stats = get(c);
replace(
c,
stats.withClusterState(s -> s.topicDeleted(topic))
);
public void onTopicDelete(KafkaCluster c, String topic) {
cache.compute(c.getName(), (name, stats) -> {
if (stats == null) {
return null;
}
return stats.withClusterState(s -> s.topicDeleted(topic));
});
}

public Statistics get(KafkaCluster c) {
Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/TopicsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.kafbat.ui.model.PartitionsIncreaseDTO;
import io.kafbat.ui.model.PartitionsIncreaseResponseDTO;
import io.kafbat.ui.model.ReplicationFactorChangeDTO;
import io.kafbat.ui.model.ServerStatusDTO;
import io.kafbat.ui.model.ReplicationFactorChangeResponseDTO;
import io.kafbat.ui.model.Statistics;
import io.kafbat.ui.model.TopicCreationDTO;
Expand Down Expand Up @@ -469,6 +470,9 @@ public Mono<InternalTopic> cloneTopic(

public Mono<List<InternalTopic>> getTopics(KafkaCluster cluster, String search, Boolean showInternal, Boolean fts) {
Statistics stats = statisticsCache.get(cluster);
if (stats.getStatus() == ServerStatusDTO.INITIALIZING || stats.getStatus() == ServerStatusDTO.OFFLINE) {
return Mono.just(List.of());
}
ScrapedClusterState clusterState = stats.getClusterState();
boolean useFts = clustersProperties.getFts().use(fts);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.kafbat.ui.model.InternalLogDirStats.SegmentStats;
import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;

import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.InternalLogDirStats;
Expand All @@ -14,6 +15,7 @@
import io.kafbat.ui.service.index.LuceneTopicsIndex;
import io.kafbat.ui.service.index.TopicsIndex;
import jakarta.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -125,20 +127,46 @@ public ScrapedClusterState topicDeleted(String topic) {
.build();
}

private static final Duration SCRAPE_CALL_TIMEOUT = Duration.ofSeconds(30);

public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescription,
ReactiveAdminClient ac, ClustersProperties clustersProperties) {
return Mono.zip(
ac.describeLogDirs(clusterDescription.getNodes().stream().map(Node::id).toList())
.map(InternalLogDirStats::new),
ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList()),
ac.describeTopics(),
.map(InternalLogDirStats::new)
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to describe log dirs", e))
.onErrorReturn(new InternalLogDirStats(Map.of())),
ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList())
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to list consumer groups", e))
.onErrorReturn(List.of()),
ac.describeTopics()
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to describe topics", e))
.onErrorReturn(Map.of()),
ac.getTopicsConfig()
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to get topics config", e))
.onErrorReturn(Map.of())
).flatMap(phase1 ->
Mono.zip(
ac.listOffsets(phase1.getT3().values(), OffsetSpec.latest()),
ac.listOffsets(phase1.getT3().values(), OffsetSpec.earliest()),
ac.describeConsumerGroups(phase1.getT2()),
ac.listOffsets(phase1.getT3().values(), OffsetSpec.latest())
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to list latest offsets", e))
.onErrorReturn(Map.of()),
ac.listOffsets(phase1.getT3().values(), OffsetSpec.earliest())
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to list earliest offsets", e))
.onErrorReturn(Map.of()),
ac.describeConsumerGroups(phase1.getT2())
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to describe consumer groups", e))
.onErrorReturn(Map.of()),
ac.listConsumerGroupOffsets(phase1.getT2(), null)
.timeout(SCRAPE_CALL_TIMEOUT)
.doOnError(e -> log.warn("Failed to list consumer group offsets", e))
.onErrorReturn(ImmutableTable.of())
).map(phase2 ->
create(
clusterDescription,
Expand Down
Loading