Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/kafdrop/config/ObjectMapperConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package kafdrop.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;

import java.util.TimeZone;

@Configuration
public class ObjectMapperConfig {

@Bean
public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
return builder
.timeZone(TimeZone.getDefault())
.build();
}
}
162 changes: 130 additions & 32 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties;
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;
import kafdrop.form.SearchMessageForm;
import kafdrop.form.SearchMessageFormForJson;
import kafdrop.model.CreateMessageVO;
import kafdrop.model.MessageVO;
import kafdrop.model.SearchResultsVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
import kafdrop.service.KafkaMonitor;
Expand All @@ -52,29 +55,28 @@
import kafdrop.util.ProtobufMessageDeserializer;
import kafdrop.util.ProtobufMessageSerializer;
import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


import kafdrop.util.Serializers;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.server.ResponseStatusException;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;

import org.springframework.web.bind.annotation.PostMapping;
import kafdrop.model.CreateMessageVO;
import java.util.List;

@Tag(name = "message-controller", description = "Message Controller")
@Controller
Expand Down Expand Up @@ -123,11 +125,45 @@ public String viewAllMessages(@PathVariable("name") String topicName,
model.addAttribute("keyFormats", KeyFormat.values());
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

model.addAttribute("messages", getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size));

return "topic-messages";
}

/**
* JSON array of reading all topic messages sorted by timestamp.
*
* @param topicName Name of topic
* @param count Count of messages
* @return JSON array for seeing all messages in a topic sorted by timestamp.
*/
@Operation(summary = "getAllMessages", description = "Get all messages from topic")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
})
@GetMapping(value = "/topic/{name:.+}/allmessages", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public List<MessageVO> getAllMessages(@PathVariable("name") String topicName,
@RequestParam(name = "count", required = false) Integer count) {
final int size = (count != null ? count : 100);
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();
final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

return getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size);
}

private @org.jetbrains.annotations.NotNull List<MessageVO> getMessages(String topicName,
MessageFormat defaultKeyFormat,
MessageFormat defaultFormat, TopicVO topic,
int size) {
final var deserializers = new Deserializers(
getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()),
getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto()));

final List<MessageVO> messages = messageInspector.getMessages(topicName, size, deserializers);
final List<MessageVO> messages = new ArrayList<>();

