Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Builder;
Expand Down Expand Up @@ -150,23 +149,26 @@ public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescrip
)));
}

private static Map<String, TopicState> topicStateMap(
static Map<String, TopicState> topicStateMap(
InternalLogDirStats segmentStats,
Map<String, TopicDescription> topicDescriptions,
Map<String, List<ConfigEntry>> topicConfigs,
Map<TopicPartition, Long> latestOffsets,
Map<TopicPartition, Long> earliestOffsets) {

Map<String, Map<Integer, Long>> earliestByTopic = groupByTopic(earliestOffsets);
Map<String, Map<Integer, Long>> latestByTopic = groupByTopic(latestOffsets);
Map<String, Map<Integer, SegmentStats>> partitionsStatsByTopic =
segmentStats.getPartitionsStats() == null ? null : groupByTopic(segmentStats.getPartitionsStats());

return topicDescriptions.entrySet().stream().map(entry -> new TopicState(
entry.getKey(),
entry.getValue(),
topicConfigs.getOrDefault(entry.getKey(), List.of()),
filterTopic(entry.getKey(), earliestOffsets),
filterTopic(entry.getKey(), latestOffsets),
earliestByTopic.getOrDefault(entry.getKey(), Map.of()),
latestByTopic.getOrDefault(entry.getKey(), Map.of()),
segmentStats.getTopicStats().get(entry.getKey()),
Optional.ofNullable(segmentStats.getPartitionsStats())
.map(topicForFilter -> filterTopic(entry.getKey(), topicForFilter))
.orElse(null)
partitionsStatsByTopic == null ? null : partitionsStatsByTopic.getOrDefault(entry.getKey(), Map.of())
)).collect(Collectors.toMap(
TopicState::name,
Function.identity()
Expand Down Expand Up @@ -227,11 +229,11 @@ private static TopicsIndex buildTopicIndex(ClustersProperties clustersProperties
return new FilterTopicIndex(topics);
}

private static <T> Map<Integer, T> filterTopic(String topicForFilter, Map<TopicPartition, T> tpMap) {
return tpMap.entrySet()
.stream()
.filter(tp -> tp.getKey().topic().equals(topicForFilter))
.collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue));
private static <T> Map<String, Map<Integer, T>> groupByTopic(Map<TopicPartition, T> tpMap) {
Map<String, Map<Integer, T>> result = new HashMap<>();
tpMap.forEach((tp, v) ->
result.computeIfAbsent(tp.topic(), k -> new HashMap<>()).put(tp.partition(), v));
return result;
}

private static InternalTopic buildInternalTopic(TopicState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.kafbat.ui.model.InternalLogDirStats;
import io.kafbat.ui.model.InternalLogDirStats.SegmentStats;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

class ScrapedClusterStateTest {
Expand All @@ -14,4 +22,83 @@ void emptyStateHasNonNullTopicIndex() throws Exception {
assertThat(empty.getTopicIndex().find("search", true, true, null)).isEmpty();
}
}

@Test
void topicStateMapGroupsOffsetsAndStatsPerTopic() throws Exception {
SegmentStats partA0 = new SegmentStats(10L, 1);
SegmentStats partA1 = new SegmentStats(20L, 2);
SegmentStats partB0 = new SegmentStats(30L, 3);
SegmentStats topicAStats = new SegmentStats(30L, 3);
SegmentStats topicBStats = new SegmentStats(30L, 3);

InternalLogDirStats segmentStats = buildLogDirStats(
Map.of(
new TopicPartition("a", 0), partA0,
new TopicPartition("a", 1), partA1,
new TopicPartition("b", 0), partB0
),
Map.of("a", topicAStats, "b", topicBStats)
);

Map<String, TopicDescription> descriptions = Map.of(
"a", topicDescription("a"),
"b", topicDescription("b")
);
Map<String, List<ConfigEntry>> configs = Map.of(
"a", List.of(new ConfigEntry("retention.ms", "1000"))
);
Map<TopicPartition, Long> latest = Map.of(
new TopicPartition("a", 0), 100L,
new TopicPartition("a", 1), 200L,
new TopicPartition("b", 0), 300L
);
Map<TopicPartition, Long> earliest = Map.of(
new TopicPartition("a", 0), 0L,
new TopicPartition("a", 1), 5L,
new TopicPartition("b", 0), 10L
);

Map<String, ScrapedClusterState.TopicState> result =
ScrapedClusterState.topicStateMap(segmentStats, descriptions, configs, latest, earliest);

assertThat(result).containsOnlyKeys("a", "b");

var a = result.get("a");
assertThat(a.name()).isEqualTo("a");
assertThat(a.description()).isSameAs(descriptions.get("a"));
assertThat(a.configs()).containsExactly(new ConfigEntry("retention.ms", "1000"));
assertThat(a.startOffsets()).containsOnly(Map.entry(0, 0L), Map.entry(1, 5L));
assertThat(a.endOffsets()).containsOnly(Map.entry(0, 100L), Map.entry(1, 200L));
assertThat(a.segmentStats()).isEqualTo(topicAStats);
assertThat(a.partitionsSegmentStats()).containsOnly(Map.entry(0, partA0), Map.entry(1, partA1));

var b = result.get("b");
assertThat(b.configs()).isEmpty();
assertThat(b.startOffsets()).containsOnly(Map.entry(0, 10L));
assertThat(b.endOffsets()).containsOnly(Map.entry(0, 300L));
assertThat(b.segmentStats()).isEqualTo(topicBStats);
assertThat(b.partitionsSegmentStats()).containsOnly(Map.entry(0, partB0));
}

private static TopicDescription topicDescription(String name) {
return new TopicDescription(name, false, List.of());
}

// InternalLogDirStats is @Value with an explicit 1-arg constructor, so Lombok does not
// generate an all-args constructor. Reflection lets tests set the two fields topicStateMap
// actually reads without constructing a full Map<Integer, Map<String, LogDirDescription>>.
private static InternalLogDirStats buildLogDirStats(
Map<TopicPartition, SegmentStats> partitionsStats,
Map<String, SegmentStats> topicStats) throws Exception {
InternalLogDirStats instance = InternalLogDirStats.empty();
setField(instance, "partitionsStats", partitionsStats);
setField(instance, "topicStats", topicStats);
return instance;
}

private static void setField(Object target, String name, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(name);
field.setAccessible(true);
field.set(target, value);
}
}
Loading