Skip to content

Commit

Permalink
Add spring pulsar tx example
Browse files Browse the repository at this point in the history
  • Loading branch information
eddumelendez committed Oct 3, 2024
1 parent c425d2e commit 89b1ff3
Showing 1 changed file with 69 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.example.pulsar;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.AckMode;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.waitAtMost;

@SpringBootTest(properties = "spring.pulsar.transaction.enabled=true")
@Testcontainers
class SpringBootPulsarTxApplicationTests {

@Container
@ServiceConnection
static PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0"))
.withTransactions();

@Autowired
private PulsarTemplate<String> pulsarTemplate;

@Autowired
private TestListener testListener;

@Test
void consumeMessage() {
this.pulsarTemplate.send("test", "test-data");

waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> {
assertThat(this.testListener.messages).hasSize(1);
});
}

@TestConfiguration
static class Config {

@Bean
TestListener testListener() {
return new TestListener();
}

}

static class TestListener {

private final List<String> messages = new ArrayList<>();

@PulsarListener(topics = "test", ackMode = AckMode.RECORD)
void listen(String data) {
this.messages.add(data);
}

}

}

0 comments on commit 89b1ff3

Please sign in to comment.