diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 29b3e40566..6a1ac7ab75 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -281,8 +281,12 @@ public CompletableFuture>> pul completableFuture.completeExceptionally(exception); return; } - completableFuture.complete( - this.toConvertedAcknowledgeablePubsubMessages(payloadType, ackableMessages)); + try { + completableFuture.complete(this.toConvertedAcknowledgeablePubsubMessages(payloadType, ackableMessages)); + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + }); return completableFuture; diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 1f741fffa7..18f03bae96 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -17,6 +17,7 @@ package com.google.cloud.spring.pubsub.core.subscriber; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; @@ -59,8 +60,10 @@ import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Consumer; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -530,6 +533,17 @@ void testPullAndConvertAsync() assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub2"); } + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + void testPullAndConvertAsync_publishesConvertException() { + when(this.messageConverter.fromPubSubMessage(this.pubsubMessage, BigInteger.class)).thenThrow(new NullPointerException()); + CompletableFuture>> asyncResult = + this.pubSubSubscriberTemplate.pullAndConvertAsync("sub2", 1, true, BigInteger.class); + + ExecutionException e = assertThrows(ExecutionException.class, asyncResult::get); + assertThat(e.getCause()).isInstanceOf(NullPointerException.class); + } + @Test void testPullNext() {