# Simplifying Kafka Message Sending with Spring's FactoryBean

In the Spring framework, FactoryBean is a powerful interface that lets developers customize how a bean is created. By implementing FactoryBean you take control of the bean's instantiation process, which is very useful for complex object-creation scenarios.

## What Is a FactoryBean?
```java
public interface FactoryBean<T> {

    String OBJECT_TYPE_ATTRIBUTE = "factoryBeanObjectType";

    @Nullable
    T getObject() throws Exception;

    @Nullable
    Class<?> getObjectType();

    default boolean isSingleton() {
        return true;
    }
}
```
When a class implementing FactoryBean is registered in the container, Spring exposes the object returned by `getObject()` under the bean's name rather than the factory instance itself. This is what makes flexible, transparent bean creation possible: injection points receive the product, not the factory.
## Use in Open-Source Frameworks

- MyBatis lets you perform database operations by defining only mapper interfaces, with no implementation classes; this works by implementing FactoryBean. See `MapperFactoryBean`.
- OpenFeign's remote-call proxies are produced the same way. See `FeignClientFactoryBean`.

Many other open-source frameworks build extensions on top of FactoryBean as well.
## Requirement

We want Kafka message sending to work like MyBatis or OpenFeign: define an interface, and sending messages just works.

## Add the Kafka Dependency

Instead of the official `spring-kafka` integration, we use the plain `kafka-clients` dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
```
## Define a Custom Annotation
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Producer {
}
```
The `@Producer` annotation is purely a marker: when the container starts, types carrying it are located and proxied.
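To make the marker-annotation mechanics concrete, here is a Spring-free sketch of the reflective check the registrar will later perform. `MarkerScanDemo`, `Orders`, and `Payments` are illustrative names, not part of the article's code:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Minimal demo: only types carrying the runtime-retained marker are picked up.
public class MarkerScanDemo {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Producer {}

    @Producer
    public interface Orders {}       // annotated: would be proxied

    public interface Payments {}     // not annotated: would be skipped

    public static boolean isProducer(Class<?> candidate) {
        // RetentionPolicy.RUNTIME is what makes the annotation visible here
        return candidate.isAnnotationPresent(Producer.class);
    }
}
```

Note that `RetentionPolicy.RUNTIME` is essential; with the default `CLASS` retention the annotation would be invisible to reflection.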
## Define the Producer Configuration Properties
```java
@Data
@ConfigurationProperties(prefix = "mq.kafka.producer")
public class KafkaProducerProperties {

    private String bootstrapServers;

    private String acks;

    private String clientId;
}
```
The producer supports many more parameters; consult the official Kafka documentation and extend this class as needed.
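As one hypothetical extension (not from the article), a few commonly tuned producer settings could be added like this. Lombok's `@Data` is replaced by plain accessors so the sketch stands alone; the defaults shown are arbitrary example values, not Kafka's:

```java
// Hypothetical extension of KafkaProducerProperties with extra producer knobs.
// Field names map onto ProducerConfig.RETRIES_CONFIG, LINGER_MS_CONFIG
// and BATCH_SIZE_CONFIG when building the Properties object.
public class ExtendedKafkaProducerProperties {

    private String bootstrapServers;
    private String acks;
    private String clientId;

    private Integer retries = 3;        // maps to ProducerConfig.RETRIES_CONFIG
    private Long lingerMs = 5L;         // maps to ProducerConfig.LINGER_MS_CONFIG
    private Integer batchSize = 16384;  // maps to ProducerConfig.BATCH_SIZE_CONFIG

    public Integer getRetries() { return retries; }
    public void setRetries(Integer retries) { this.retries = retries; }

    public Long getLingerMs() { return lingerMs; }
    public void setLingerMs(Long lingerMs) { this.lingerMs = lingerMs; }

    public Integer getBatchSize() { return batchSize; }
    public void setBatchSize(Integer batchSize) { this.batchSize = batchSize; }
}
```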
## Define the FactoryBean
```java
public class KafkaProducerFactoryBean<T> implements FactoryBean<T> {

    private final Class<T> mapperInterface;

    @Autowired
    private KafkaProducer<String, String> kafkaProducer;

    public KafkaProducerFactoryBean(Class<T> mapperInterface) {
        this.mapperInterface = mapperInterface;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T getObject() throws Exception {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("send".equals(method.getName())) {
                // args[0] is the topic, args[1] is the message value
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(args[0].toString(), args[1].toString());
                return kafkaProducer.send(record).get();
            }
            throw new UnsupportedOperationException("Unsupported method: " + method.getName());
        };
        return (T) Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{mapperInterface}, handler);
    }

    @Override
    public Class<?> getObjectType() {
        return mapperInterface;
    }
}
```

Note that the original sketch returned a `String` for unknown method names, which would cause a `ClassCastException` against an interface declared to return `RecordMetadata`; throwing is the safer behavior.
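The JDK dynamic-proxy trick inside `getObject()` can be isolated into a runnable, Spring-free sketch: only the interface exists, and the `InvocationHandler` supplies all behavior. The Kafka send is stubbed out with a string; `ProxyDemo` and `Producerish` are illustrative names:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Stand-alone demo of implementing an interface at runtime via Proxy,
// mirroring what KafkaProducerFactoryBean.getObject() does.
public class ProxyDemo {

    // Stand-in for an interface like OrderProducer
    public interface Producerish {
        String send(String topic, String payload);
    }

    @SuppressWarnings("unchecked")
    public static <T> T proxyFor(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("send".equals(method.getName())) {
                // The real code builds a ProducerRecord and calls kafkaProducer.send(...)
                return "sent to " + args[0] + ": " + args[1];
            }
            throw new UnsupportedOperationException(method.getName());
        };
        return (T) Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{iface}, handler);
    }
}
```

Calling `proxyFor(Producerish.class).send("orders", "{}")` routes through the handler even though no class ever implements `Producerish` by hand; that is exactly how the interface-only programming model works.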
## The Auto-Configuration Class
```java
@Configuration
@EnableConfigurationProperties(KafkaProducerProperties.class)
public class MqAutoConfiguration implements InitializingBean {

    @Autowired
    private KafkaProducerProperties kafkaProducerProperties;

    @Bean
    @ConditionalOnMissingBean
    public KafkaProducer<String, String> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerProperties.getClientId());
        props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerProperties.getAcks());
        return new KafkaProducer<>(props);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }

    public static class ProducerRegister implements BeanFactoryAware, ImportBeanDefinitionRegistrar {

        private BeanFactory beanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = beanFactory;
        }

        @Override
        public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {
            try {
                if (!AutoConfigurationPackages.has(this.beanFactory)) {
                    return;
                }
                List<String> packages = AutoConfigurationPackages.get(this.beanFactory);
                String basePackage = StringUtils.collectionToCommaDelimitedString(packages);
                String packageSearchPath = "classpath*:" + basePackage.replace('.', '/') + "/**/*.class";
                ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
                // SimpleMetadataReader's constructor is package-private; obtain
                // readers through a MetadataReaderFactory instead.
                MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
                Resource[] resources = resourcePatternResolver.getResources(packageSearchPath);
                for (Resource resource : resources) {
                    MetadataReader metadataReader = metadataReaderFactory.getMetadataReader(resource);
                    // Check the marker from class metadata, without loading the class
                    if (!metadataReader.getAnnotationMetadata().hasAnnotation(Producer.class.getName())) {
                        continue;
                    }
                    ScannedGenericBeanDefinition beanDefinition = new ScannedGenericBeanDefinition(metadataReader);
                    String beanName = Introspector.decapitalize(ClassUtils.getShortName(beanDefinition.getBeanClassName()));
                    beanDefinition.setResource(resource);
                    beanDefinition.setSource(resource);
                    beanDefinition.setScope("singleton");
                    // Pass the interface name as the FactoryBean's constructor argument,
                    // then swap the bean class for the FactoryBean itself
                    beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanDefinition.getBeanClassName());
                    beanDefinition.setBeanClass(KafkaProducerFactoryBean.class);
                    BeanDefinitionHolder definitionHolder = new BeanDefinitionHolder(beanDefinition, beanName);
                    registry.registerBeanDefinition(beanName, definitionHolder.getBeanDefinition());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Configuration
    @Import(ProducerRegister.class)
    public static class MapperScannerRegistrarConfiguration implements InitializingBean {

        @Override
        public void afterPropertiesSet() throws Exception {
        }
    }
}
```
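The bean-name derivation in the registrar (short class name, first letter lower-cased) can be reproduced without Spring using only the JDK's `java.beans.Introspector`; `BeanNameDemo` is an illustrative name and the `substring` call stands in for Spring's `ClassUtils.getShortName`:

```java
import java.beans.Introspector;

// How the registrar derives a bean name from a fully qualified class name:
// strip the package, then decapitalize the first letter.
public class BeanNameDemo {

    public static String beanNameFor(String fqcn) {
        // Equivalent of ClassUtils.getShortName for simple (non-nested) classes
        String shortName = fqcn.substring(fqcn.lastIndexOf('.') + 1);
        return Introspector.decapitalize(shortName);
    }
}
```

So an interface `com.example.OrderProducer` is registered under the bean name `orderProducer`, which is why constructor injection by type (and by name) resolves it later.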
Under `resources`, create a `META-INF` directory containing a `spring.factories` file that points to the auto-configuration class:
```properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.ssn.mq.config.MqAutoConfiguration
```
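Note: `spring.factories` registration works through Spring Boot 2.x, but since Spring Boot 2.7 the preferred mechanism (and the only supported one in 3.x) is a `META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports` file, one fully qualified class name per line:

```text
com.ssn.mq.config.MqAutoConfiguration
```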
## Verifying the Feature

Import the custom starter built above, then configure the producer in `application.yml`:
```yaml
mq:
  kafka:
    producer:
      bootstrap-servers: 10.13.3.21:9092
      acks: 1
      client-id: kafka-client-test
```
### Define the Interface
```java
@Producer
public interface OrderProducer {

    RecordMetadata send(String topic, String userInfo);
}
```
### Use the Interface
```java
@Slf4j
@Component
public class AppInit implements CommandLineRunner {

    private final OrderProducer orderProducer;

    public AppInit(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @Override
    public void run(String... args) throws Exception {
        UserInfo info = new UserInfo();
        info.setId(1);
        info.setUsername("lcd");
        RecordMetadata metadata = orderProducer.send("1", JSON.toJSONString(info));
        log.info("send record metadata {}", metadata);
    }
}
```
The console prints the following (`RecordMetadata` renders as `topic-partition@offset`, so `1-0@3` means topic `1`, partition 0, offset 3):
```text
send record metadata 1-0@3
```
## Summary

FactoryBean is a powerful tool in the Spring framework that gives developers fine-grained control over bean creation. Hopefully this walkthrough has clarified what FactoryBean is for and how to implement one, so you can apply the same pattern in your own projects.