Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap long lines #508

Merged
merged 1 commit into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/linters/.ecrc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"Exclude": [
".editorconfig",
".github",
"CONTRIBUTING.md",
"LICENSE",
"README.md",
"chart",
Expand All @@ -31,6 +32,6 @@
"IndentSize": true,
"InsertFinalNewline": false,
"TrimTrailingWhitespace": false,
"MaxLineLength": true
"MaxLineLength": false
}
}
6 changes: 4 additions & 2 deletions src/main/java/kafdrop/Kafdrop.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void configureContentNegotiation(ContentNegotiationConfigurer configurer)
};
}

private static final class LoggingConfigurationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
private static final class LoggingConfigurationListener
implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
private static final String PROP_LOGGING_FILE = "logging.file";
private static final String PROP_LOGGER = "LOGGER";
private static final String PROP_SPRING_BOOT_LOG_LEVEL = "logging.level.org.springframework.boot";
Expand Down Expand Up @@ -130,7 +131,8 @@ public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
}
}

private static final class EnvironmentSetupListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
private static final class EnvironmentSetupListener
implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
private static final String SM_CONFIG_DIR = "sm.config.dir";
private static final String CONFIG_SUFFIX = "-config.ini";

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/kafdrop/config/CorsConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public class CorsConfiguration {
@Value("${cors.allowCredentials:true}")
private String corsAllowCredentials;

@Value("${cors.allowHeaders:Origin,Accept,X-Requested-With,Content-Type,Access-Control-Request-Method,Access-Control-Request-Headers,Authorization}")
@Value("${cors.allowHeaders:Origin,Accept,X-Requested-With,Content-Type,Access-Control-Request-Method," +
"Access-Control-Request-Headers,Authorization}")
private String corsAllowHeaders;

@Bean
Expand All @@ -81,7 +82,8 @@ public void init(FilterConfig filterConfig) {
}

@Override
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException {
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException,
ServletException {
final var response = (HttpServletResponse) res;
final var request = (HttpServletRequest) req;

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/kafdrop/config/InterceptorConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void addInterceptors(InterceptorRegistry registry) {

public class ProfileHandlerInterceptor implements AsyncHandlerInterceptor {
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) {
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) {
final var activeProfiles = environment.getActiveProfiles();
if (modelAndView != null && activeProfiles != null && activeProfiles.length > 0) {
modelAndView.addObject("profile", String.join(",", activeProfiles));
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/kafdrop/controller/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public String brokerDetails(@PathVariable("id") int brokerId, Model model) {
})
@GetMapping(path = "/broker/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody BrokerVO brokerDetailsJson(@PathVariable("id") int brokerId) {
return kafkaMonitor.getBroker(brokerId).orElseThrow(() -> new BrokerNotFoundException("No such broker " + brokerId));
return kafkaMonitor.getBroker(brokerId).orElseThrow(() ->
new BrokerNotFoundException("No such broker " + brokerId));
}

@Operation(summary = "getAllBrokers", description = "Get details for all known Kafka brokers")
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/kafdrop/controller/ClusterController.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public final class ClusterController {

private final boolean topicCreateEnabled;

public ClusterController(KafkaConfiguration kafkaConfiguration, KafkaMonitor kafkaMonitor, ObjectProvider<BuildInfo> buildInfoProvider,
public ClusterController(KafkaConfiguration kafkaConfiguration, KafkaMonitor kafkaMonitor,
ObjectProvider<BuildInfo> buildInfoProvider,
@Value("${topic.createEnabled:true}") Boolean topicCreateEnabled) {
this.kafkaConfiguration = kafkaConfiguration;
this.kafkaMonitor = kafkaMonitor;
Expand Down Expand Up @@ -104,7 +105,8 @@ public String clusterInfo(Model model,
return "cluster-overview";
}

@Operation(summary = "getCluster", description = "Get high level broker, topic, and partition data for the Kafka cluster")
@Operation(summary = "getCluster", description = "Get high level broker, topic, and partition data for the Kafka " +
"cluster")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success")
})
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/kafdrop/controller/ConsumerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode
@ApiResponse(responseCode = "404", description = "Invalid consumer group")
})
@GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException {
public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId)
throws ConsumerNotFoundException {
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId));
Expand Down
20 changes: 14 additions & 6 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ public final class MessageController {

private final ProtobufDescriptorProperties protobufProperties;

public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) {
public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector,
MessageFormatProperties messageFormatProperties,
SchemaRegistryProperties schemaRegistryProperties,
ProtobufDescriptorProperties protobufProperties) {
this.kafkaMonitor = kafkaMonitor;
this.messageInspector = messageInspector;
this.messageFormatProperties = messageFormatProperties;
Expand Down Expand Up @@ -172,8 +175,10 @@ public String viewMessageForm(@PathVariable("name") String topicName,
if (!messageForm.isEmpty() && !errors.hasErrors()) {

final var deserializers = new Deserializers(
getDeserializer(topicName, messageForm.getKeyFormat(), messageForm.getDescFile(), messageForm.getMsgTypeName(), messageForm.getIsAnyProto()),
getDeserializer(topicName, messageForm.getFormat(), messageForm.getDescFile(), messageForm.getMsgTypeName(), messageForm.getIsAnyProto())
getDeserializer(topicName, messageForm.getKeyFormat(), messageForm.getDescFile(),
messageForm.getMsgTypeName(), messageForm.getIsAnyProto()),
getDeserializer(topicName, messageForm.getFormat(), messageForm.getDescFile(), messageForm.getMsgTypeName(),
messageForm.getIsAnyProto())
);

model.addAttribute("messages",
Expand Down Expand Up @@ -218,7 +223,8 @@ private MessageFormat getSelectedMessageFormat(String format) {
* @return Offset or message data.
*/
@Operation(summary = "getPartitionOrMessages"
, description = "Get offset or message data for a topic. Without query params returns all partitions with offset data. With query params, returns actual messages (if valid offsets are provided).")
, description = "Get offset or message data for a topic. Without query params returns all partitions with offset " +
"data. With query params, returns actual messages (if valid offsets are provided).")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
Expand All @@ -241,7 +247,8 @@ List<Object> getPartitionOrMessages(
.orElseThrow(() -> new TopicNotFoundException(topicName));

List<Object> partitionList = new ArrayList<>();
topic.getPartitions().forEach(vo -> partitionList.add(new PartitionOffsetInfo(vo.getId(), vo.getFirstOffset(), vo.getSize())));
topic.getPartitions().forEach(vo -> partitionList.add(new PartitionOffsetInfo(vo.getId(), vo.getFirstOffset(),
vo.getSize())));

return partitionList;
} else {
Expand All @@ -266,7 +273,8 @@ List<Object> getPartitionOrMessages(
}
}

private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile, String msgTypeName, boolean isAnyProto) {
private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
String msgTypeName, boolean isAnyProto) {
final MessageDeserializer deserializer;

if (format == MessageFormat.AVRO) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/kafdrop/model/AclVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public final class AclVO implements Comparable<AclVO> {
private final String operation;
private final String permissionType;

public AclVO(String resourceType, String name, String patternType, String principal, String host, String operation, String permissionType) {
public AclVO(String resourceType, String name, String patternType, String principal, String host, String operation,
String permissionType) {
this.resourceType = resourceType;
this.name = name;
this.patternType = patternType;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/kafdrop/model/TopicPartitionVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ boolean isOffline() {

@Override
public String toString() {
return TopicPartitionVO.class.getSimpleName() + " [id=" + id + ", firstOffset=" + firstOffset + ", size=" + size + "]";
return TopicPartitionVO.class.getSimpleName() + " [id=" + id + ", firstOffset=" + firstOffset + ", size=" + size
+ "]";
}
}
6 changes: 4 additions & 2 deletions src/main/java/kafdrop/service/KafkaHighLevelAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void deleteTopic(String topic) {
Collection<AclBinding> listAcls() {
final Collection<AclBinding> aclsBindings;
try {
aclsBindings = adminClient.describeAcls(new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY))
aclsBindings = adminClient.describeAcls(new AclBindingFilter(ResourcePatternFilter.ANY,
AccessControlEntryFilter.ANY))
.values().get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof SecurityDisabledException) {
Expand All @@ -184,7 +185,8 @@ Collection<AclBinding> listAcls() {

private void printAcls() {
try {
final var acls = adminClient.describeAcls(new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY)).values().get();
final var acls = adminClient.describeAcls(new AclBindingFilter(ResourcePatternFilter.ANY,
AccessControlEntryFilter.ANY)).values().get();
final var newlineDelimitedAcls = new StringBuilder();
for (var acl : acls) {
newlineDelimitedAcls.append('\n').append(acl);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ synchronized void setTopicPartitionSizes(List<TopicVO> topics) {
Long startOffset = beginningOffset.get(topicPartition);
Long endOffset = endOffsets.get(topicPartition);

LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset, endOffset);
LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(),
topicPartition.partition(), startOffset, endOffset);
p.setFirstOffset(startOffset);
p.setSize(endOffset);
});
Expand Down Expand Up @@ -239,8 +240,10 @@ private TopicVO getTopicInfo(String topic, List<PartitionInfo> partitionInfoList

for (var partitionInfo : partitionInfoList) {
final var topicPartitionVo = new TopicPartitionVO(partitionInfo.partition());
final var inSyncReplicaIds = Arrays.stream(partitionInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toSet());
final var offlineReplicaIds = Arrays.stream(partitionInfo.offlineReplicas()).map(Node::id).collect(Collectors.toSet());
final var inSyncReplicaIds =
Arrays.stream(partitionInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toSet());
final var offlineReplicaIds =
Arrays.stream(partitionInfo.offlineReplicas()).map(Node::id).collect(Collectors.toSet());

for (var node : partitionInfo.replicas()) {
final var isInSync = inSyncReplicaIds.contains(node.id());
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {
})
.orElseGet(ClusterSummaryVO::new);
topicSummary.setTopicCount(topics.size());
topicSummary.setPreferredReplicaPercent(topics.isEmpty() ? 0 : topicSummary.getPreferredReplicaPercent() / topics.size());
topicSummary.setPreferredReplicaPercent(topics.isEmpty() ? 0 :
topicSummary.getPreferredReplicaPercent() / topics.size());
return topicSummary;
}

Expand Down Expand Up @@ -273,7 +274,8 @@ public List<AclVO> getAcls() {
return aclVos;
}

private static List<ConsumerVO> convert(List<ConsumerGroupOffsets> consumerGroupOffsets, Collection<TopicVO> topicVos) {
private static List<ConsumerVO> convert(List<ConsumerGroupOffsets> consumerGroupOffsets,
Collection<TopicVO> topicVos) {
final var topicVoMap = topicVos.stream().collect(Collectors.toMap(TopicVO::getName, Function.identity()));
final var groupTopicPartitionOffsetMap = new TreeMap<String, Map<String, Map<Integer, Long>>>();

Expand Down
7 changes: 5 additions & 2 deletions src/main/java/kafdrop/util/ProtobufMessageDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public String deserializeMessage(ByteBuffer buffer) {
descs.add(fd);
}

final var descriptors = descs.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList());
final var descriptors =
descs.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList());
// automatically detect the message type name if the proto is "Any" and no message type name is given
if (isAnyProto && msgTypeName.isBlank()) {
String typeUrl = Any.parseFrom(buffer).getTypeUrl();
Expand All @@ -61,7 +62,9 @@ public String deserializeMessage(ByteBuffer buffer) {
msgTypeNameRef.set(splittedTypeUrl[splittedTypeUrl.length - 1]);
}
// check for full name too if the proto is "Any"
final var messageDescriptor = descriptors.stream().filter(desc -> msgTypeNameRef.get().equals(desc.getName()) || msgTypeNameRef.get().equals(desc.getFullName())).findFirst();
final var messageDescriptor =
descriptors.stream().filter(desc -> msgTypeNameRef.get().equals(desc.getName())
|| msgTypeNameRef.get().equals(desc.getFullName())).findFirst();
if (messageDescriptor.isEmpty()) {
final String errorMsg = "Can't find specific message type: " + msgTypeNameRef.get();
LOG.error(errorMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public class ProtobufSchemaRegistryMessageDeserializer implements MessageDeseria
private final String topicName;
private final KafkaProtobufDeserializer deserializer;

public ProtobufSchemaRegistryMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
public ProtobufSchemaRegistryMessageDeserializer(String topicName, String schemaRegistryUrl,
String schemaRegistryAuth) {
this.topicName = topicName;
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth);
}
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/kafdrop/AbstractIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ static class Initializer implements ApplicationContextInitializer<ConfigurableAp

public static Map<String, Object> getProperties() {
Startables.deepStart(List.of(kafka)).join();
return Map.of("kafka.brokerConnect", kafka.getBootstrapServers(), "protobufdesc.directory", "./src/test/resources", "protobufdesc.parseAnyProto", true);
return Map.of(
"kafka.brokerConnect", kafka.getBootstrapServers(),
"protobufdesc.directory", "./src/test/resources",
"protobufdesc.parseAnyProto", true);
}

@Override
Expand Down