
Review Kafka auto-configuration and support for defining additional beans #40174

Open
notusedusername opened this issue Apr 5, 2024 · 6 comments
Labels
type: task A general task

Comments

@notusedusername

Summary

Defining our own ConsumerFactory instance in configuration, as shown in the spring-kafka docs, does not take effect: the default instance is created and used instead. With a minor modification to the bean definition the intended behavior can be achieved, but getting there is not straightforward.

For things to work the way the documentation states, some spring-boot-autoconfigure changes are required; if that is not possible, then the spring-kafka documentation should be modified accordingly.

Details

I'm working with Spring Boot-based microservices connected by Kafka. The message values are JSON, the serialized form of a common DTO; let's call it Document.

  • the Kafka connection is handled by spring-kafka
  • there's a JsonDeserializer for the Document
  • we are defining a ConsumerFactory and applying our custom deserializer to it:

@Bean
public JsonDeserializer<Document> deserializer() {
    return new DocumentMessageDeserializer();
}

@Bean
public ConsumerFactory<String, Document> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    // Build the factory from the binder's merged consumer configuration,
    // with a String key deserializer and our custom value deserializer.
    return new DefaultKafkaConsumerFactory<>(
            kafkaBinderConfigurationProperties.mergedConsumerConfiguration(),
            new StringDeserializer(),
            deserializer);
}
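
For context, DocumentMessageDeserializer itself is not shown in the report. A minimal sketch of what it might look like, assuming it simply extends spring-kafka's JsonDeserializer bound to the shared DTO:

import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.example.dto.Document;

// Hypothetical sketch of the custom deserializer: delegates all JSON handling
// to spring-kafka's JsonDeserializer, fixed to the Document target type.
public class DocumentMessageDeserializer extends JsonDeserializer<Document> {

    public DocumentMessageDeserializer() {
        super(Document.class);
    }
}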

Expected behavior

Based on the spring-kafka documentation, defining the bean this way should override the default factory with our custom instance and its custom deserializer.

What happens instead

Instead, the service fails at runtime with deserialization errors: the message is deserialized by the default deserializer into a byte[], which is then cast to Document.

java.lang.RuntimeException: java.lang.ClassCastException: class [B cannot be cast to class com.example.dto.Document ([B is in module java.base of loader 'bootstrap'; com.example.dto.Document is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @59e84876)

On the other hand, if the property-based config is provided, then the service runs without issues.

spring:
  kafka:
    consumer:
      value-deserializer: com.example.serializers.DocumentMessageDeserializer

Based on these symptoms, it is not our ConsumerFactory instance that is used when creating the Consumer, but the default one. It picks up the property-based configuration, but if the property is not provided it falls back to the default deserializer. (I verified this by putting a breakpoint in the KafkaListenerContainerFactory creation in KafkaAnnotationDrivenConfiguration; the injected factory is in fact null.)

Our programmatically configured @Bean should override the default ConsumerFactory (so we should be able to deserialize the messages without setting the property).

Reason

I saw that in the KafkaAnnotationDrivenConfiguration the ConsumerFactory is injected like this:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory
            .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
    return factory;
}

The problem here is that if we define a ConsumerFactory instance the way the spring-kafka docs show, it is not injected at this point, because a ConsumerFactory<String, Document> does not match the ConsumerFactory<Object, Object> injection point.
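
To illustrate the mismatch, here is a small, self-contained sketch (my own illustration, not code from Boot) using Spring's ResolvableType; the nested types stand in for org.springframework.kafka.core.ConsumerFactory and com.example.dto.Document:

import org.springframework.core.ResolvableType;

public class GenericsMismatchDemo {

    interface ConsumerFactory<K, V> {
    }

    static class Document {
    }

    public static void main(String[] args) {
        ResolvableType required = ResolvableType.forClassWithGenerics(
                ConsumerFactory.class, Object.class, Object.class);
        ResolvableType candidate = ResolvableType.forClassWithGenerics(
                ConsumerFactory.class, String.class, Document.class);
        // Java generics are invariant, so the candidate does not satisfy the
        // ObjectProvider<ConsumerFactory<Object, Object>> injection point.
        System.out.println(required.isAssignableFrom(candidate)); // expected: false
    }
}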

Workarounds

  • Using the property
    • if no other configuration overrides were made in the @Bean definition, this works as expected
  • Creating the whole ConcurrentKafkaListenerContainerFactory in our configuration (see the sketch after this list)
  • Creating the ConsumerFactory with wildcards like this:
@Bean
public ConsumerFactory<?, ?> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    var config = new HashMap<>(kafkaBinderConfigurationProperties.mergedConsumerConfiguration());
    return new DefaultKafkaConsumerFactory<>(
            config,
            new StringDeserializer(),
            deserializer);
}
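
For the second workaround, a minimal sketch (reusing the consumerFactory bean from above): the auto-configured factory is guarded by @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory"), so defining a bean with that exact name makes the default back off entirely.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Document> kafkaListenerContainerFactory(
        ConsumerFactory<String, Document> consumerFactory) {
    // The bean name matches the @ConditionalOnMissingBean(name = ...) guard,
    // so the auto-configured default container factory is never created.
    ConcurrentKafkaListenerContainerFactory<String, Document> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}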

Proposed solution

IMO, using wildcards when injecting the ConsumerFactory into kafkaListenerContainerFactory would bring the behavior in line with the spring-kafka documentation: the ConsumerFactory bean could then be defined with concrete type arguments instead of wildcards.

ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<?, ?>> kafkaConsumerFactory) {

If that is not possible for some reason I'm not aware of, then the spring-kafka docs should be modified to show the correct usage in the example.

Versions

  • spring-boot-autoconfigure:2.7.10
  • spring-kafka:2.8.11
    As far as I can see, this specific part of the spring-boot-autoconfigure implementation is the same in the latest version too, so I think the same issue would persist after a version upgrade as well, but I didn't check that.
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage label Apr 5, 2024
@wilkinsona
Member

Spring Boot 2.7.x is no longer supported.

I think the same issue would persist with a version upgrade as well, but I didn't check that.

Please upgrade to 3.1.x or later and let us know if the problem remains. If it does, we can take a further look.

@wilkinsona wilkinsona added the status: waiting-for-feedback label Apr 5, 2024
@notusedusername
Author

Thanks for the fast reply!

I tried it in other services already bumped to Spring Boot 3.x, and the same thing happened.
Our @Bean definitions are the same in that version as in the report.

  • spring-boot-autoconfigure:3.1.5
  • spring-kafka:3.0.12

@spring-projects-issues spring-projects-issues added the status: feedback-provided label and removed the status: waiting-for-feedback label Apr 5, 2024
@wilkinsona
Member

Thank you. I think #19221 is relevant here. Reading the discussion there, I am reminded that the current behaviour is intentional.

The intent here is to auto-configure a ConcurrentKafkaListenerContainerFactory bean in the absence of a bean named kafkaListenerContainerFactory. While unusual for Spring Boot's auto-configuration, this is to allow additional container factories to be defined without the default backing off. If we relax the ConsumerFactory<Object, Object> signature, we may end up using a custom consumer factory that was intended for a different container factory rather than the default that we'd currently use.
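
For illustration (all names below are hypothetical), this is the kind of setup the current signature protects: an additional container factory with its own consumer factory, defined alongside the auto-configured default. With a relaxed ConsumerFactory<?, ?> injection point, auditConsumerFactory could be picked up for the default kafkaListenerContainerFactory as well.

@Bean
ConsumerFactory<String, AuditEvent> auditConsumerFactory() {
    // Hypothetical factory intended only for the audit listeners below.
    return new DefaultKafkaConsumerFactory<>(
            Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
            new StringDeserializer(), new JsonDeserializer<>(AuditEvent.class));
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, AuditEvent> auditListenerContainerFactory(
        ConsumerFactory<String, AuditEvent> auditConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, AuditEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(auditConsumerFactory);
    return factory;
}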

I think this may have to be fixed by updating Spring Kafka's documentation. Before we go down that route, flagging for a team meeting in case there's anything that I've overlooked and a fix in Boot is actually possible.

@wilkinsona
Member

We discussed this and concluded that there's nothing we can do about this in the short- or medium-term. In the longer term, we'd like to reconsider this current behavior as part of investigating service bindings and support for auto-configuring multiple beans (#15732, #22403). In the meantime, the documentation for Spring Kafka should be updated. I've opened spring-projects/spring-kafka#3242 for that.

@wilkinsona wilkinsona added the type: task label and removed the status: waiting-for-triage and for: team-meeting labels May 9, 2024
@wilkinsona wilkinsona added this to the General Backlog milestone May 9, 2024
@wilkinsona wilkinsona changed the title Kafka consumer related auto-config takes effect even if we are overdefining it Review Kafka auto-configuration and support for defining additional beans May 9, 2024
@notusedusername
Author

I checked the issue you linked, and the current behavior makes sense given the use case mentioned there.

With the upcoming change in the Spring Kafka documentation, the third workaround from the description becomes the actual solution for this particular problem.

Thanks for the help!

@sobychacko
Contributor

Documentation added to Spring Kafka via spring-projects/spring-kafka#3243.
