diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index b13af121a..83714fc08 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -10,6 +10,7 @@
 import io.micronaut.core.util.StringUtils;
 import io.micronaut.http.sse.Event;
 import io.reactivex.Flowable;
+import java.util.stream.StreamSupport;
 import lombok.*;
 import lombok.extern.slf4j.Slf4j;
 import org.akhq.configs.SchemaRegistryType;
@@ -80,10 +81,10 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private MaskingUtils maskingUtils;
 
-    @Value("${akhq.topic-data.poll-timeout:1000}")
+    @Value("${akhq.topic-data.poll-timeout:10000}")
     protected int pollTimeout;
 
-    @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
+    @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:25000}")
     protected int maxPollRecords;
 
     @Value("${akhq.topic-data.kafka-max-message-length:2147483647}")
@@ -99,33 +100,31 @@ public Map getLastRecord(String clusterId, List topicsNa
             .map(partition -> new TopicPartition(partition.getTopic(), partition.getId()))
             .collect(Collectors.toList());
 
-        KafkaConsumer consumer = kafkaModule.getConsumer(clusterId, new Properties() {{
-            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicPartitions.size() * 3);
-        }});
-        consumer.assign(topicPartitions);
+        ConcurrentHashMap records = new ConcurrentHashMap<>();
 
-        consumer
-            .endOffsets(consumer.assignment())
-            .forEach((topicPartition, offset) -> {
-                consumer.seek(topicPartition, Math.max(0, offset - 2));
-            });
+        try (KafkaConsumer consumer = kafkaModule.getConsumer(clusterId)) {
+            consumer.assign(topicPartitions);
 
-        ConcurrentHashMap records = new ConcurrentHashMap<>();
+            consumer
+                .endOffsets(consumer.assignment())
+                .forEach((topicPartition, offset) -> {
+                    consumer.seek(topicPartition, Math.max(0, offset - 2));
+                });
 
-        this.poll(consumer)
-            .forEach(record -> {
-                if (!records.containsKey(record.topic())) {
-                    records.put(record.topic(), newRecord(record, clusterId, topics.get(record.topic())));
-                } else {
-                    Record current = records.get(record.topic());
-                    if (current.getTimestamp().toInstant().toEpochMilli() < record.timestamp()) {
+            this.poll(consumer)
+                .forEach(record -> {
+                    if (!records.containsKey(record.topic())) {
                         records.put(record.topic(), newRecord(record, clusterId, topics.get(record.topic())));
+                    } else {
+                        Record current = records.get(record.topic());
+                        if (current.getTimestamp().toInstant().toEpochMilli() < record.timestamp()) {
+                            records.put(record.topic(), newRecord(record, clusterId, topics.get(record.topic())));
+                        }
                     }
-                }
-            });
+                });
+        }
 
-        consumer.close();
         return records;
     }
 
