From 8422f9ee875b8c4eaea6510ca336363004fa9e19 Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Sun, 16 May 2021 19:59:32 +0200 Subject: [PATCH 01/10] don't go to kafka to again retrieve the partitionInfoList when its already available --- .../kafdrop/service/KafkaHighLevelConsumer.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index b3ddc6e9..d6bd467f 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -58,7 +58,7 @@ synchronized Map getPartitionSize(String topic) { kafkaConsumer.poll(Duration.ofMillis(0)); final Set assignedPartitionList = kafkaConsumer.assignment(); - final TopicVO topicVO = getTopicInfo(topic); + final TopicVO topicVO = getTopicInfo(topic, partitionInfoSet); final Map partitionsVo = topicVO.getPartitionMap(); kafkaConsumer.seekToBeginning(assignedPartitionList); @@ -197,7 +197,11 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes synchronized Map getTopicInfos(String[] topics) { initializeClient(); - final var topicSet = kafkaConsumer.listTopics().keySet(); + + final Map> topicsMap; + topicsMap = kafkaConsumer.listTopics(); + + final var topicSet = topicsMap.keySet(); if (topics.length == 0) { topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); } @@ -205,15 +209,14 @@ synchronized Map getTopicInfos(String[] topics) { for (var topic : topics) { if (topicSet.contains(topic)) { - topicVos.put(topic, getTopicInfo(topic)); + topicVos.put(topic, getTopicInfo(topic, topicsMap.get(topic))); } } return topicVos; } - private TopicVO getTopicInfo(String topic) { - final var partitionInfoList = kafkaConsumer.partitionsFor(topic); + private TopicVO getTopicInfo(String topic, List partitionInfoList) { final var topicVo = new TopicVO(topic); final var partitions = new TreeMap(); From cc8feeccd9e52fe456bf9cbb3e005d8c124ac5eb Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Sun, 16 May 2021 20:14:11 +0200 Subject: [PATCH 02/10] reuse already retrieved partitionInfoList do only 2 request to retrieve all partition sized instead of 2 per topic --- src/main/java/kafdrop/model/TopicVO.java | 16 +++-- .../service/KafkaHighLevelConsumer.java | 67 ++++++++++--------- .../kafdrop/service/KafkaMonitorImpl.java | 12 ++-- 3 files changed, 53 insertions(+), 42 deletions(-) diff --git a/src/main/java/kafdrop/model/TopicVO.java b/src/main/java/kafdrop/model/TopicVO.java index f9467217..16f12e90 100644 --- a/src/main/java/kafdrop/model/TopicVO.java +++ b/src/main/java/kafdrop/model/TopicVO.java @@ -18,6 +18,8 @@ package kafdrop.model; +import org.apache.kafka.common.PartitionInfo; + import java.util.*; import java.util.stream.*; @@ -26,6 +28,8 @@ public final class TopicVO implements Comparable { private Map partitions = new TreeMap<>(); + private List partitionInfoList = new ArrayList<>(); + private Map config = Collections.emptyMap(); public TopicVO(String name) { @@ -44,10 +48,6 @@ public void setConfig(Map config) { this.config = config; } - public Map getPartitionMap() { - return Collections.unmodifiableMap(partitions); - } - public Collection getPartitions() { return Collections.unmodifiableCollection(partitions.values()); } @@ -56,6 +56,14 @@ public void setPartitions(Map partitions) { this.partitions = partitions; } + public List getPartitionInfoList() { + return this.partitionInfoList; + } + + public void setPartitionInfoList(List partitionInfoList) { + this.partitionInfoList = partitionInfoList; + } + public Optional getPartition(int partitionId) { return Optional.ofNullable(partitions.get(partitionId)); } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index d6bd467f..8114cea1 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -10,11 +10,11 @@ import org.slf4j.*; import org.springframework.stereotype.*; -import javax.annotation.*; -import java.nio.*; -import java.time.*; +import javax.annotation.PostConstruct; +import java.nio.ByteBuffer; +import java.time.Duration; import java.util.*; -import java.util.stream.*; +import java.util.stream.Collectors; @Service public final class KafkaHighLevelConsumer { @@ -47,36 +47,38 @@ private void initializeClient() { } } - synchronized Map getPartitionSize(String topic) { + synchronized void setAllPartitionSizes(List topics) { initializeClient(); - final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - kafkaConsumer.assign(partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) - .collect(Collectors.toList())); - - kafkaConsumer.poll(Duration.ofMillis(0)); - final Set assignedPartitionList = kafkaConsumer.assignment(); - final TopicVO topicVO = getTopicInfo(topic, partitionInfoSet); - final Map partitionsVo = topicVO.getPartitionMap(); - - kafkaConsumer.seekToBeginning(assignedPartitionList); - assignedPartitionList.forEach(topicPartition -> { - final TopicPartitionVO topicPartitionVo = partitionsVo.get(topicPartition.partition()); - final long startOffset = kafkaConsumer.position(topicPartition); - LOG.debug("topic: {}, partition: {}, startOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset); - topicPartitionVo.setFirstOffset(startOffset); - }); - - kafkaConsumer.seekToEnd(assignedPartitionList); - assignedPartitionList.forEach(topicPartition -> { - final long latestOffset = kafkaConsumer.position(topicPartition); - LOG.debug("topic: {}, partition: {}, latestOffset: {}", topicPartition.topic(), topicPartition.partition(), latestOffset); - final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition()); - partitionVo.setSize(latestOffset); - }); - return partitionsVo; + Map> allTopics = topics.stream().map(topicVO -> { + List topicPartitions = topicVO.getPartitionInfoList().stream() + .map(partitionInfo -> { + return new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + }).collect(Collectors.toList()); + + return Pair.of(topicVO, topicPartitions); + } + ).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + List allTopicPartitions = allTopics.values().stream().flatMap(Collection::stream) + .collect(Collectors.toList()); + + kafkaConsumer.assign(allTopicPartitions); + Map beginningOffset = kafkaConsumer.beginningOffsets(allTopicPartitions); + Map endOffsets = kafkaConsumer.endOffsets(allTopicPartitions); + + allTopics.forEach((topicVO, topicPartitions) -> topicPartitions.forEach(topicPartition -> { + Optional partition = topicVO.getPartition(topicPartition.partition()); + + partition.ifPresent(p -> { + Long startOffset = beginningOffset.get(topicPartition); + Long endOffset = endOffsets.get(topicPartition); + + LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset, endOffset); + p.setFirstOffset(startOffset); + p.setSize(endOffset); + }); + })); } /** @@ -218,6 +220,7 @@ synchronized Map getTopicInfos(String[] topics) { private TopicVO getTopicInfo(String topic, List partitionInfoList) { final var topicVo = new TopicVO(topic); + topicVo.setPartitionInfoList(partitionInfoList); final var partitions = new TreeMap(); for (var partitionInfo : partitionInfoList) { diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 79038e9b..e714b420 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -102,16 +102,16 @@ public List getTopics() { final var topicVos = getTopicMetadata().values().stream() .sorted(Comparator.comparing(TopicVO::getName)) .collect(Collectors.toList()); - for (var topicVo : topicVos) { - topicVo.setPartitions(getTopicPartitionSizes(topicVo)); - } + + setTopicPartitionSizes(topicVos); + return topicVos; } @Override public Optional getTopic(String topic) { final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic)); - topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo))); + topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo))); return topicVo; } @@ -191,8 +191,8 @@ private static Map headersToMap(Headers headers) { return map; } - private Map getTopicPartitionSizes(TopicVO topic) { - return highLevelConsumer.getPartitionSize(topic.getName()); + private void setTopicPartitionSizes(List topics) { + highLevelConsumer.setAllPartitionSizes(topics); } @Override From 6ed15083d5fe76b6053d6f29985083cec4211a3b Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Sun, 16 May 2021 21:08:19 +0200 Subject: [PATCH 03/10] don't get partition offsets where its not used --- src/main/java/kafdrop/controller/ConsumerController.java | 2 +- src/main/java/kafdrop/service/KafkaMonitor.java | 2 ++ src/main/java/kafdrop/service/KafkaMonitorImpl.java | 7 +++++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index 4996f44c..b1c76067 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -37,7 +37,7 @@ public ConsumerController(KafkaMonitor kafkaMonitor) { @RequestMapping("/{groupId:.+}") public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); + final var topicVos = kafkaMonitor.getTopicsWithOffsets(); final var consumer = kafkaMonitor.getConsumers(topicVos) .stream() .filter(c -> c.getGroupId().equals(groupId)) diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index d4bcc000..860f10c5 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -31,6 +31,8 @@ public interface KafkaMonitor { List getTopics(); + List getTopicsWithOffsets(); + /** * Returns messages for a given topic. */ diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index e714b420..43973a3d 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -103,6 +103,13 @@ public List getTopics() { .sorted(Comparator.comparing(TopicVO::getName)) .collect(Collectors.toList()); + return topicVos; + } + + @Override + public List getTopicsWithOffsets() { + final var topicVos = getTopics(); + setTopicPartitionSizes(topicVos); return topicVos; From 5fade768e7f65e0893fb231fab467bf11651a58c Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Mon, 17 May 2021 10:57:42 +0200 Subject: [PATCH 04/10] remove partitionInfoList from TopicVo again and inject it where needed --- src/main/java/kafdrop/model/TopicVO.java | 10 ------- .../service/KafkaHighLevelConsumer.java | 28 ++++++++++--------- .../kafdrop/service/KafkaMonitorImpl.java | 24 ++++++++++------ 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/main/java/kafdrop/model/TopicVO.java b/src/main/java/kafdrop/model/TopicVO.java index 16f12e90..91f6fcfa 100644 --- a/src/main/java/kafdrop/model/TopicVO.java +++ b/src/main/java/kafdrop/model/TopicVO.java @@ -28,8 +28,6 @@ public final class TopicVO implements Comparable { private Map partitions = new TreeMap<>(); - private List partitionInfoList = new ArrayList<>(); - private Map config = Collections.emptyMap(); public TopicVO(String name) { @@ -56,14 +54,6 @@ public void setPartitions(Map partitions) { this.partitions = partitions; } - public List getPartitionInfoList() { - return this.partitionInfoList; - } - - public void setPartitionInfoList(List partitionInfoList) { - this.partitionInfoList = partitionInfoList; - } - public Optional getPartition(int partitionId) { return Optional.ofNullable(partitions.get(partitionId)); } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 8114cea1..9c925c9d 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -47,17 +47,17 @@ private void initializeClient() { } } - synchronized void setAllPartitionSizes(List topics) { + synchronized void setAllPartitionSizes(Map> topicsMap, List topics) { initializeClient(); Map> allTopics = topics.stream().map(topicVO -> { - List topicPartitions = topicVO.getPartitionInfoList().stream() - .map(partitionInfo -> { - return new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); - }).collect(Collectors.toList()); + List topicPartitions = topicsMap.get(topicVO.getName()).stream() + .map(partitionInfo -> { + return new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + }).collect(Collectors.toList()); - return Pair.of(topicVO, topicPartitions); - } + return Pair.of(topicVO, topicPartitions); + } ).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); List allTopicPartitions = allTopics.values().stream().flatMap(Collection::stream) @@ -197,13 +197,16 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } - synchronized Map getTopicInfos(String[] topics) { + synchronized Map> getAllTopic() { initializeClient(); - final Map> topicsMap; - topicsMap = kafkaConsumer.listTopics(); + return kafkaConsumer.listTopics(); + } + + synchronized Map getTopicInfos(Map> allTopicsMap, String[] topics) { + initializeClient(); - final var topicSet = topicsMap.keySet(); + final var topicSet = allTopicsMap.keySet(); if (topics.length == 0) { topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); } @@ -211,7 +214,7 @@ synchronized Map getTopicInfos(String[] topics) { for (var topic : topics) { if (topicSet.contains(topic)) { - topicVos.put(topic, getTopicInfo(topic, topicsMap.get(topic))); + topicVos.put(topic, getTopicInfo(topic, allTopicsMap.get(topic))); } } @@ -220,7 +223,6 @@ synchronized Map getTopicInfos(String[] topics) { private TopicVO getTopicInfo(String topic, List partitionInfoList) { final var topicVo = new TopicVO(topic); - topicVo.setPartitionInfoList(partitionInfoList); final var partitions = new TreeMap(); for (var partitionInfo : partitionInfoList) { diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 43973a3d..a633066f 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -99,7 +99,7 @@ public ClusterSummaryVO getClusterSummary(Collection topics) { @Override public List getTopics() { - final var topicVos = getTopicMetadata().values().stream() + final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopic()).values().stream() .sorted(Comparator.comparing(TopicVO::getName)) .collect(Collectors.toList()); @@ -108,22 +108,28 @@ public List getTopics() { @Override public List getTopicsWithOffsets() { - final var topicVos = getTopics(); + Map> topicsMap = highLevelConsumer.getAllTopic(); - setTopicPartitionSizes(topicVos); + final var topicVos = getTopicMetadata(topicsMap).values().stream() + .sorted(Comparator.comparing(TopicVO::getName)) + .collect(Collectors.toList()); + + setTopicPartitionSizes(topicsMap, topicVos); return topicVos; } @Override public Optional getTopic(String topic) { - final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic)); - topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo))); + Map> topicsMap = highLevelConsumer.getAllTopic(); + + final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic)); + topicVo.ifPresent(vo -> setTopicPartitionSizes(topicsMap, Collections.singletonList(vo))); return topicVo; } - private Map getTopicMetadata(String... topics) { - final var topicInfos = highLevelConsumer.getTopicInfos(topics); + private Map getTopicMetadata(Map> allTopicsMap, String... topics) { + final var topicInfos = highLevelConsumer.getTopicInfos(allTopicsMap, topics); final var retrievedTopicNames = topicInfos.keySet(); final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames); @@ -198,8 +204,8 @@ private static Map headersToMap(Headers headers) { return map; } - private void setTopicPartitionSizes(List topics) { - highLevelConsumer.setAllPartitionSizes(topics); + private void setTopicPartitionSizes(Map> topicsMap, List topics) { + highLevelConsumer.setAllPartitionSizes(topicsMap, topics); } @Override From 1a3fb1972d6fd4072135935edf233f2e583e304d Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Fri, 28 Jan 2022 13:04:07 +0100 Subject: [PATCH 05/10] fix rebase issue --- src/main/java/kafdrop/service/KafkaHighLevelConsumer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 9c925c9d..97dd7327 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -3,6 +3,7 @@ import kafdrop.config.*; import kafdrop.model.*; import kafdrop.util.*; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.Node; import org.apache.kafka.common.*; From 3b13b64c80ce45ffeca807db1e5d8df8bc4f70e9 Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Mon, 31 Jan 2022 15:02:49 +0100 Subject: [PATCH 06/10] remove not needed topicsMap from arguments --- .../kafdrop/service/KafkaHighLevelConsumer.java | 14 ++++++-------- .../java/kafdrop/service/KafkaMonitorImpl.java | 8 ++++---- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 97dd7327..9fc1998d 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -48,18 +48,16 @@ private void initializeClient() { } } - synchronized void setAllPartitionSizes(Map> topicsMap, List topics) { + synchronized void setTopicPartitionSizes(List topics) { initializeClient(); Map> allTopics = topics.stream().map(topicVO -> { - List topicPartitions = topicsMap.get(topicVO.getName()).stream() - .map(partitionInfo -> { - return new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); - }).collect(Collectors.toList()); + List topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO -> { + return new TopicPartition(topicVO.getName(), topicPartitionVO.getId()); + }).collect(Collectors.toList()); - return Pair.of(topicVO, topicPartitions); - } - ).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + return Pair.of(topicVO, topicPartitions); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); List allTopicPartitions = allTopics.values().stream().flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index a633066f..4f5cfded 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -114,7 +114,7 @@ public List getTopicsWithOffsets() { .sorted(Comparator.comparing(TopicVO::getName)) .collect(Collectors.toList()); - setTopicPartitionSizes(topicsMap, topicVos); + setTopicPartitionSizes(topicVos); return topicVos; } @@ -124,7 +124,7 @@ public Optional getTopic(String topic) { Map> topicsMap = highLevelConsumer.getAllTopic(); final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic)); - topicVo.ifPresent(vo -> setTopicPartitionSizes(topicsMap, Collections.singletonList(vo))); + topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo))); return topicVo; } @@ -204,8 +204,8 @@ private static Map headersToMap(Headers headers) { return map; } - private void setTopicPartitionSizes(Map> topicsMap, List topics) { - highLevelConsumer.setAllPartitionSizes(topicsMap, topics); + private void setTopicPartitionSizes(List topics) { + highLevelConsumer.setTopicPartitionSizes(topics); } @Override From 9a5b53fb3f4ec90a659332cd0beb3b85ba373611 Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Mon, 31 Jan 2022 15:05:47 +0100 Subject: [PATCH 07/10] remove not needed import --- src/main/java/kafdrop/model/TopicVO.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/kafdrop/model/TopicVO.java b/src/main/java/kafdrop/model/TopicVO.java index 91f6fcfa..3055d9f0 100644 --- a/src/main/java/kafdrop/model/TopicVO.java +++ b/src/main/java/kafdrop/model/TopicVO.java @@ -18,8 +18,6 @@ package kafdrop.model; -import org.apache.kafka.common.PartitionInfo; - import java.util.*; import java.util.stream.*; From dafe1827b0fb11546d24891923b678a8755b1620 Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Mon, 31 Jan 2022 15:39:55 +0100 Subject: [PATCH 08/10] rename methods --- src/main/java/kafdrop/service/KafkaHighLevelConsumer.java | 2 +- src/main/java/kafdrop/service/KafkaMonitorImpl.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 9fc1998d..0c6c85fb 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -196,7 +196,7 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } - synchronized Map> getAllTopic() { + synchronized Map> getAllTopics() { initializeClient(); return kafkaConsumer.listTopics(); diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 4f5cfded..e9c73489 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -99,7 +99,7 @@ public ClusterSummaryVO getClusterSummary(Collection topics) { @Override public List getTopics() { - final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopic()).values().stream() + final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopics()).values().stream() .sorted(Comparator.comparing(TopicVO::getName)) .collect(Collectors.toList()); @@ -108,7 +108,7 @@ public List getTopics() { @Override public List getTopicsWithOffsets() { - Map> topicsMap = highLevelConsumer.getAllTopic(); + Map> topicsMap = highLevelConsumer.getAllTopics(); final var topicVos = getTopicMetadata(topicsMap).values().stream() .sorted(Comparator.comparing(TopicVO::getName)) @@ -121,7 +121,7 @@ public List getTopicsWithOffsets() { @Override public Optional getTopic(String topic) { - Map> topicsMap = highLevelConsumer.getAllTopic(); + Map> topicsMap = highLevelConsumer.getAllTopics(); final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic)); topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo))); From e084812bb8e8774007920ac02da0ffd8d45c2782 Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Mon, 31 Jan 2022 16:33:40 +0100 Subject: [PATCH 09/10] some syntax improvements --- .../service/KafkaHighLevelConsumer.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 0c6c85fb..458df364 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -52,15 +52,16 @@ synchronized void setTopicPartitionSizes(List topics) { initializeClient(); Map> allTopics = topics.stream().map(topicVO -> { - List topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO -> { - return new TopicPartition(topicVO.getName(), topicPartitionVO.getId()); - }).collect(Collectors.toList()); + List topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO -> + new TopicPartition(topicVO.getName(), topicPartitionVO.getId()) + ).collect(Collectors.toList()); return Pair.of(topicVO, topicPartitions); }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - List allTopicPartitions = allTopics.values().stream().flatMap(Collection::stream) - .collect(Collectors.toList()); + List allTopicPartitions = allTopics.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); kafkaConsumer.assign(allTopicPartitions); Map beginningOffset = kafkaConsumer.beginningOffsets(allTopicPartitions); @@ -209,16 +210,12 @@ synchronized Map getTopicInfos(Map> if (topics.length == 0) { topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); } - final var topicVos = new HashMap(topics.length, 1f); - for (var topic : topics) { - if (topicSet.contains(topic)) { - topicVos.put(topic, getTopicInfo(topic, allTopicsMap.get(topic))); - } - } - - return topicVos; - } + return Arrays.stream(topics) + .filter(topicSet::contains) + .map(topic -> Pair.of(topic, getTopicInfo(topic, allTopicsMap.get(topic)))) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } private TopicVO getTopicInfo(String topic, List partitionInfoList) { final var topicVo = new TopicVO(topic); From ed020d2d0e0a7e564d820d96303c59eb2294fbfc Mon Sep 17 00:00:00 2001 From: Jork Zijlstra Date: Mon, 31 Jan 2022 22:23:28 +0100 Subject: [PATCH 10/10] only fetch topic data that are part of the consumer groupId --- .../controller/ConsumerController.java | 14 ++----- .../kafdrop/controller/TopicController.java | 4 +- .../java/kafdrop/service/KafkaMonitor.java | 6 +-- .../kafdrop/service/KafkaMonitorImpl.java | 37 +++++++++++++------ 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index b1c76067..3b11ddaf 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -37,11 +37,8 @@ public ConsumerController(KafkaMonitor kafkaMonitor) { @RequestMapping("/{groupId:.+}") public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopicsWithOffsets(); - final var consumer = kafkaMonitor.getConsumers(topicVos) - .stream() - .filter(c -> c.getGroupId().equals(groupId)) - .findAny(); + final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny(); + model.addAttribute("consumer", consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId))); return "consumer-detail"; } @@ -53,11 +50,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode }) @GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); - final var consumer = kafkaMonitor.getConsumers(topicVos) - .stream() - .filter(c -> c.getGroupId().equals(groupId)) - .findAny(); + final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny(); + return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId)); } } \ No newline at end of file diff --git a/src/main/java/kafdrop/controller/TopicController.java b/src/main/java/kafdrop/controller/TopicController.java index ba9ed7c4..09a946c0 100644 --- a/src/main/java/kafdrop/controller/TopicController.java +++ b/src/main/java/kafdrop/controller/TopicController.java @@ -60,7 +60,7 @@ public String topicDetails(@PathVariable("name") String topicName, Model model) final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); - model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic))); + model.addAttribute("consumers", kafkaMonitor.getConsumersByTopics(Collections.singleton(topic))); model.addAttribute("topicDeleteEnabled", topicDeleteEnabled); model.addAttribute("keyFormat", defaultKeyFormat); model.addAttribute("format", defaultFormat); @@ -125,7 +125,7 @@ public String createTopicPage(Model model) { public @ResponseBody List getConsumers(@PathVariable("name") String topicName) { final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); - return kafkaMonitor.getConsumers(Collections.singleton(topic)); + return kafkaMonitor.getConsumersByTopics(Collections.singleton(topic)); } /** diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index 860f10c5..137ac4f3 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -31,8 +31,6 @@ public interface KafkaMonitor { List getTopics(); - List getTopicsWithOffsets(); - /** * Returns messages for a given topic. */ @@ -46,7 +44,9 @@ List getMessages(TopicPartition topicPartition, long offset, int coun ClusterSummaryVO getClusterSummary(Collection topics); - List getConsumers(Collection topicVos); + List getConsumersByGroup(String groupId); + + List getConsumersByTopics(Collection topicVos); /** * Create topic diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index e9c73489..ab87593a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -106,14 +106,10 @@ public List getTopics() { return topicVos; } - @Override - public List getTopicsWithOffsets() { + public List getTopics(String[] topics) { Map> topicsMap = highLevelConsumer.getAllTopics(); - final var topicVos = getTopicMetadata(topicsMap).values().stream() - .sorted(Comparator.comparing(TopicVO::getName)) - .collect(Collectors.toList()); - + ArrayList topicVos = new ArrayList<>(getTopicMetadata(topicsMap, topics).values()); setTopicPartitionSizes(topicVos); return topicVos; @@ -121,11 +117,9 @@ public List getTopicsWithOffsets() { @Override public Optional getTopic(String topic) { - Map> topicsMap = highLevelConsumer.getAllTopics(); + String[] topics = { topic }; - final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic)); - topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo))); - return topicVo; + return getTopics(topics).stream().findAny(); } private Map getTopicMetadata(Map> allTopicsMap, String... topics) { @@ -209,7 +203,24 @@ private void setTopicPartitionSizes(List topics) { } @Override - public List getConsumers(Collection topicVos) { + public List getConsumersByGroup(String groupId) { + List consumerGroupOffsets = getConsumerOffsets(groupId); + + String[] uniqueTopicNames = consumerGroupOffsets.stream() + .flatMap(consumerGroupOffset -> consumerGroupOffset.offsets.keySet() + .stream().map(TopicPartition::topic)) + .distinct() + .toArray(String[]::new); + + List topicVOs = getTopics(uniqueTopicNames); + + LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets); + LOG.debug("topicVos: {}", topicVOs); + return convert(consumerGroupOffsets, topicVOs); + } + + @Override + public List getConsumersByTopics(Collection topicVos) { final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet()); final var consumerGroupOffsets = getConsumerOffsets(topics); LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets); @@ -321,6 +332,10 @@ private ConsumerGroupOffsets resolveOffsets(String groupId) { return new ConsumerGroupOffsets(groupId, highLevelAdminClient.listConsumerGroupOffsetsIfAuthorized(groupId)); } + private List getConsumerOffsets(String groupId) { + return Collections.singletonList(resolveOffsets(groupId)); + } + private List getConsumerOffsets(Set topics) { final var consumerGroups = highLevelAdminClient.listConsumerGroups(); return consumerGroups.stream()