Homepage performance improvement #277

14 changes: 4 additions & 10 deletions src/main/java/kafdrop/controller/ConsumerController.java
@@ -37,11 +37,8 @@ public ConsumerController(KafkaMonitor kafkaMonitor) {

@RequestMapping("/{groupId:.+}")
public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
.findAny();
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

model.addAttribute("consumer", consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId)));
return "consumer-detail";
}
@@ -53,11 +50,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode
})
@GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
.findAny();
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId));
}
}
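Both handlers previously pulled every topic and every consumer group just to locate one group. As a hedged illustration of the call-shape change (identifiers as in the diff above, not new API):

// Before: list all topics, enumerate all consumers, then filter by group id.
final var consumer = kafkaMonitor.getConsumers(kafkaMonitor.getTopics())
    .stream()
    .filter(c -> c.getGroupId().equals(groupId))
    .findAny();

// After: resolve only the requested group.
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();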
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/controller/TopicController.java
@@ -60,7 +60,7 @@ public String topicDetails(@PathVariable("name") String topicName, Model model)
final var topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
model.addAttribute("topic", topic);
model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic)));
model.addAttribute("consumers", kafkaMonitor.getConsumersByTopics(Collections.singleton(topic)));
model.addAttribute("topicDeleteEnabled", topicDeleteEnabled);
model.addAttribute("keyFormat", defaultKeyFormat);
model.addAttribute("format", defaultFormat);
@@ -125,7 +125,7 @@ public String createTopicPage(Model model) {
public @ResponseBody List<ConsumerVO> getConsumers(@PathVariable("name") String topicName) {
final var topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
return kafkaMonitor.getConsumers(Collections.singleton(topic));
return kafkaMonitor.getConsumersByTopics(Collections.singleton(topic));
}

/**
4 changes: 0 additions & 4 deletions src/main/java/kafdrop/model/TopicVO.java
@@ -44,10 +44,6 @@ public void setConfig(Map<String, String> config) {
this.config = config;
}

public Map<Integer, TopicPartitionVO> getPartitionMap() {
return Collections.unmodifiableMap(partitions);
}

public Collection<TopicPartitionVO> getPartitions() {
return Collections.unmodifiableCollection(partitions.values());
}
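The read-only partition map is no longer needed: callers now look up one partition at a time. A hypothetical sketch of the accessor the new consumer code relies on (topicVO.getPartition(...) is called in KafkaHighLevelConsumer below; the actual TopicVO implementation may differ):

// Hypothetical sketch, inferred from the getPartition(...) usage further down.
public Optional<TopicPartitionVO> getPartition(int id) {
  return Optional.ofNullable(partitions.get(id));
}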
94 changes: 49 additions & 45 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
@@ -3,18 +3,19 @@
import kafdrop.config.*;
import kafdrop.model.*;
import kafdrop.util.*;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.*;
import org.slf4j.*;
import org.springframework.stereotype.*;

import javax.annotation.*;
import java.nio.*;
import java.time.*;
import javax.annotation.PostConstruct;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.*;
import java.util.stream.*;
import java.util.stream.Collectors;

@Service
public final class KafkaHighLevelConsumer {
@@ -47,36 +48,37 @@ private void initializeClient() {
}
}

synchronized Map<Integer, TopicPartitionVO> getPartitionSize(String topic) {
synchronized void setTopicPartitionSizes(List<TopicVO> topics) {
initializeClient();

final var partitionInfoSet = kafkaConsumer.partitionsFor(topic);
kafkaConsumer.assign(partitionInfoSet.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
.collect(Collectors.toList()));

kafkaConsumer.poll(Duration.ofMillis(0));
final Set<TopicPartition> assignedPartitionList = kafkaConsumer.assignment();
final TopicVO topicVO = getTopicInfo(topic);
final Map<Integer, TopicPartitionVO> partitionsVo = topicVO.getPartitionMap();

kafkaConsumer.seekToBeginning(assignedPartitionList);
assignedPartitionList.forEach(topicPartition -> {
final TopicPartitionVO topicPartitionVo = partitionsVo.get(topicPartition.partition());
final long startOffset = kafkaConsumer.position(topicPartition);
LOG.debug("topic: {}, partition: {}, startOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset);
topicPartitionVo.setFirstOffset(startOffset);
});

kafkaConsumer.seekToEnd(assignedPartitionList);
assignedPartitionList.forEach(topicPartition -> {
final long latestOffset = kafkaConsumer.position(topicPartition);
LOG.debug("topic: {}, partition: {}, latestOffset: {}", topicPartition.topic(), topicPartition.partition(), latestOffset);
final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition());
partitionVo.setSize(latestOffset);
});
return partitionsVo;
Map<TopicVO, List<TopicPartition>> allTopics = topics.stream().map(topicVO -> {
List<TopicPartition> topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO ->
new TopicPartition(topicVO.getName(), topicPartitionVO.getId())
).collect(Collectors.toList());

return Pair.of(topicVO, topicPartitions);
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));

List<TopicPartition> allTopicPartitions = allTopics.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());

kafkaConsumer.assign(allTopicPartitions);
Map<TopicPartition, Long> beginningOffset = kafkaConsumer.beginningOffsets(allTopicPartitions);
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(allTopicPartitions);

allTopics.forEach((topicVO, topicPartitions) -> topicPartitions.forEach(topicPartition -> {
Optional<TopicPartitionVO> partition = topicVO.getPartition(topicPartition.partition());

partition.ifPresent(p -> {
Long startOffset = beginningOffset.get(topicPartition);
Long endOffset = endOffsets.get(topicPartition);

LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset, endOffset);
p.setFirstOffset(startOffset);
p.setSize(endOffset);
});
}));
}

/**
@@ -195,25 +197,27 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes
return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty";
}

synchronized Map<String, TopicVO> getTopicInfos(String[] topics) {
synchronized Map<String, List<PartitionInfo>> getAllTopics() {
initializeClient();

return kafkaConsumer.listTopics();
}

synchronized Map<String, TopicVO> getTopicInfos(Map<String, List<PartitionInfo>> allTopicsMap, String[] topics) {
initializeClient();
final var topicSet = kafkaConsumer.listTopics().keySet();

final var topicSet = allTopicsMap.keySet();
if (topics.length == 0) {
topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class);
}
final var topicVos = new HashMap<String, TopicVO>(topics.length, 1f);

for (var topic : topics) {
if (topicSet.contains(topic)) {
topicVos.put(topic, getTopicInfo(topic));
}
}

return topicVos;
}
return Arrays.stream(topics)
.filter(topicSet::contains)
.map(topic -> Pair.of(topic, getTopicInfo(topic, allTopicsMap.get(topic))))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

private TopicVO getTopicInfo(String topic) {
final var partitionInfoList = kafkaConsumer.partitionsFor(topic);
private TopicVO getTopicInfo(String topic, List<PartitionInfo> partitionInfoList) {
final var topicVo = new TopicVO(topic);
final var partitions = new TreeMap<Integer, TopicPartitionVO>();

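This is the core of the speed-up: the old getPartitionSize did an assign/poll/seekToBeginning/seekToEnd round trip per topic, while setTopicPartitionSizes resolves offsets for all partitions of all topics in two bulk calls. A minimal standalone helper-method sketch of that batching pattern against the plain Kafka consumer API (not Kafdrop's code; the consumer parameter is assumed to be an already configured KafkaConsumer):

import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

static Map<TopicPartition, long[]> bulkOffsets(KafkaConsumer<?, ?> consumer) {
  // One metadata call for every topic, then two bulk offset lookups for every partition.
  final List<TopicPartition> partitions = consumer.listTopics().values().stream()
      .flatMap(Collection::stream)
      .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
      .collect(Collectors.toList());

  consumer.assign(partitions);
  final Map<TopicPartition, Long> first = consumer.beginningOffsets(partitions);
  final Map<TopicPartition, Long> last = consumer.endOffsets(partitions);

  // First offset and end offset ("size") per partition, mirroring setFirstOffset/setSize above.
  final Map<TopicPartition, long[]> result = new HashMap<>();
  partitions.forEach(tp -> result.put(tp, new long[] {first.get(tp), last.get(tp)}));
  return result;
}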
4 changes: 3 additions & 1 deletion src/main/java/kafdrop/service/KafkaMonitor.java
@@ -44,7 +44,9 @@ List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int coun

ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics);

List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos);
List<ConsumerVO> getConsumersByGroup(String groupId);

List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos);

/**
* Create topic
52 changes: 40 additions & 12 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
@@ -99,24 +99,31 @@ public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {

@Override
public List<TopicVO> getTopics() {
final var topicVos = getTopicMetadata().values().stream()
final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopics()).values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());
for (var topicVo : topicVos) {
topicVo.setPartitions(getTopicPartitionSizes(topicVo));
}

return topicVos;
}

public List<TopicVO> getTopics(String[] topics) {
Map<String, List<PartitionInfo>> topicsMap = highLevelConsumer.getAllTopics();

ArrayList<TopicVO> topicVos = new ArrayList<>(getTopicMetadata(topicsMap, topics).values());
setTopicPartitionSizes(topicVos);

return topicVos;
}

@Override
public Optional<TopicVO> getTopic(String topic) {
final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic));
topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo)));
return topicVo;
String[] topics = { topic };

return getTopics(topics).stream().findAny();
}

private Map<String, TopicVO> getTopicMetadata(String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(topics);
private Map<String, TopicVO> getTopicMetadata(Map<String, List<PartitionInfo>> allTopicsMap, String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(allTopicsMap, topics);
final var retrievedTopicNames = topicInfos.keySet();
final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames);

@@ -191,12 +198,29 @@ private static Map<String, String> headersToMap(Headers headers) {
return map;
}

private Map<Integer, TopicPartitionVO> getTopicPartitionSizes(TopicVO topic) {
return highLevelConsumer.getPartitionSize(topic.getName());
private void setTopicPartitionSizes(List<TopicVO> topics) {
highLevelConsumer.setTopicPartitionSizes(topics);
}

@Override
public List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos) {
public List<ConsumerVO> getConsumersByGroup(String groupId) {
List<ConsumerGroupOffsets> consumerGroupOffsets = getConsumerOffsets(groupId);

String[] uniqueTopicNames = consumerGroupOffsets.stream()
.flatMap(consumerGroupOffset -> consumerGroupOffset.offsets.keySet()
.stream().map(TopicPartition::topic))
.distinct()
.toArray(String[]::new);

List<TopicVO> topicVOs = getTopics(uniqueTopicNames);

LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets);
LOG.debug("topicVos: {}", topicVOs);
return convert(consumerGroupOffsets, topicVOs);
}

@Override
public List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos) {
final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet());
final var consumerGroupOffsets = getConsumerOffsets(topics);
LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets);
@@ -308,6 +332,10 @@ private ConsumerGroupOffsets resolveOffsets(String groupId) {
return new ConsumerGroupOffsets(groupId, highLevelAdminClient.listConsumerGroupOffsetsIfAuthorized(groupId));
}

private List<ConsumerGroupOffsets> getConsumerOffsets(String groupId) {
return Collections.singletonList(resolveOffsets(groupId));
}

private List<ConsumerGroupOffsets> getConsumerOffsets(Set<String> topics) {
final var consumerGroups = highLevelAdminClient.listConsumerGroups();
return consumerGroups.stream()
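getConsumersByGroup resolves offsets for the one requested group (through the existing resolveOffsets / listConsumerGroupOffsetsIfAuthorized path) and then loads only the topics that group actually consumes. For reference, a minimal sketch of the single-group lookup the Kafka Admin API offers, which this kind of per-group path can build on (not Kafdrop's wrapper; the admin parameter is assumed to be a configured AdminClient):

import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

static Map<TopicPartition, OffsetAndMetadata> offsetsForGroup(AdminClient admin, String groupId)
    throws InterruptedException, ExecutionException {
  // One call for one group; no need to enumerate every consumer group or topic in the cluster.
  return admin.listConsumerGroupOffsets(groupId)
      .partitionsToOffsetAndMetadata()
      .get();
}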