Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions api/src/main/java/io/kafbat/ui/model/InternalClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kafbat.ui.api.model.ControllerType;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Data;
Expand Down Expand Up @@ -54,23 +55,9 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {

features = statistics.getFeatures();

bytesInPerSec = statistics
.getMetrics()
.getIoRates()
.brokerBytesInPerSec()
.values()
.stream()
.reduce(BigDecimal::add)
.orElse(null);

bytesOutPerSec = statistics
.getMetrics()
.getIoRates()
.brokerBytesOutPerSec()
.values()
.stream()
.reduce(BigDecimal::add)
.orElse(null);
var ioRates = statistics.getMetrics().getIoRates();
bytesInPerSec = sumWithTopicFallback(ioRates.brokerBytesInPerSec(), ioRates.topicBytesInPerSec());
bytesOutPerSec = sumWithTopicFallback(ioRates.brokerBytesOutPerSec(), ioRates.topicBytesOutPerSec());

var partitionsStats = new PartitionsStats(statistics.topicDescriptions().toList());
onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
Expand All @@ -82,6 +69,25 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
controller = statistics.getController();
}

/**
* Aggregates a cluster-wide IO rate from the per-broker rates.
*
* <p>Some brokers do not expose the topic-less {@code BrokerTopicMetrics} aggregate over JMX
* (observed with Confluent {@code cp-kafka}), so the per-broker map ends up empty even though
* per-topic rates are scraped successfully. Without a fallback the cluster/broker throughput is
* reported as {@code null} ("0 bytes") while every topic still shows a non-zero rate. In that
* case we fall back to summing the per-topic rates, which by definition equals the all-topics
* broker aggregate (bytes in/out are additive across topics, counted once at the leader broker).
*/
@Nullable
static BigDecimal sumWithTopicFallback(Map<Integer, BigDecimal> brokerRates,
Map<String, BigDecimal> topicRates) {
return brokerRates.values().stream()
.reduce(BigDecimal::add)
.or(() -> topicRates.values().stream().reduce(BigDecimal::add))
.orElse(null);
}

@Nullable
private static Integer getActiveControllers(Statistics statistics) {
if (ControllerType.KRAFT == statistics.getController()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.kafbat.ui.model;

import static org.assertj.core.api.Assertions.assertThat;

import java.math.BigDecimal;
import java.util.Map;
import org.junit.jupiter.api.Test;

class InternalClusterStateTest {

@Test
void usesPerBrokerRatesWhenPresent() {
var result = InternalClusterState.sumWithTopicFallback(
Map.of(1, new BigDecimal("10"), 2, new BigDecimal("5")),
Map.of("ignored", new BigDecimal("999")));
assertThat(result).isEqualByComparingTo("15");
}

@Test
void fallsBackToTopicRatesWhenPerBrokerRatesEmpty() {
// brokers that don't expose the topic-less BrokerTopicMetrics aggregate (e.g. cp-kafka over JMX)
var result = InternalClusterState.sumWithTopicFallback(
Map.of(),
Map.of("topicA", new BigDecimal("3.0"), "topicB", new BigDecimal("4.5")));
assertThat(result).isEqualByComparingTo("7.5");
}

@Test
void returnsNullWhenNeitherSourceHasRates() {
assertThat(InternalClusterState.sumWithTopicFallback(Map.of(), Map.of())).isNull();
}
}
Loading