diff --git a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java index 8f2c0e7ca46..f7bf8f0da4a 100644 --- a/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java +++ b/common/src/test/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueueTest.java @@ -17,37 +17,158 @@ package org.apache.hertzbeat.common.queue.impl; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Collections; + +import org.apache.hertzbeat.common.config.CommonProperties; +import org.apache.hertzbeat.common.entity.alerter.Alert; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test case for {@link KafkaCommonDataQueue} */ class KafkaCommonDataQueueTest { + private KafkaProducer metricsDataProducer; + private KafkaProducer alertDataProducer; + private KafkaConsumer alertDataConsumer; + private KafkaConsumer metricsDataToAlertConsumer; + private KafkaConsumer metricsDataToPersistentStorageConsumer; + private KafkaConsumer metricsDataToRealTimeStorageConsumer; + private CommonProperties.KafkaProperties kafkaProperties; + private KafkaCommonDataQueue kafkaCommonDataQueue; + @BeforeEach - void setUp() { + void setUp() throws Exception { + kafkaProperties = mock(CommonProperties.KafkaProperties.class); + when(kafkaProperties.getServers()).thenReturn("localhost:9092"); + when(kafkaProperties.getAlertsDataTopic()).thenReturn("alerts"); + when(kafkaProperties.getMetricsDataTopic()).thenReturn("metrics"); + + CommonProperties properties = mock(CommonProperties.class); + CommonProperties.DataQueueProperties queueProperties = mock(CommonProperties.DataQueueProperties.class); + when(properties.getQueue()).thenReturn(queueProperties); + when(queueProperties.getKafka()).thenReturn(kafkaProperties); + + metricsDataProducer = mock(KafkaProducer.class); + alertDataProducer = mock(KafkaProducer.class); + alertDataConsumer = mock(KafkaConsumer.class); + metricsDataToAlertConsumer = mock(KafkaConsumer.class); + metricsDataToPersistentStorageConsumer = mock(KafkaConsumer.class); + metricsDataToRealTimeStorageConsumer = mock(KafkaConsumer.class); + + kafkaCommonDataQueue = new KafkaCommonDataQueue(properties); + + setPrivateField(kafkaCommonDataQueue, "metricsDataProducer", metricsDataProducer); + setPrivateField(kafkaCommonDataQueue, "alertDataProducer", alertDataProducer); + setPrivateField(kafkaCommonDataQueue, "alertDataConsumer", alertDataConsumer); + setPrivateField(kafkaCommonDataQueue, "metricsDataToAlertConsumer", metricsDataToAlertConsumer); + setPrivateField(kafkaCommonDataQueue, "metricsDataToPersistentStorageConsumer", metricsDataToPersistentStorageConsumer); + setPrivateField(kafkaCommonDataQueue, "metricsDataToRealTimeStorageConsumer", metricsDataToRealTimeStorageConsumer); + } + + // Test use, set private field. + private void setPrivateField(Object target, String fieldName, Object value) throws Exception { + + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); } @Test void testSendAlertsData() { + + Alert alert = new Alert(); + kafkaCommonDataQueue.sendAlertsData(alert); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(alertDataProducer).send(captor.capture()); + + ProducerRecord record = captor.getValue(); + assertEquals("alerts", record.topic()); + assertEquals(alert, record.value()); } @Test - void testPollAlertsData() { + void testPollAlertsData() throws InterruptedException { + + Alert alert = new Alert(); + ConsumerRecords records = new ConsumerRecords<>(Collections.emptyMap()); + when(alertDataConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertNull(kafkaCommonDataQueue.pollAlertsData()); + + records = new ConsumerRecords<>(Collections.singletonMap( + new TopicPartition("alerts", 0), + Collections.singletonList( + new ConsumerRecord<>("alerts", 0, 0L, 1L, alert) + ) + )); + when(alertDataConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertEquals(alert, kafkaCommonDataQueue.pollAlertsData()); } @Test void testSendMetricsData() { + + CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder().build(); + kafkaCommonDataQueue.sendMetricsData(metricsData); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(metricsDataProducer).send(captor.capture()); + + ProducerRecord record = captor.getValue(); + assertEquals("metrics", record.topic()); + assertEquals(metricsData, record.value()); } @Test - void testPollMetricsDataToAlerter() { + void testPollMetricsDataToAlerter() throws InterruptedException { + + CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder().build(); + ConsumerRecords records = new ConsumerRecords<>(Collections.emptyMap()); + when(metricsDataToAlertConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertNull(kafkaCommonDataQueue.pollMetricsDataToAlerter()); + + records = new ConsumerRecords<>(Collections.singletonMap( + new TopicPartition("metrics", 0), + Collections.singletonList( + new ConsumerRecord<>("metrics", 0, 0L, 1L, metricsData) + ) + )); + when(metricsDataToAlertConsumer.poll(Duration.ofSeconds(1))).thenReturn(records); + + assertEquals(metricsData, kafkaCommonDataQueue.pollMetricsDataToAlerter()); } @Test - void testDestroy() { - } + void testDestroy() throws Exception { + + kafkaCommonDataQueue.destroy(); + verify(metricsDataProducer).close(); + verify(alertDataProducer).close(); + verify(alertDataConsumer).close(); + verify(metricsDataToAlertConsumer).close(); + verify(metricsDataToPersistentStorageConsumer).close(); + verify(metricsDataToRealTimeStorageConsumer).close(); + } }