@@ -142,92 +141,92 @@ public List consume(String clusterId, Options options) throws ExecutionE
     }
 
     private List consumeOldest(Topic topic, Options options) {
-        KafkaConsumer consumer = this.kafkaModule.getConsumer(options.clusterId);
-        Map partitions = getTopicPartitionForSortOldest(topic, options, consumer);
         List list = new ArrayList<>();
 
-        if (partitions.size() > 0) {
-            consumer.assign(partitions.keySet());
-            partitions.forEach(consumer::seek);
+        for (Map.Entry partition : getTopicPartitionForSortOldest(topic, options).entrySet()) {
+            Properties properties = new Properties() {{
+                put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.size);
+            }};
+
+            try (KafkaConsumer consumer = this.kafkaModule.getConsumer(options.clusterId, properties)) {
+                consumer.assign(List.of(partition.getKey()));
+                consumer.seek(partition.getKey(), partition.getValue());
 
-            if (log.isTraceEnabled()) {
-                partitions.forEach((topicPartition, first) ->
+                if (log.isTraceEnabled()) {
                     log.trace(
                         "Consume [topic: {}] [partition: {}] [start: {}]",
-                        topicPartition.topic(),
-                        topicPartition.partition(),
-                        first
-                    )
-                );
-            }
+                        partition.getKey().topic(),
+                        partition.getKey().partition(),
+                        partition.getValue()
+                    );
+                }
 
-            ConsumerRecords records = this.poll(consumer);
+                ConsumerRecords records = this.poll(consumer);
 
-            for (ConsumerRecord record : records) {
-                Record current = newRecord(record, options, topic);
-                if (matchFilters(options, current)) {
-                    filterMessageLength(current);
-                    list.add(current);
+                for (ConsumerRecord record : records) {
+                    Record current = newRecord(record, options, topic);
+                    if (matchFilters(options, current)) {
+                        filterMessageLength(current);
+                        list.add(current);
+                    }
                 }
             }
         }
 
-        consumer.close();
-
-        list.sort(Comparator.comparing(Record::getTimestamp));
-
-        return list;
+        return list.stream()
+            .sorted(Comparator.comparing(Record::getTimestamp))
+            .limit(options.size)
+            .toList();
     }
 
     public List getOffsetForTime(String clusterId, List partitions, Long timestamp) throws ExecutionException, InterruptedException {
         return Debug.call(() -> {
             Map map = new HashMap<>();
 
-            KafkaConsumer consumer = this.kafkaModule.getConsumer(clusterId);
-
-            partitions
-                .forEach(partition -> map.put(
-                    new TopicPartition(partition.getTopic(), partition.getPartition()),
-                    timestamp
-                ));
-
-            List collect = consumer.offsetsForTimes(map)
-                .entrySet()
-                .stream()
-                .map(r -> r.getValue() != null ? new TimeOffset(
-                    r.getKey().topic(),
-                    r.getKey().partition(),
-                    r.getValue().offset()
-                ) : null)
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
-
-            consumer.close();
-
-            return collect;
+            try (KafkaConsumer consumer = this.kafkaModule.getConsumer(clusterId)) {
+                partitions
+                    .forEach(partition -> map.put(
+                        new TopicPartition(partition.getTopic(), partition.getPartition()),
+                        timestamp
+                    ));
+
+                List collect = consumer.offsetsForTimes(map)
+                    .entrySet()
+                    .stream()
+                    .map(r -> r.getValue() != null ? new TimeOffset(
+                        r.getKey().topic(),
+                        r.getKey().partition(),
+                        r.getValue().offset()
+                    ) : null)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+
+                return collect;
+            }
         }, "Offsets for " + partitions + " Timestamp " + timestamp, null);
     }
 
     public Optional consumeSingleRecord(String clusterId, Topic topic, Options options) throws ExecutionException, InterruptedException {
         return Debug.call(() -> {
             Optional singleRecord = Optional.empty();
-            KafkaConsumer consumer = kafkaModule.getConsumer(clusterId, new Properties() {{
+
+            Map partitions = getTopicPartitionForSortOldest(topic, options);
+
+            Properties properties = new Properties() {{
                 put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
-            }});
+            }};
 
-            Map partitions = getTopicPartitionForSortOldest(topic, options, consumer);
-            consumer.assign(partitions.keySet());
-            partitions.forEach(consumer::seek);
+            try (KafkaConsumer consumer = kafkaModule.getConsumer(clusterId, properties)) {
+                consumer.assign(partitions.keySet());
+                partitions.forEach(consumer::seek);
 
-            ConsumerRecords records = this.poll(consumer);
-            if(!records.isEmpty()) {
-                singleRecord = Optional.of(newRecord(records.iterator().next(), options, topic));
+                ConsumerRecords records = this.poll(consumer);
+                if (!records.isEmpty()) {
+                    singleRecord = Optional.of(newRecord(records.iterator().next(), options, topic));
+                }
             }
 
-            consumer.close();
             return singleRecord;
-
         }, "Consume with options {}", Collections.singletonList(options.toString()));
     }
 
@@ -242,21 +241,27 @@ public static class TimeOffset {
     }
 
-    private Map getTopicPartitionForSortOldest(Topic topic, Options options, KafkaConsumer consumer) {
-        return topic
-            .getPartitions()
-            .stream()
-            .map(partition -> getFirstOffsetForSortOldest(consumer, partition, options)
-                .map(offsetBound -> offsetBound.withTopicPartition(
-                    new TopicPartition(
-                        partition.getTopic(),
-                        partition.getId()
-                    )
-                ))
-            )
-            .filter(Optional::isPresent)
-            .map(Optional::get)
-            .collect(Collectors.toMap(OffsetBound::getTopicPartition, OffsetBound::getBegin));
+    private Map getTopicPartitionForSortOldest(Topic topic, Options options) {
+        Properties properties = new Properties() {{
+            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+        }};
+
+        try (KafkaConsumer consumer = kafkaModule.getConsumer(options.clusterId, properties)) {
+            return topic
+                .getPartitions()
+                .stream()
+                .map(partition -> getFirstOffsetForSortOldest(consumer, partition, options)
+                    .map(offsetBound -> offsetBound.withTopicPartition(
+                        new TopicPartition(
+                            partition.getTopic(),
+                            partition.getId()
+                        )
+                    ))
+                )
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toMap(OffsetBound::getTopicPartition, OffsetBound::getBegin));
+        }
     }
 
     private List consumeNewest(Topic topic, Options options) {
@@ -264,12 +269,10 @@ private List consumeNewest(Topic topic, Options options) {
             .getPartitions()
             .parallelStream()
             .map(partition -> {
-                KafkaConsumer consumer = this.kafkaModule.getConsumer(
-                    options.clusterId,
-                    new Properties() {{
-                        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(options.size));
-                    }}
-                );
+                KafkaConsumer consumer =
+                    this.kafkaModule.getConsumer(options.clusterId, new Properties() {{
+                        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.size);
+                    }});
 
                 return getOffsetForSortNewest(consumer, partition, options)
                     .map(offset -> offset.withTopicPartition(
@@ -652,12 +655,10 @@ public RecordMetadata delete(String clusterId, String topic, Integer partition,
     public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws ExecutionException, InterruptedException {
         AtomicInteger matchesCount = new AtomicInteger();
-        Properties properties = new Properties();
-        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.getSize());
 
         return Flowable.generate(() -> {
-            KafkaConsumer consumer = this.kafkaModule.getConsumer(options.clusterId, properties);
-            Map partitions = getTopicPartitionForSortOldest(topic, options, consumer);
+            Map partitions = getTopicPartitionForSortOldest(topic, options);
+
+            KafkaConsumer consumer = this.kafkaModule.getConsumer(options.clusterId);
 
             if (partitions.size() == 0) {
                 return new SearchState(consumer, null);
@@ -699,22 +700,32 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
                 currentEvent.emptyPoll = 0;
             }
 
+            Comparator comparator = Comparator.comparing(Record::getTimestamp);
+
+            List sortedRecords = StreamSupport.stream(records.spliterator(), false)
+                .map(record -> newRecord(record, options, topic))
+                .sorted(Options.Sort.NEWEST.equals(options.sort) ? comparator.reversed() : comparator)
+                .toList();
+
             List list = new ArrayList<>();
 
-            for (ConsumerRecord record : records) {
+            for (Record record : sortedRecords) {
+                if (matchesCount.get() >= options.size) {
+                    break;
+                }
+
                 currentEvent.updateProgress(record);
 
-                Record current = newRecord(record, options, topic);
-                if (matchFilters(options, current)) {
-                    list.add(current);
+                if (matchFilters(options, record)) {
+                    list.add(record);
                     matchesCount.getAndIncrement();
 
                     log.trace(
                         "Record [topic: {}] [partition: {}] [offset: {}] [key: {}]",
-                        record.topic(),
-                        record.partition(),
-                        record.offset(),
-                        record.key()
+                        record.getTopic(),
+                        record.getPartition(),
+                        record.getOffset(),
+                        record.getKey()
                     );
                 }
            }
@@ -927,10 +938,9 @@ public Event progress(Options options) {
             return Event.of(this).name("searchBody");
         }
 
-
-        private void updateProgress(ConsumerRecord record) {
-            Offset offset = this.offsets.get(record.partition());
-            offset.current = record.offset();
+        private void updateProgress(Record record) {
+            Offset offset = this.offsets.get(record.getPartition());
+            offset.current = record.getOffset();
         }
 
         @AllArgsConstructor
@@ -1015,69 +1025,67 @@ public Flowable<Event<TailEvent>> tail(String clusterId, TailOptions options) {
     }
 
     public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List offsets, RecordRepository.Options options) {
-        KafkaConsumer consumer = this.kafkaModule.getConsumer(
-            options.clusterId,
-            new Properties() {{
-                put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
-            }}
-        );
-
-        Map partitions = getTopicPartitionForSortOldest(fromTopic, options, consumer);
+        Map partitions = getTopicPartitionForSortOldest(fromTopic, options);
 
         Map filteredPartitions = partitions.entrySet().stream()
             .filter(topicPartitionLongEntry -> offsets.stream()
                .anyMatch(offsetCopy -> offsetCopy.getPartition() == topicPartitionLongEntry.getKey().partition()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-        int counter = 0;
+        Properties properties = new Properties() {{
+            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
+        }};
 
-        if (filteredPartitions.size() > 0) {
-            consumer.assign(filteredPartitions.keySet());
-            filteredPartitions.forEach(consumer::seek);
+        try (KafkaConsumer consumer = this.kafkaModule.getConsumer(options.clusterId, properties)) {
+            int counter = 0;
 
-            if (log.isTraceEnabled()) {
-                filteredPartitions.forEach((topicPartition, first) ->
-                    log.trace(
-                        "Consume [topic: {}] [partition: {}] [start: {}]",
-                        topicPartition.topic(),
-                        topicPartition.partition(),
-                        first
-                    )
-                );
-            }
+            if (filteredPartitions.size() > 0) {
+                consumer.assign(filteredPartitions.keySet());
+                filteredPartitions.forEach(consumer::seek);
 
-            Map partitionsLastOffsetMap = fromTopic.getPartitions()
-                .stream()
-                .collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));
+                if (log.isTraceEnabled()) {
+                    filteredPartitions.forEach((topicPartition, first) ->
+                        log.trace(
+                            "Consume [topic: {}] [partition: {}] [start: {}]",
+                            topicPartition.topic(),
+                            topicPartition.partition(),
+                            first
+                        )
+                    );
+                }
 
-            boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();
+                Map partitionsLastOffsetMap = fromTopic.getPartitions()
+                    .stream()
+                    .collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));
 
-            KafkaProducer producer = kafkaModule.getProducer(toClusterId);
-            ConsumerRecords records;
-            do {
-                records = this.pollAndFilter(consumer, partitionsLastOffsetMap);
+                boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();
 
-                for (ConsumerRecord record : records) {
-                    System.out.println(record.offset() + "-" + record.partition());
-
-                    counter++;
-                    producer.send(new ProducerRecord<>(
-                        toTopic.getName(),
-                        samePartition ? record.partition() : null,
-                        record.timestamp(),
-                        record.key(),
-                        record.value(),
-                        record.headers()
-                    ));
-                }
+                KafkaProducer producer = kafkaModule.getProducer(toClusterId);
+                ConsumerRecords records;
+                do {
+                    records = this.pollAndFilter(consumer, partitionsLastOffsetMap);
 
-            } while (!records.isEmpty());
+                    for (ConsumerRecord record : records) {
+                        System.out.println(record.offset() + "-" + record.partition());
+
+                        counter++;
+                        producer.send(new ProducerRecord<>(
+                            toTopic.getName(),
+                            samePartition ? record.partition() : null,
+                            record.timestamp(),
+                            record.key(),
+                            record.value(),
+                            record.headers()
+                        ));
+                    }
 
-            producer.flush();
-        }
-        consumer.close();
+                } while (!records.isEmpty());
 
-        return new CopyResult(counter);
+                producer.flush();
+            }
+
+            return new CopyResult(counter);
+        }
     }
 
     /**
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index e8b584688..13655b069 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -99,7 +99,7 @@ akhq:
   clients-defaults:
     consumer:
       properties:
-        max.poll.records: 50
+        max.poll.records: 25000
         isolation.level: read_committed
         # group.id: Akhq
         enable.auto.commit: "false"
@@ -130,7 +130,7 @@ akhq:
 
   topic-data:
     size: 50
-    poll-timeout: 1000
+    poll-timeout: 10000
     kafka-max-message-length: 1000000
 
  audit:
diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java
index 212a83db2..6f6473aa0 100644
--- a/src/test/java/org/akhq/controllers/TopicControllerTest.java
+++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java
@@ -16,7 +16,7 @@
 import static org.junit.jupiter.api.Assertions.*;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-class TopicControllerTest extends AbstractTest {
+public class TopicControllerTest extends AbstractTest {
     public static final String BASE_URL = "/api/" + KafkaTestCluster.CLUSTER_ID + "/topic";
     public static final String DEFAULTS_CONFIGS_URL = "api/topic/defaults-configs";
     public static final String TOPIC_URL = BASE_URL + "/" + KafkaTestCluster.TOPIC_COMPACTED;
diff --git a/src/test/java/org/akhq/repositories/TopicRepositoryTest.java b/src/test/java/org/akhq/repositories/TopicRepositoryTest.java
index 4bbf9b374..b6defd6b3 100644
--- a/src/test/java/org/akhq/repositories/TopicRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/TopicRepositoryTest.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.Predicate;
 
+import static org.akhq.controllers.TopicControllerTest.CREATE_TOPIC_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.when;
@@ -49,6 +50,8 @@ class TopicRepositoryTest extends AbstractTest {
 
     private final FilterGenerated filterGenerated = new FilterGenerated();
 
+    private final FilterTopicController filterTopicController = new FilterTopicController();
+
     @BeforeEach
     void before() {
         MockitoAnnotations.initMocks(this);
@@ -62,7 +65,7 @@ void list() throws ExecutionException, InterruptedException {
             TopicRepository.TopicListView.ALL,
             Optional.empty(),
             List.of()
-        ).stream().filter(filterGenerated).toList().size());
+        ).stream().filter(filterGenerated).filter(filterTopicController).toList().size());
     }
 
     @Test
@@ -73,7 +76,7 @@ void listNoInternal() throws ExecutionException, InterruptedException {
             TopicRepository.TopicListView.HIDE_INTERNAL,
             Optional.empty(),
             List.of()
-        ).stream().filter(filterGenerated).toList().size());
+        ).stream().filter(filterGenerated).filter(filterTopicController).toList().size());
     }
 
     @Test
@@ -84,7 +87,7 @@ void listNoInternalStream() throws ExecutionException, InterruptedException {
             TopicRepository.TopicListView.HIDE_INTERNAL_STREAM,
             Optional.empty(),
             List.of()
-        ).stream().filter(filterGenerated).toList().size());
+        ).stream().filter(filterGenerated).filter(filterTopicController).toList().size());
    }
 
     @Test
@@ -95,7 +98,7 @@ void listNoStream() throws ExecutionException, InterruptedException {
             TopicRepository.TopicListView.HIDE_STREAM,
             Optional.empty(),
             List.of()
-        ).stream().filter(filterGenerated).toList().size());
+        ).stream().filter(filterGenerated).filter(filterTopicController).toList().size());
     }
 
     @Test
@@ -207,4 +210,11 @@ public boolean test(Topic topic) {
             return !topic.getName().startsWith(PATTERN);
         }
     }
+
+    private static class FilterTopicController implements Predicate {
+        @Override
+        public boolean test(Topic topic) {
+            return !topic.getName().equals(CREATE_TOPIC_NAME);
+        }
+    }
 }