Skip to content

Commit

Permalink
remove partitionInfoList from TopicVo again and inject it where needed
Browse files Browse the repository at this point in the history
  • Loading branch information
Jork Zijlstra committed May 17, 2021
1 parent 04e5395 commit 9b99d8b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 32 deletions.
10 changes: 0 additions & 10 deletions src/main/java/kafdrop/model/TopicVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ public final class TopicVO implements Comparable<TopicVO> {

private Map<Integer, TopicPartitionVO> partitions = new TreeMap<>();

private List<PartitionInfo> partitionInfoList = new ArrayList<>();

private Map<String, String> config = Collections.emptyMap();

public TopicVO(String name) {
Expand All @@ -56,14 +54,6 @@ public void setPartitions(Map<Integer, TopicPartitionVO> partitions) {
this.partitions = partitions;
}

public List<PartitionInfo> getPartitionInfoList() {
return this.partitionInfoList;
}

public void setPartitionInfoList(List<PartitionInfo> partitionInfoList) {
this.partitionInfoList = partitionInfoList;
}

public Optional<TopicPartitionVO> getPartition(int partitionId) {
return Optional.ofNullable(partitions.get(partitionId));
}
Expand Down
31 changes: 18 additions & 13 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ private void initializeClient() {
}
}

synchronized void setAllPartitionSizes(List<TopicVO> topics) {
synchronized void setAllPartitionSizes(Map<String, List<PartitionInfo>> topicsMap, List<TopicVO> topics) {
initializeClient();

Map<TopicVO, List<TopicPartition>> allTopics = topics.stream().map(topicVO -> {
List<TopicPartition> topicPartitions = topicVO.getPartitionInfoList().stream()
.map(partitionInfo -> {
return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
}).collect(Collectors.toList());
List<TopicPartition> topicPartitions = topicsMap.get(topicVO.getName()).stream()
.map(partitionInfo -> {
return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
}).collect(Collectors.toList());

return Pair.of(topicVO, topicPartitions);
}
return Pair.of(topicVO, topicPartitions);
}
).collect(Collectors.toMap(Pair::getKey, Pair::getValue));

List<TopicPartition> allTopicPartitions = allTopics.values().stream().flatMap(Collection::stream)
Expand Down Expand Up @@ -206,21 +206,27 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes
return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty";
}

synchronized Map<String, TopicVO> getTopicInfos(String[] topics) {
synchronized Map<String, List<PartitionInfo>> getAllTopic() {
initializeClient();

return kafkaConsumer.listTopics();
}

synchronized Map<String, TopicVO> getTopicInfos(Map<String, List<PartitionInfo>> allTopicsMap, String[] topics) {
initializeClient();

final Map<String, List<PartitionInfo>> topicsMap;
topicsMap = kafkaConsumer.listTopics();
// final Map<String, List<PartitionInfo>> topicsMap;
// topicsMap = kafkaConsumer.listTopics();

final var topicSet = topicsMap.keySet();
final var topicSet = allTopicsMap.keySet();
if (topics.length == 0) {
topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class);
}
final var topicVos = new HashMap<String, TopicVO>(topics.length, 1f);

for (var topic : topics) {
if (topicSet.contains(topic)) {
topicVos.put(topic, getTopicInfo(topic, topicsMap.get(topic)));
topicVos.put(topic, getTopicInfo(topic, allTopicsMap.get(topic)));
}
}

Expand All @@ -229,7 +235,6 @@ synchronized Map<String, TopicVO> getTopicInfos(String[] topics) {

private TopicVO getTopicInfo(String topic, List<PartitionInfo> partitionInfoList) {
final var topicVo = new TopicVO(topic);
topicVo.setPartitionInfoList(partitionInfoList);
final var partitions = new TreeMap<Integer, TopicPartitionVO>();

for (var partitionInfo : partitionInfoList) {
Expand Down
24 changes: 15 additions & 9 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {

@Override
public List<TopicVO> getTopics() {
final var topicVos = getTopicMetadata().values().stream()
final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopic()).values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());

Expand All @@ -108,22 +108,28 @@ public List<TopicVO> getTopics() {

@Override
public List<TopicVO> getTopicsWithOffsets() {
final var topicVos = getTopics();
Map<String, List<PartitionInfo>> topicsMap = highLevelConsumer.getAllTopic();

setTopicPartitionSizes(topicVos);
final var topicVos = getTopicMetadata(topicsMap).values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());

setTopicPartitionSizes(topicsMap, topicVos);

return topicVos;
}

@Override
public Optional<TopicVO> getTopic(String topic) {
final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic));
topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo)));
Map<String, List<PartitionInfo>> topicsMap = highLevelConsumer.getAllTopic();

final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic));
topicVo.ifPresent(vo -> setTopicPartitionSizes(topicsMap, Collections.singletonList(vo)));
return topicVo;
}

private Map<String, TopicVO> getTopicMetadata(String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(topics);
private Map<String, TopicVO> getTopicMetadata(Map<String, List<PartitionInfo>> allTopicsMap, String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(allTopicsMap, topics);
final var retrievedTopicNames = topicInfos.keySet();
final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames);

Expand Down Expand Up @@ -198,8 +204,8 @@ private static Map<String, String> headersToMap(Headers headers) {
return map;
}

private void setTopicPartitionSizes(List<TopicVO> topics) {
highLevelConsumer.setAllPartitionSizes(topics);
private void setTopicPartitionSizes(Map<String, List<PartitionInfo>> topicsMap, List<TopicVO> topics) {
highLevelConsumer.setAllPartitionSizes(topicsMap, topics);
}

@Override
Expand Down

0 comments on commit 9b99d8b

Please sign in to comment.