for (TopicPartitionVO partition : topic.getPartitions()) {
messages.addAll(messageInspector.getMessages(topicName,
Expand All @@ -138,9 +174,7 @@ public String viewAllMessages(@PathVariable("name") String topicName,
}

messages.sort(Comparator.comparing(MessageVO::getTimestamp));
model.addAttribute("messages", messages);

return "topic-messages";
return messages;
}

/**
Expand Down Expand Up @@ -303,23 +337,8 @@ public String searchMessageForm(@PathVariable("name") String topicName,
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

if (!searchMessageForm.isEmpty() && !errors.hasErrors()) {

final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto())
);

var searchResults = kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getPartition(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);
final var searchResults = searchMessageVOs(topicName, searchMessageForm, searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName());

model.addAttribute("messages", searchResults.getMessages());
model.addAttribute("details", searchResults.getCompletionDetails());
Expand All @@ -328,6 +347,86 @@ public String searchMessageForm(@PathVariable("name") String topicName,
return "search-message";
}

private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJson searchMessageForm,
String descFile,
String msgTypeName) {
final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), descFile,
msgTypeName,
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), descFile,
msgTypeName,
protobufProperties.getParseAnyProto())
);

return kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getPartition(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);
}

/**
* Searches for messages in a specific topic based on criteria provided in the request body.
* This endpoint expects a POST request with a JSON payload.
*
* @param topicName The name of the topic to search within.
* @param searchMessageForm A JSON object in the request body containing the search criteria. All fields are optional.
* <ul>
* <li><b>searchText</b>: Text to search for in the message payload. (Default: "")</li>
* <li><b>maximumCount</b>: Maximum number of messages to return. (Default: 1000)</li>
* <li><b>partition</b>: Specific partition to search in. (Default: -1 for all
* partitions)</li>
* <li><b>format</b>: Deserialization format for the message value. (Default: DEFAULT)
* </li>
* <li><b>keyFormat</b>: Deserialization format for the message key. (Default:
* DEFAULT)</li>
* <li><b>startTimestamp</b>: Start timestamp in ISO 8601 UTC format. (Example: {@code
* 1970-01-01T00:00:00.000Z})</li>
* <li><b>keys</b>: An array of message keys to filter by. (Example: {@code ["key1",
* "key2"]})</li>
* </ul>
* @param errors BindingResult for validation, automatically populated by Spring.
* @return A {@link SearchResultsVO} object containing the found messages (sorted by timestamp) and search
* completion details.
*/

@Operation(summary = "searchMessages", description = "Search messages and return results as JSON")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "400", description = "Body has validation errors"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
})
@PostMapping(value = "/topic/{name:.+}/search-messages",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public SearchResultsVO searchMessages(@PathVariable("name") String topicName,
@Valid @RequestBody SearchMessageFormForJson searchMessageForm,
BindingResult errors) {

if (errors.hasErrors()) throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors().toString());

kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

final var searchResultsVO = searchMessageVOs(topicName, searchMessageForm, null, null);

if (searchMessageForm.getKeys() != null) {
var filteredByKeyMessages = searchResultsVO.getMessages().stream()
.filter(
messageVO -> Arrays.asList(searchMessageForm.getKeys()).contains(messageVO.getKey()))
.sorted(Comparator.comparing(MessageVO::getTimestamp))
.toList();

searchResultsVO.setMessages(filteredByKeyMessages);
}

return searchResultsVO;
}

/**
* Returns the selected message format based on the form submission
*
Expand Down Expand Up @@ -588,6 +687,5 @@ public Boolean getIsAnyProto() {
public void setIsAnyProto(Boolean isAnyProto) {
this.isAnyProto = isAnyProto;
}

}
}
74 changes: 2 additions & 72 deletions src/main/java/kafdrop/form/SearchMessageForm.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,14 @@

import java.util.Date;

public class SearchMessageForm {

@NotBlank
private String searchText;

@NotNull
@Min(1)
@Max(1000)
private Integer maximumCount;

private Integer partition;

private MessageFormat format;

private MessageFormat keyFormat;
public class SearchMessageForm extends SearchMessageFormForJson {

private String descFile;

private String msgTypeName;

@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
private Date startTimestamp;

public SearchMessageForm(String searchText, MessageFormat format) {
this.searchText = searchText;
this.format = format;
}

public Date getStartTimestamp() {
return startTimestamp;
}

public void setStartTimestamp(Date startTimestamp) {
this.startTimestamp = startTimestamp;
super (searchText, format);
}

public SearchMessageForm(String searchText) {
Expand All @@ -53,43 +27,6 @@ public SearchMessageForm(String searchText) {
public SearchMessageForm() {
}

@JsonIgnore
public boolean isEmpty() {
return searchText == null || searchText.isEmpty();
}

public String getSearchText() {
return searchText;
}

public void setSearchText(String searchText) {
this.searchText = searchText;
}

public Integer getMaximumCount() {
return maximumCount;
}

public void setMaximumCount(Integer maximumCount) {
this.maximumCount = maximumCount;
}

public MessageFormat getKeyFormat() {
return keyFormat;
}

public void setKeyFormat(MessageFormat keyFormat) {
this.keyFormat = keyFormat;
}

public MessageFormat getFormat() {
return format;
}

public void setFormat(MessageFormat format) {
this.format = format;
}

public String getDescFile() {
return descFile;
}
Expand All @@ -106,11 +43,4 @@ public void setMsgTypeName(String msgTypeName) {
this.msgTypeName = msgTypeName;
}

public Integer getPartition() {
return partition;
}

public void setPartition(Integer partition) {
this.partition = partition;
}
}
Loading