diff --git a/api/src/main/java/io/kafbat/ui/model/InternalClusterState.java b/api/src/main/java/io/kafbat/ui/model/InternalClusterState.java index dbc3cf2622..c91989261d 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalClusterState.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalClusterState.java @@ -4,6 +4,7 @@ import io.kafbat.ui.api.model.ControllerType; import java.math.BigDecimal; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import lombok.Data; @@ -54,23 +55,9 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) { features = statistics.getFeatures(); - bytesInPerSec = statistics - .getMetrics() - .getIoRates() - .brokerBytesInPerSec() - .values() - .stream() - .reduce(BigDecimal::add) - .orElse(null); - - bytesOutPerSec = statistics - .getMetrics() - .getIoRates() - .brokerBytesOutPerSec() - .values() - .stream() - .reduce(BigDecimal::add) - .orElse(null); + var ioRates = statistics.getMetrics().getIoRates(); + bytesInPerSec = sumWithTopicFallback(ioRates.brokerBytesInPerSec(), ioRates.topicBytesInPerSec()); + bytesOutPerSec = sumWithTopicFallback(ioRates.brokerBytesOutPerSec(), ioRates.topicBytesOutPerSec()); var partitionsStats = new PartitionsStats(statistics.topicDescriptions().toList()); onlinePartitionCount = partitionsStats.getOnlinePartitionCount(); @@ -82,6 +69,25 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) { controller = statistics.getController(); } + /** + * Aggregates a cluster-wide IO rate from the per-broker rates. + * + *

Some brokers do not expose the topic-less {@code BrokerTopicMetrics} aggregate over JMX + * (observed with Confluent {@code cp-kafka}), so the per-broker map ends up empty even though + * per-topic rates are scraped successfully. Without a fallback the cluster/broker throughput is + * reported as {@code null} ("0 bytes") while every topic still shows a non-zero rate. In that + * case we fall back to summing the per-topic rates, which by definition equals the all-topics + * broker aggregate (bytes in/out are additive across topics, counted once at the leader broker). + */ + @Nullable + static BigDecimal sumWithTopicFallback(Map brokerRates, + Map topicRates) { + return brokerRates.values().stream() + .reduce(BigDecimal::add) + .or(() -> topicRates.values().stream().reduce(BigDecimal::add)) + .orElse(null); + } + @Nullable private static Integer getActiveControllers(Statistics statistics) { if (ControllerType.KRAFT == statistics.getController()) { diff --git a/api/src/test/java/io/kafbat/ui/model/InternalClusterStateTest.java b/api/src/test/java/io/kafbat/ui/model/InternalClusterStateTest.java new file mode 100644 index 0000000000..24602ed98c --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/model/InternalClusterStateTest.java @@ -0,0 +1,32 @@ +package io.kafbat.ui.model; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class InternalClusterStateTest { + + @Test + void usesPerBrokerRatesWhenPresent() { + var result = InternalClusterState.sumWithTopicFallback( + Map.of(1, new BigDecimal("10"), 2, new BigDecimal("5")), + Map.of("ignored", new BigDecimal("999"))); + assertThat(result).isEqualByComparingTo("15"); + } + + @Test + void fallsBackToTopicRatesWhenPerBrokerRatesEmpty() { + // brokers that don't expose the topic-less BrokerTopicMetrics aggregate (e.g. cp-kafka over JMX) + var result = InternalClusterState.sumWithTopicFallback( + Map.of(), + Map.of("topicA", new BigDecimal("3.0"), "topicB", new BigDecimal("4.5"))); + assertThat(result).isEqualByComparingTo("7.5"); + } + + @Test + void returnsNullWhenNeitherSourceHasRates() { + assertThat(InternalClusterState.sumWithTopicFallback(Map.of(), Map.of())).isNull(); + } +}