diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 0a5331e144b0..1dc7591a5284 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -47,6 +47,16 @@ @ConditionalOnClass(EnableRabbit.class) class RabbitAnnotationDrivenConfiguration { + /** + * Default Name of the thread created for simple rabbit listener. + */ + public static final String THREADNAME_RABBIT_SIMPLE = "rabbit-simple-"; + + /** + * Default Name of the thread created for direct rabbit listener. + */ + public static final String THREADNAME_RABBIT_DIRECT = "rabbit-direct-"; + private final ObjectProvider messageConverter; private final ObjectProvider messageRecoverer; @@ -76,7 +86,7 @@ SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFact @ConditionalOnThreading(Threading.VIRTUAL) SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() { SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer(); - configurer.setTaskExecutor(new VirtualThreadTaskExecutor()); + configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_SIMPLE)); return configurer; } @@ -105,7 +115,7 @@ DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFact @ConditionalOnThreading(Threading.VIRTUAL) DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() { DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer(); - configurer.setTaskExecutor(new VirtualThreadTaskExecutor()); + configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_DIRECT)); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 4a0483f58caf..8c1cea44a435 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -73,6 +73,16 @@ @Import(PulsarConfiguration.class) public class PulsarAutoConfiguration { + /** + * Default Name of the thread created for pulsar consumer. + */ + public static final String THREADNAME_PULSAR_CONSUMER = "pulsar-consumer-"; + + /** + * Default Name of the thread created for pulsar task executor. + */ + public static final String THREADNAME_PULSAR_TASKEXECUTOR = "pulsar-taskexecutor-"; + private PulsarProperties properties; private PulsarPropertiesMapper propertiesMapper; @@ -158,7 +168,7 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); if (Threading.VIRTUAL.isActive(environment)) { - containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor()); + containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_CONSUMER)); } this.propertiesMapper.customizeContainerProperties(containerProperties); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); @@ -189,7 +199,8 @@ DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReader PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); if (Threading.VIRTUAL.isActive(environment)) { - readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor()); + readerContainerProperties + .setReaderTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_TASKEXECUTOR)); } this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index de41da0fe10a..6c1a561e307b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -18,6 +18,7 @@ import java.security.NoSuchAlgorithmException; import java.util.List; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLSocketFactory; @@ -545,12 +546,36 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() { @Test @EnabledForJreRange(min = JRE.JAVA_21) - void shouldConfigureVirtualThreads() { + void shouldConfigureVirtualThreadsForSimpleListener() { this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); assertThat(rabbitListenerContainerFactory).extracting("taskExecutor") .isInstanceOf(VirtualThreadTaskExecutor.class); + final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor"); + final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()) + .containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_SIMPLE + "[0-9]*"); + + }); + } + + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void shouldConfigureVirtualThreadsForDirectListener() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean( + "directRabbitListenerContainerFactoryConfigurer", + DirectRabbitListenerContainerFactoryConfigurer.class); + assertThat(rabbitListenerContainerFactory).extracting("taskExecutor") + .isInstanceOf(VirtualThreadTaskExecutor.class); + final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor"); + final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()) + .containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_DIRECT + "[0-9]*"); + }); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 318554a4ae3b..e19c45a18335 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import com.github.benmanes.caffeine.cache.Caffeine; @@ -69,6 +70,7 @@ import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -506,6 +508,11 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtu .getBean(ConcurrentPulsarListenerContainerFactory.class); assertThat(factory.getContainerProperties().getConsumerTaskExecutor()) .isInstanceOf(VirtualThreadTaskExecutor.class); + final var taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor(); + final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()) + .containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_CONSUMER + "[0-9]*"); }); } @@ -561,6 +568,11 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads() .getBean(DefaultPulsarReaderContainerFactory.class); assertThat(factory.getContainerProperties().getReaderTaskExecutor()) .isInstanceOf(VirtualThreadTaskExecutor.class); + final var taskExecutor = factory.getContainerProperties().getReaderTaskExecutor(); + final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()) + .containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_TASKEXECUTOR + "[0-9]*"); }); }