Skip to content

Commit

Permalink
Set virtual thread names for RabbitMQ and Pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
Maziz authored and mhalbritter committed Mar 18, 2024
1 parent f1ccc94 commit ecda754
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {

/**
* Default Name of the thread created for simple rabbit listener.
*/
public static final String THREADNAME_RABBIT_SIMPLE = "rabbit-simple-";

/**
* Default Name of the thread created for direct rabbit listener.
*/
public static final String THREADNAME_RABBIT_DIRECT = "rabbit-direct-";

private final ObjectProvider<MessageConverter> messageConverter;

private final ObjectProvider<MessageRecoverer> messageRecoverer;
Expand Down Expand Up @@ -76,7 +86,7 @@ SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFact
@ConditionalOnThreading(Threading.VIRTUAL)
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_SIMPLE));
return configurer;
}

Expand Down Expand Up @@ -105,7 +115,7 @@ DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFact
@ConditionalOnThreading(Threading.VIRTUAL)
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_DIRECT));
return configurer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@
@Import(PulsarConfiguration.class)
public class PulsarAutoConfiguration {

/**
* Default Name of the thread created for pulsar consumer.
*/
public static final String THREADNAME_PULSAR_CONSUMER = "pulsar-consumer-";

/**
* Default Name of the thread created for pulsar task executor.
*/
public static final String THREADNAME_PULSAR_TASKEXECUTOR = "pulsar-taskexecutor-";

private PulsarProperties properties;

private PulsarPropertiesMapper propertiesMapper;
Expand Down Expand Up @@ -158,7 +168,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor());
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_CONSUMER));
}
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
Expand Down Expand Up @@ -189,7 +199,8 @@ DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReader
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor());
readerContainerProperties
.setReaderTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_TASKEXECUTOR));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLSocketFactory;
Expand Down Expand Up @@ -545,12 +546,36 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreads() {
void shouldConfigureVirtualThreadsForSimpleListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_SIMPLE + "[0-9]*");

});
}

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreadsForDirectListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean(
"directRabbitListenerContainerFactoryConfigurer",
DirectRabbitListenerContainerFactoryConfigurer.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_DIRECT + "[0-9]*");

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -506,6 +508,11 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtu
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_CONSUMER + "[0-9]*");
});
}

Expand Down Expand Up @@ -561,6 +568,11 @@ void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads()
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_TASKEXECUTOR + "[0-9]*");
});
}

Expand Down

0 comments on commit ecda754

Please sign in to comment.