Skip to content

Commit 265e4d1

Browse files
committed
Remove StreamBuilderFactoryBeanCustomizer in favor of Kafka's StreamsBuilderFactoryBeanConfigurer
This commit removes Boot's StreamBuilderFactoryBeanCustomizer and updates KafkaStreamsAnnotationDrivenConfiguration to use Kafka's StreamsBuilderFactoryBeanConfigurer instead of the nested class KafkaStreamsFactoryBeanConfigurer. This aligns more closely with how configuration is typically applied. Signed-off-by: Dmytro Nosan <[email protected]>
1 parent 764f69c commit 265e4d1

File tree

5 files changed

+43
-55
lines changed

5 files changed

+43
-55
lines changed

documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ include-code::MyKafkaStreamsConfiguration[]
7474
By default, the streams managed by the javadoc:org.apache.kafka.streams.StreamsBuilder[] object are started automatically.
7575
You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property.
7676

77+
TIP: For advanced configuration, the arbitrary javadoc:org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer[] beans can be registered to configure the javadoc:org.springframework.kafka.config.StreamsBuilderFactoryBean[] bean before the javadoc:org.apache.kafka.streams.StreamsBuilder[] bean is initialized.
7778

7879

7980
[[messaging.kafka.additional-properties]]

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,18 @@
2222
import org.apache.kafka.streams.StreamsBuilder;
2323
import org.apache.kafka.streams.StreamsConfig;
2424

25-
import org.springframework.beans.factory.InitializingBean;
26-
import org.springframework.beans.factory.ObjectProvider;
27-
import org.springframework.beans.factory.annotation.Qualifier;
2825
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
2926
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
3027
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3128
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
3229
import org.springframework.context.annotation.Bean;
3330
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.Ordered;
3432
import org.springframework.core.env.Environment;
3533
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
3634
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3735
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
36+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
3837
import org.springframework.kafka.core.CleanupConfig;
3938

4039
/**
@@ -76,11 +75,8 @@ KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment,
7675
}
7776

7877
@Bean
79-
KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
80-
@Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean,
81-
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizers) {
82-
customizers.orderedStream().forEach((customizer) -> customizer.customize(factoryBean));
83-
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
78+
StreamsBuilderFactoryBeanConfigurer kafkaPropertiesStreamsBuilderFactoryBeanConfigurer() {
79+
return new KafkaPropertiesStreamsBuilderFactoryBeanConfigurer(this.properties);
8480
}
8581

8682
private void applyKafkaConnectionDetailsForStreams(Map<String, Object> properties,
@@ -91,24 +87,26 @@ private void applyKafkaConnectionDetailsForStreams(Map<String, Object> propertie
9187
KafkaAutoConfiguration.applySslBundle(properties, streams.getSslBundle());
9288
}
9389

94-
// Separate class required to avoid BeanCurrentlyInCreationException
95-
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
90+
private static final class KafkaPropertiesStreamsBuilderFactoryBeanConfigurer
91+
implements StreamsBuilderFactoryBeanConfigurer {
9692

9793
private final KafkaProperties properties;
9894

99-
private final StreamsBuilderFactoryBean factoryBean;
100-
101-
KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties, StreamsBuilderFactoryBean factoryBean) {
95+
private KafkaPropertiesStreamsBuilderFactoryBeanConfigurer(KafkaProperties properties) {
10296
this.properties = properties;
103-
this.factoryBean = factoryBean;
10497
}
10598

10699
@Override
107-
public void afterPropertiesSet() {
108-
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
100+
public void configure(StreamsBuilderFactoryBean factoryBean) {
101+
factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
109102
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
110103
CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown());
111-
this.factoryBean.setCleanupConfig(cleanupConfig);
104+
factoryBean.setCleanupConfig(cleanupConfig);
105+
}
106+
107+
@Override
108+
public int getOrder() {
109+
return Ordered.HIGHEST_PRECEDENCE;
112110
}
113111

114112
}

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/StreamsBuilderFactoryBeanCustomizer.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import org.springframework.boot.kafka.autoconfigure.DefaultKafkaConsumerFactoryCustomizer;
2727
import org.springframework.boot.kafka.autoconfigure.DefaultKafkaProducerFactoryCustomizer;
2828
import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
29-
import org.springframework.boot.kafka.autoconfigure.StreamsBuilderFactoryBeanCustomizer;
3029
import org.springframework.context.annotation.Bean;
3130
import org.springframework.context.annotation.Configuration;
3231
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
32+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
3333
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3434
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3535
import org.springframework.kafka.core.MicrometerConsumerListener;
@@ -75,7 +75,7 @@ private <K, V> void addListener(DefaultKafkaProducerFactory<K, V> factory, Meter
7575
static class KafkaStreamsMetricsConfiguration {
7676

7777
@Bean
78-
StreamsBuilderFactoryBeanCustomizer kafkaStreamsMetrics(MeterRegistry meterRegistry) {
78+
StreamsBuilderFactoryBeanConfigurer kafkaStreamsMetrics(MeterRegistry meterRegistry) {
7979
return (factoryBean) -> factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry));
8080
}
8181

module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969
import org.springframework.kafka.config.KafkaListenerContainerFactory;
7070
import org.springframework.kafka.config.KafkaStreamsConfiguration;
7171
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
72+
import org.springframework.kafka.config.StreamsBuilderFactoryBean.Listener;
73+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
7274
import org.springframework.kafka.core.CleanupConfig;
7375
import org.springframework.kafka.core.ConsumerFactory;
7476
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -458,6 +460,29 @@ void streamsProperties() {
458460
});
459461
}
460462

463+
@Test
464+
void streamsBuilderFactoryBeanConfigurerIsApplied() {
465+
Listener listener = mock(Listener.class);
466+
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
467+
// user's StreamsBuilderFactoryBeanConfigurer must be invoked after the
468+
// default one
469+
.withBean(StreamsBuilderFactoryBeanConfigurer.class, () -> (factoryBean) -> {
470+
assertThat(factoryBean.isAutoStartup()).isFalse();
471+
assertThat(factoryBean).extracting("cleanupConfig.onStart").isEqualTo(true);
472+
assertThat(factoryBean).extracting("cleanupConfig.onStop").isEqualTo(true);
473+
factoryBean.addListener(listener);
474+
})
475+
.withPropertyValues("spring.kafka.client-id=cid",
476+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.application.name=appName",
477+
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-shutdown=true",
478+
"spring.kafka.streams.cleanup.on-startup=true")
479+
.run((context) -> {
480+
assertThat(context).hasSingleBean(StreamsBuilderFactoryBean.class);
481+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean(StreamsBuilderFactoryBean.class);
482+
assertThat(streamsBuilderFactoryBean.getListeners()).hasSize(1);
483+
});
484+
}
485+
461486
@Test
462487
void connectionDetailsAreAppliedToStreams() {
463488
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)

0 commit comments

Comments
 (0)