From 168cb430bd6cc74d209e2b05d6f92888900141e7 Mon Sep 17 00:00:00 2001 From: yuluo-yx Date: Sat, 20 Jul 2024 12:07:12 +0800 Subject: [PATCH 1/3] [Improve] add kafak queue unit test Signed-off-by: yuluo-yx --- .../common/entity/message/CollectRep.java | 2 +- .../queue/impl/KafkaCommonDataQueueTest.java | 133 +++++++++++++++++- 2 files changed, 128 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java b/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java index 8b0ab91a27a..be4342923d0 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java @@ -449,7 +449,7 @@ public static final class MetricsData extends private MetricsData(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } - private MetricsData() { + public MetricsData() { app_ = ""; metrics_ = ""; code_ = 0; diff --git a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java index 8f2c0e7ca46..a82b3ea82a7 100644 --- a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java +++ b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java @@ -17,37 +17,158 @@ package org.apache.hertzbeat.common.queue.impl; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Collections; + +import org.apache.hertzbeat.common.config.CommonProperties; +import org.apache.hertzbeat.common.entity.alerter.Alert; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** - * Test case for {@link KafkaCommonDataQueue} + * Test case for {@link org.apache.hertzbeat.common.queue.impl.KafkaCommonDataQueue} */ class KafkaCommonDataQueueTest { + private KafkaProducer metricsDataProducer; + private KafkaProducer alertDataProducer; + private KafkaConsumer alertDataConsumer; + private KafkaConsumer metricsDataToAlertConsumer; + private KafkaConsumer metricsDataToPersistentStorageConsumer; + private KafkaConsumer metricsDataToRealTimeStorageConsumer; + private CommonProperties.KafkaProperties kafkaProperties; + private KafkaCommonDataQueue kafkaCommonDataQueue; + @BeforeEach - void setUp() { + void setUp() throws Exception { + kafkaProperties = mock(CommonProperties.KafkaProperties.class); + when(kafkaProperties.getServers()).thenReturn("localhost:9092"); + when(kafkaProperties.getAlertsDataTopic()).thenReturn("alerts"); + when(kafkaProperties.getMetricsDataTopic()).thenReturn("metrics"); + + CommonProperties properties = mock(CommonProperties.class); + CommonProperties.DataQueueProperties queueProperties = mock(CommonProperties.DataQueueProperties.class); + when(properties.getQueue()).thenReturn(queueProperties); + when(queueProperties.getKafka()).thenReturn(kafkaProperties); + + metricsDataProducer = mock(KafkaProducer.class); + alertDataProducer = mock(KafkaProducer.class); + alertDataConsumer = mock(KafkaConsumer.class); + metricsDataToAlertConsumer = mock(KafkaConsumer.class); + metricsDataToPersistentStorageConsumer = mock(KafkaConsumer.class); + metricsDataToRealTimeStorageConsumer = mock(KafkaConsumer.class); + + kafkaCommonDataQueue = new KafkaCommonDataQueue(properties); + + setPrivateField(kafkaCommonDataQueue, "metricsDataProducer", metricsDataProducer); + setPrivateField(kafkaCommonDataQueue, "alertDataProducer", alertDataProducer); + setPrivateField(kafkaCommonDataQueue, "alertDataConsumer", alertDataConsumer); + setPrivateField(kafkaCommonDataQueue, "metricsDataToAlertConsumer", metricsDataToAlertConsumer); + setPrivateField(kafkaCommonDataQueue, "metricsDataToPersistentStorageConsumer", metricsDataToPersistentStorageConsumer); + setPrivateField(kafkaCommonDataQueue, "metricsDataToRealTimeStorageConsumer", metricsDataToRealTimeStorageConsumer); + } + + // Test use, set private field. + private void setPrivateField(Object target, String fieldName, Object value) throws Exception { + + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); } @Test void testSendAlertsData() { + + Alert alert = new Alert(); + kafkaCommonDataQueue.sendAlertsData(alert); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(alertDataProducer).send(captor.capture()); + + ProducerRecord record = captor.getValue(); + assertEquals("alerts", record.topic()); + assertEquals(alert, record.value()); } @Test - void testPollAlertsData() { + void testPollAlertsData() throws InterruptedException { + + Alert alert = new Alert(); + ConsumerRecords records = new ConsumerRecords<>(Collections.emptyMap()); + when(alertDataConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertNull(kafkaCommonDataQueue.pollAlertsData()); + + records = new ConsumerRecords<>(Collections.singletonMap( + new TopicPartition("alerts", 0), + Collections.singletonList( + new ConsumerRecord<>("alerts", 0, 0L, 1L, alert) + ) + )); + when(alertDataConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertEquals(alert, kafkaCommonDataQueue.pollAlertsData()); } @Test void testSendMetricsData() { + CollectRep.MetricsData metricsData = new CollectRep.MetricsData(); + + kafkaCommonDataQueue.sendMetricsData(metricsData); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(metricsDataProducer).send(captor.capture()); + + ProducerRecord record = captor.getValue(); + assertEquals("metrics", record.topic()); + assertEquals(metricsData, record.value()); } @Test - void testPollMetricsDataToAlerter() { + void testPollMetricsDataToAlerter() throws InterruptedException { + + CollectRep.MetricsData metricsData = new CollectRep.MetricsData(); + ConsumerRecords records = new ConsumerRecords<>(Collections.emptyMap()); + when(metricsDataToAlertConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertNull(kafkaCommonDataQueue.pollMetricsDataToAlerter()); + + records = new ConsumerRecords<>(Collections.singletonMap( + new TopicPartition("metrics", 0), + Collections.singletonList( + new ConsumerRecord<>("metrics", 0, 0L, 1L, metricsData) + ) + )); + when(metricsDataToAlertConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertEquals(metricsData, kafkaCommonDataQueue.pollMetricsDataToAlerter()); } @Test - void testDestroy() { - } + void testDestroy() throws Exception { + + kafkaCommonDataQueue.destroy(); + verify(metricsDataProducer).close(); + verify(alertDataProducer).close(); + verify(alertDataConsumer).close(); + verify(metricsDataToAlertConsumer).close(); + verify(metricsDataToPersistentStorageConsumer).close(); + verify(metricsDataToRealTimeStorageConsumer).close(); + } } From ae65a8dd3f3895f27bcf6d53607525c2c9ffd254 Mon Sep 17 00:00:00 2001 From: yuluo-yx Date: Sun, 21 Jul 2024 17:40:39 +0800 Subject: [PATCH 2/3] fix Signed-off-by: yuluo-yx --- .../apache/hertzbeat/common/entity/message/CollectRep.java | 2 +- .../hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java b/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java index be4342923d0..8b0ab91a27a 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java @@ -449,7 +449,7 @@ public static final class MetricsData extends private MetricsData(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } - public MetricsData() { + private MetricsData() { app_ = ""; metrics_ = ""; code_ = 0; diff --git a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java index a82b3ea82a7..f3c943f39ef 100644 --- a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java +++ b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java @@ -127,8 +127,8 @@ void testPollAlertsData() throws InterruptedException { @Test void testSendMetricsData() { - CollectRep.MetricsData metricsData = new CollectRep.MetricsData(); + CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder().build(); kafkaCommonDataQueue.sendMetricsData(metricsData); ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); @@ -142,7 +142,7 @@ void testSendMetricsData() { @Test void testPollMetricsDataToAlerter() throws InterruptedException { - CollectRep.MetricsData metricsData = new CollectRep.MetricsData(); + CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder().build(); ConsumerRecords records = new ConsumerRecords<>(Collections.emptyMap()); when(metricsDataToAlertConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); From cd8660691d9cf142500af319cbbeb3b3cd6a9ee3 Mon Sep 17 00:00:00 2001 From: YuLuo Date: Sun, 21 Jul 2024 18:04:23 +0800 Subject: [PATCH 3/3] Update KafkaCommonDataQueueTest.java Signed-off-by: YuLuo --- .../hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java index f3c943f39ef..f7bf8f0da4a 100644 --- a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java +++ b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java @@ -41,7 +41,7 @@ import static org.mockito.Mockito.when; /** - * Test case for {@link org.apache.hertzbeat.common.queue.impl.KafkaCommonDataQueue} + * Test case for {@link KafkaCommonDataQueue} */ class KafkaCommonDataQueueTest {