Skip to content

Commit

Permalink
Merge pull request #71 from grusski/bugfix/fix-for-messageplaybook-fe…
Browse files Browse the repository at this point in the history
…ature

Fix for Message Books feature
  • Loading branch information
patschuh authored Jul 29, 2024
2 parents ae96ddf + c81794e commit 81cde14
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 16 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,8 @@ task versionFile() {
}
}

test {
useJUnit()
}

tasks.named('processResources') { dependsOn('versionFile') }
19 changes: 10 additions & 9 deletions src/main/java/at/esque/kafka/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
import at.esque.kafka.topics.CreateTopicController;
import at.esque.kafka.topics.DescribeTopicController;
import at.esque.kafka.topics.DescribeTopicWrapper;
import at.esque.kafka.topics.KafkaMessagBookWrapper;
import at.esque.kafka.topics.model.KafkaMessageBookWrapper;
import at.esque.kafka.topics.KafkaMessage;
import at.esque.kafka.topics.model.KafkaMessageForMessageBook;
import at.esque.kafka.topics.metadata.MessageMetaData;
import at.esque.kafka.topics.metadata.NumericMetadata;
import at.esque.kafka.topics.metadata.StringMetadata;
Expand Down Expand Up @@ -1478,7 +1479,7 @@ public void playMessageBook(ActionEvent event) {
backGroundTaskHolder.setIsInProgress(true);
List<File> listedFiles = Arrays.asList(Objects.requireNonNull(selectedFolder.listFiles()));
Map<String, String> replacementMap = new HashMap<>();
List<KafkaMessagBookWrapper> messagesToSend = new ArrayList<>();
List<KafkaMessageBookWrapper> messagesToSend = new ArrayList<>();
Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("Playing Message Book: scanning messages"));
listedFiles.forEach(file -> {
if (!topicListView.getBaseList().contains(file.getName())) {
Expand All @@ -1492,7 +1493,7 @@ public void playMessageBook(ActionEvent event) {
applyReplacements(messagesToSend, replacementMap);
Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("Playing Message Book: producing messages"));
AtomicInteger counter = new AtomicInteger(0);
messagesToSend.stream().sorted(Comparator.comparing(KafkaMessagBookWrapper::getTimestamp, Comparator.nullsLast(Comparator.naturalOrder())))
messagesToSend.stream().sorted(Comparator.comparing(KafkaMessageBookWrapper::getTimestamp, Comparator.nullsLast(Comparator.naturalOrder())))
.forEach(message -> {
try {
UUID producerId = topicToProducerMap.computeIfAbsent(message.getTargetTopic(), targetTopic -> {
Expand Down Expand Up @@ -1521,26 +1522,26 @@ public void playMessageBook(ActionEvent event) {
}
}

private void applyReplacements(List<KafkaMessagBookWrapper> messagesToSend, Map<String, String> replacementMap) {
private void applyReplacements(List<KafkaMessageBookWrapper> messagesToSend, Map<String, String> replacementMap) {
messagesToSend.forEach(messageToSend -> replacementMap.forEach((key, value) -> {
messageToSend.setKey(messageToSend.getKey().replace(key, value));
messageToSend.setValue(messageToSend.getValue().replace(key, value));
}));
}

private void addMessagesToSend(List<KafkaMessagBookWrapper> messagesToSend, File playFile) {
void addMessagesToSend(List<KafkaMessageBookWrapper> messagesToSend, File playFile) {
try {
List<KafkaMessage> messages = new CsvToBeanBuilder<KafkaMessage>(new FileReader(playFile.getAbsolutePath()))
.withType(KafkaMessage.class)
List<KafkaMessageForMessageBook> messages = new CsvToBeanBuilder<KafkaMessageForMessageBook>(new FileReader(playFile.getAbsolutePath()))
.withType(KafkaMessageForMessageBook.class)
.build().parse();
messagesToSend.addAll(messages.stream().map(message -> new KafkaMessagBookWrapper(playFile.getName(), message))
messagesToSend.addAll(messages.stream().map(message -> new KafkaMessageBookWrapper(playFile.getName(), message))
.toList());
} catch (FileNotFoundException e) {
Platform.runLater(() -> ErrorAlert.show(e, controlledStage));
}
}

private void addReplacementEntries(Map<String, String> replacementMap, KafkaMessagBookWrapper message) {
private void addReplacementEntries(Map<String, String> replacementMap, KafkaMessageBookWrapper message) {
addReplacementEntries(replacementMap, message.getKey());
addReplacementEntries(replacementMap, message.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package at.esque.kafka.topics;
package at.esque.kafka.topics.model;

public class KafkaMessagBookWrapper {
public class KafkaMessageBookWrapper {

private String targetTopic;
private KafkaMessage wrappedMessage;
private KafkaMessageForMessageBook wrappedMessage;

public KafkaMessagBookWrapper(String targetTopic, KafkaMessage wrappedMessage) {
public KafkaMessageBookWrapper(String targetTopic, KafkaMessageForMessageBook wrappedMessage) {
this.targetTopic = targetTopic;
this.wrappedMessage = wrappedMessage;
}
Expand All @@ -14,7 +14,7 @@ public String getTargetTopic() {
return targetTopic;
}

public KafkaMessage getWrappedMessage() {
public KafkaMessageForMessageBook getWrappedMessage() {
return wrappedMessage;
}

Expand Down Expand Up @@ -57,6 +57,4 @@ public int getPartition() {
public String getTimestamp() {
return wrappedMessage.getTimestamp();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package at.esque.kafka.topics.model;


public class KafkaMessageForMessageBook {

private int partition;
private String key;
private String keyType;
private String value;
private String valueType;
private String timestamp;

public int getPartition() {
return partition;
}

public void setPartition(int partition) {
this.partition = partition;
}

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public String getKeyType() {
return keyType;
}

public void setKeyType(String keyType) {
this.keyType = keyType;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public String getValueType() {
return valueType;
}

public void setValueType(String valueType) {
this.valueType = valueType;
}

public String getTimestamp() {
return timestamp;
}

public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
}
50 changes: 50 additions & 0 deletions src/test/java/at/esque/kafka/ControllerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package at.esque.kafka;

import at.esque.kafka.topics.model.KafkaMessageBookWrapper;
import at.esque.kafka.topics.model.KafkaMessageForMessageBook;
import org.junit.Test;
import static org.junit.Assert.*;


import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ControllerTest {

@Test
public void testAddMessagesToSend() {
// Given
List<KafkaMessageBookWrapper> messagesToSend = new ArrayList<>();
File playFile = new File("src/test/resources/csv_testfiles/testAddMessagesToSend.csv");

KafkaMessageForMessageBook message1Expected = new KafkaMessageForMessageBook();
message1Expected.setKey("${value1:UUID}");
message1Expected.setPartition(-1);
message1Expected.setTimestamp("2022-11-06T17:11:52.919Z");
message1Expected.setValue("{\"version\":0,\"newId\":\"${value1:UUID}\"}");

KafkaMessageForMessageBook message2Expected = new KafkaMessageForMessageBook();
message2Expected.setKey("${value2:UUID}");
message2Expected.setPartition(-1);
message2Expected.setTimestamp("2022-11-06T17:13:52.919Z");
message2Expected.setValue("{\"version\":0,\"newId\":\"${value2:UUID}\"}");

// When
Controller controller = new Controller();
controller.addMessagesToSend(messagesToSend, playFile);

// Then
assertEquals(2, messagesToSend.size());

assertEquals(message1Expected.getPartition(), messagesToSend.get(0).getWrappedMessage().getPartition());
assertEquals(message1Expected.getKey(), messagesToSend.get(0).getWrappedMessage().getKey());
assertEquals(message1Expected.getTimestamp(), messagesToSend.get(0).getWrappedMessage().getTimestamp());
assertEquals(message1Expected.getValue(), messagesToSend.get(0).getWrappedMessage().getValue());

assertEquals(message2Expected.getPartition(), messagesToSend.get(1).getWrappedMessage().getPartition());
assertEquals(message2Expected.getKey(), messagesToSend.get(1).getWrappedMessage().getKey());
assertEquals(message2Expected.getTimestamp(), messagesToSend.get(1).getWrappedMessage().getTimestamp());
assertEquals(message2Expected.getValue(), messagesToSend.get(1).getWrappedMessage().getValue());
}
}
3 changes: 3 additions & 0 deletions src/test/resources/csv_testfiles/testAddMessagesToSend.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"key","partition","timestamp","value"
"${value1:UUID}","-1","2022-11-06T17:11:52.919Z","{""version"":0,""newId"":""${value1:UUID}""}"
"${value2:UUID}","-1","2022-11-06T17:13:52.919Z","{""version"":0,""newId"":""${value2:UUID}""}"

0 comments on commit 81cde14

Please sign in to comment.