Skip to content

Commit

Permalink
Add Message Search Functionality (#288)
Browse files Browse the repository at this point in the history
* Added search message page

* Overhaul search functionality protoype.  Improved performance by not reading the entire topic into memory and having a safetly limit on the number of messages to read befor the search aborts.  Added a maximum results count.  Added a date filter to allow user to specify which timestamp to start searching from.  Added better output.

* Fix bad merge

* Remove redundant class

* Remove redundant file

* Make pom customizable for our needs

* Fix formatting and input a bit

* Update search message page card layout

* Standardize code formatting

* Addressed compilation errors and refactored

Refactored to increase maintainability

* Removed impossible end state REACHED_END_OF_TIMESPAN

---------

Co-authored-by: pedro lastra <[email protected]>
Co-authored-by: Nathan Daniels <[email protected]>
Co-authored-by: Bert Roos <[email protected]>
Co-authored-by: Bert Roos <[email protected]>
  • Loading branch information
5 people authored Jul 19, 2023
1 parent 92bc60a commit ed9d1cd
Show file tree
Hide file tree
Showing 11 changed files with 654 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ settings.xml
kafka.properties*
kafka.truststore.jks*
kafka.keystore.jks*
/.vs
67 changes: 65 additions & 2 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties;
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties;
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;
import kafdrop.form.SearchMessageForm;
import kafdrop.model.MessageVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

@Tag(name = "message-controller", description = "Message Controller")
Expand Down Expand Up @@ -193,10 +195,71 @@ public String viewMessageForm(@PathVariable("name") String topicName,
return "message-inspector";
}

/**
* Human friendly view of searching messages.
*
* @param topicName Name of topic
* @param searchMessageForm Message form for submitting requests to search messages.
* @param errors
* @param model
* @return View for seeing messages in a partition.
*/
@GetMapping("/topic/{name:.+}/search-messages")
public String searchMessageForm(@PathVariable("name") String topicName,
@Valid @ModelAttribute("searchMessageForm") SearchMessageForm searchMessageForm,
BindingResult errors,
Model model) {
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();

if (searchMessageForm.isEmpty()) {
final SearchMessageForm defaultForm = new SearchMessageForm();

defaultForm.setSearchText("");
defaultForm.setFormat(defaultFormat);
defaultForm.setKeyFormat(defaultFormat);
defaultForm.setMaximumCount(100);
defaultForm.setStartTimestamp(new Date(0));
model.addAttribute("searchMessageForm", defaultForm);
}

final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

model.addAttribute("topic", topic);
model.addAttribute("defaultFormat", defaultFormat);
model.addAttribute("messageFormats", MessageFormat.values());
model.addAttribute("defaultKeyFormat", defaultKeyFormat);
model.addAttribute("keyFormats", KeyFormat.values());
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

if (!searchMessageForm.isEmpty() && !errors.hasErrors()) {

final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto())
);

var searchResults = kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);

model.addAttribute("messages", searchResults.getMessages());
model.addAttribute("details", searchResults.getCompletionDetails());
}

return "search-message";
}

/**
* Returns the selected nessagr format based on the
* form submission
* Returns the selected message format based on the form submission
*
* @param format String representation of format name
* @return
Expand Down
106 changes: 106 additions & 0 deletions src/main/java/kafdrop/form/SearchMessageForm.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package kafdrop.form;

import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import kafdrop.util.MessageFormat;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;

public class SearchMessageForm {

@NotBlank
private String searchText;

@NotNull
@Min(1)
@Max(1000)
private Integer maximumCount;

private MessageFormat format;

private MessageFormat keyFormat;

private String descFile;

private String msgTypeName;

@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
private Date startTimestamp;

public SearchMessageForm(String searchText, MessageFormat format) {
this.searchText = searchText;
this.format = format;
}

public Date getStartTimestamp() {
return startTimestamp;
}

public void setStartTimestamp(Date startTimestamp) {
this.startTimestamp = startTimestamp;
}

public SearchMessageForm(String searchText) {
this(searchText, MessageFormat.DEFAULT);
}

public SearchMessageForm() {
}

@JsonIgnore
public boolean isEmpty() {
return searchText == null || searchText.isEmpty();
}

public String getSearchText() {
return searchText;
}

public void setSearchText(String searchText) {
this.searchText = searchText;
}

public Integer getMaximumCount() {
return maximumCount;
}

public void setMaximumCount(Integer maximumCount) {
this.maximumCount = maximumCount;
}

public MessageFormat getKeyFormat() {
return keyFormat;
}

public void setKeyFormat(MessageFormat keyFormat) {
this.keyFormat = keyFormat;
}

public MessageFormat getFormat() {
return format;
}

public void setFormat(MessageFormat format) {
this.format = format;
}

public String getDescFile() {
return descFile;
}

public void setDescFile(String descFile) {
this.descFile = descFile;
}

public String getMsgTypeName() {
return msgTypeName;
}

public void setMsgTypeName(String msgTypeName) {
this.msgTypeName = msgTypeName;
}
}
43 changes: 43 additions & 0 deletions src/main/java/kafdrop/model/SearchResultsVO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2021 Kafdrop contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/

package kafdrop.model;

import java.util.List;

public final class SearchResultsVO {
private List<MessageVO> messages;

private String completionDetails;

public List<MessageVO> getMessages() {
return messages;
}

public String getCompletionDetails() {
return completionDetails;
}

public void setCompletionDetails(String completionDetails) {
this.completionDetails = completionDetails;
}

public void setMessages(List<MessageVO> messages) {
this.messages = messages;
}
}
Loading

0 comments on commit ed9d1cd

Please sign in to comment.