
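Simplifying Kafka Message Sending in Spring with FactoryBean

In the Spring framework, FactoryBean is a powerful interface that lets developers customize how a bean is created. By implementing it, you control the bean's instantiation, which is useful when constructing an object is complex.

What is FactoryBean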

java
public interface FactoryBean<T> {
    String OBJECT_TYPE_ATTRIBUTE = "factoryBeanObjectType";

    @Nullable
    T getObject() throws Exception;

    @Nullable
    Class<?> getObjectType();

    default boolean isSingleton() {
        return true;
    }
}

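FactoryBean is an interface for custom bean creation logic. When a FactoryBean is registered in the container, Spring exposes the object returned by getObject() as the bean instead of the factory itself. getObjectType() reports the type of that object, and isSingleton() tells the container whether the object may be cached and shared.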
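To make the contract concrete, here is a minimal sketch that imitates it with plain Java and no Spring on the classpath. `SimpleFactoryBean`, `GreetingFactoryBean`, and `ToyContainer` are hypothetical names invented for this illustration; they are not Spring classes, and the real container does far more. The sketch only shows the two lookup rules: asking for a name returns the product, and asking for the name prefixed with `&` returns the factory, which is also how Spring's `getBean("&name")` behaves.

```java
import java.util.HashMap;
import java.util.Map;

class FactoryBeanSketch {
    public static void main(String[] args) {
        ToyContainer container = new ToyContainer();
        container.register("greeting", new GreetingFactoryBean());
        // Looking up the plain name yields the product of the factory
        System.out.println(container.getBean("greeting")); // prints "hello from factory"
        // Looking up "&" + name yields the factory itself
        System.out.println(container.getBean("&greeting").getClass().getSimpleName()); // prints "GreetingFactoryBean"
    }
}

// Simplified stand-in for Spring's FactoryBean contract (hypothetical, for illustration only)
interface SimpleFactoryBean<T> {
    T getObject() throws Exception;

    Class<?> getObjectType();
}

// A factory whose product is a StringBuilder pre-filled with a greeting
class GreetingFactoryBean implements SimpleFactoryBean<StringBuilder> {
    @Override
    public StringBuilder getObject() {
        return new StringBuilder("hello from factory");
    }

    @Override
    public Class<?> getObjectType() {
        return StringBuilder.class;
    }
}

// Toy container: a lookup by name returns the product, "&name" returns the factory
class ToyContainer {
    private final Map<String, Object> registered = new HashMap<>();
    private final Map<String, Object> singletonProducts = new HashMap<>();

    void register(String name, Object bean) {
        registered.put(name, bean);
    }

    Object getBean(String name) {
        if (name.startsWith("&")) {
            return registered.get(name.substring(1));
        }
        Object bean = registered.get(name);
        if (!(bean instanceof SimpleFactoryBean)) {
            return bean;
        }
        // Create the product once and cache it, like a singleton FactoryBean
        Object product = singletonProducts.get(name);
        if (product == null) {
            try {
                product = ((SimpleFactoryBean<?>) bean).getObject();
            } catch (Exception e) {
                throw new IllegalStateException("Factory failed for bean " + name, e);
            }
            singletonProducts.put(name, product);
        }
        return product;
    }
}
```

The caching step mirrors the default `isSingleton()` value of `true`: repeated lookups hand back the same product instance.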

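Usage in open-source frameworks

  1. MyBatis: you only define an interface, with no implementation class, and can still run database operations. This works through a FactoryBean implementation; see MapperFactoryBean.
  2. OpenFeign: remote calls are implemented on the same principle; see FeignClientFactoryBean.

Many other open-source frameworks extend Spring through FactoryBean in the same way.

Requirement

We want sending Kafka messages to work the way MyBatis and OpenFeign do: define an interface, and messages can be sent through it.

Adding the Kafka dependency

This article uses the native Kafka client dependency (kafka-clients) rather than spring-kafka, the official Spring integration for Kafka.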

xml
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.8.0</version>
            </dependency>

Custom annotation

java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Producer {

}

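The @Producer annotation is only a marker. When the container starts, it finds the interfaces that carry this annotation and registers a proxy for each of them.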
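The marker can only be found at startup because it is declared with `RetentionPolicy.RUNTIME`; with the default `CLASS` retention, reflection would not see it. The following sketch shows the detection step in isolation, using plain reflection. The annotation is redeclared inside the sketch so that it compiles on its own, and `PlainInterface` and `findProducers` are hypothetical names used only here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProducerAnnotationSketch {

    // Same shape as the @Producer annotation above, redeclared so the sketch is self-contained
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Producer {
    }

    @Producer
    interface OrderProducer {
    }

    // Hypothetical interface without the marker; it must be ignored
    interface PlainInterface {
    }

    // Keeps only the interfaces that carry @Producer
    static List<Class<?>> findProducers(List<Class<?>> candidates) {
        List<Class<?>> result = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            if (candidate.isInterface() && candidate.isAnnotationPresent(Producer.class)) {
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Class<?>> candidates = Arrays.<Class<?>>asList(OrderProducer.class, PlainInterface.class);
        // Only OrderProducer survives the filter
        System.out.println(findProducers(candidates));
    }
}
```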

Defining the producer Kafka configuration

java
@Data
@ConfigurationProperties(prefix = "mq.kafka.producer")
public class KafkaProducerProperties {

    private String bootstrapServers;

    private String acks;

    private String clientId;

}

The producer supports many more parameters; consult the official Kafka documentation and extend this class as needed.
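As one possible way to extend it, the sketch below copies a settings object into `java.util.Properties` under the Kafka configuration keys. The `Settings` class and the extra `retries` and `lingerMs` fields are assumptions added for illustration; the key names (`bootstrap.servers`, `acks`, `client.id`, `retries`, `linger.ms`) are standard producer settings. Unset fields are skipped, because `Properties` rejects null values and an optional setting left out of application.yml would otherwise fail with a NullPointerException.

```java
import java.util.Properties;

class ProducerPropsSketch {

    // Hypothetical settings holder; the article's class has only the first three fields
    static class Settings {
        String bootstrapServers;
        String acks;
        String clientId;
        Integer retries;  // assumed extra field
        Integer lingerMs; // assumed extra field
    }

    // Copies every configured field into Properties under its Kafka key
    static Properties toKafkaProperties(Settings s) {
        Properties props = new Properties();
        putIfPresent(props, "bootstrap.servers", s.bootstrapServers);
        putIfPresent(props, "acks", s.acks);
        putIfPresent(props, "client.id", s.clientId);
        putIfPresent(props, "retries", s.retries);
        putIfPresent(props, "linger.ms", s.lingerMs);
        return props;
    }

    // Properties extends Hashtable, which rejects null values, so unset fields are skipped
    private static void putIfPresent(Properties props, String key, Object value) {
        if (value != null) {
            // Stored as text so the value can be read back with getProperty
            props.put(key, String.valueOf(value));
        }
    }

    public static void main(String[] args) {
        Settings settings = new Settings();
        settings.bootstrapServers = "localhost:9092"; // placeholder address
        settings.lingerMs = 5;
        // acks, client.id and retries are absent, so the producer defaults would apply
        System.out.println(toKafkaProperties(settings));
    }
}
```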

Defining the FactoryBean

java
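import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;

public class KafkaProducerFactoryBean<T> implements FactoryBean<T> {

    // The @Producer interface this factory creates a proxy for
    private final Class<T> mapperInterface;

    // Injected by the container; the bean is defined in MqAutoConfiguration
    @Autowired
    private KafkaProducer<String, String> kafkaProducer;

    public KafkaProducerFactoryBean(Class<T> mapperInterface) {
        this.mapperInterface = mapperInterface;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T getObject() throws Exception {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/equals/hashCode also reach the handler; answer them with this factory
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args);
            }
            if ("send".equals(method.getName()) && args != null && args.length == 2) {
                // args[0] is the topic, args[1] is the message value
                ProducerRecord<String, String> record = new ProducerRecord<>(args[0].toString(), args[1].toString());
                // Block until the send completes, then return the RecordMetadata
                return kafkaProducer.send(record).get();
            }
            // Fail fast: returning a String here would cause a ClassCastException at the call site
            throw new UnsupportedOperationException("Unsupported method: " + method.getName());
        };
        return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class<?>[]{mapperInterface}, handler);
    }

    @Override
    public Class<?> getObjectType() {
        return mapperInterface;
    }
}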
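The proxy logic can be tried without a broker. The sketch below is a simplified variant, not the starter's code: `InMemorySender` is a hypothetical stand-in for KafkaProducer, and the `OrderProducer` interface here returns a String and has an extra `flush` method purely to demonstrate the unsupported-method path. It shows three details a JDK proxy handler usually needs: `equals`, `hashCode`, and `toString` are routed to the handler too, arguments should be checked before use, and unsupported methods are better rejected with an exception than answered with an arbitrary value.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProducerProxySketch {

    // Hypothetical stand-in for KafkaProducer so the sketch runs without a broker
    static class InMemorySender {
        final List<String> sent = new ArrayList<>();

        String send(String topic, String value) {
            sent.add(topic + ":" + value);
            // Fake "metadata": the topic plus the index of the stored message
            return topic + "@" + (sent.size() - 1);
        }
    }

    // Hypothetical producer interface; no implementation class exists
    interface OrderProducer {
        String send(String topic, String payload);

        String flush(); // deliberately not supported by the handler
    }

    static <T> T createProxy(Class<T> type, InMemorySender sender) {
        InvocationHandler handler = (proxy, method, args) -> {
            // equals, hashCode and toString are dispatched to the handler as well
            if (method.getDeclaringClass() == Object.class) {
                return handleObjectMethod(proxy, method, args, type);
            }
            if ("send".equals(method.getName()) && args != null && args.length == 2) {
                return sender.send(String.valueOf(args[0]), String.valueOf(args[1]));
            }
            // Unchecked, so it reaches the caller without being wrapped
            throw new UnsupportedOperationException("Unsupported method: " + method.getName());
        };
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler));
    }

    private static Object handleObjectMethod(Object proxy, Method method, Object[] args, Class<?> type) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            default: // toString
                return "Proxy for " + type.getName();
        }
    }

    public static void main(String[] args) {
        InMemorySender sender = new InMemorySender();
        OrderProducer producer = createProxy(OrderProducer.class, sender);
        System.out.println(producer.send("orders", "{\"id\":1}")); // prints "orders@0"
        System.out.println(sender.sent.size()); // prints "1"
    }
}
```

Throwing `UnsupportedOperationException` keeps the failure at the call site and easy to read. Returning a String from the handler for a method declared to return another type would instead surface as a ClassCastException inside the proxy.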

Auto-configuration class

java
@Configuration
@EnableConfigurationProperties(KafkaProducerProperties.class)
public class MqAutoConfiguration implements InitializingBean{

    @Autowired
    private KafkaProducerProperties kafkaProducerProperties;

    @Bean
    @ConditionalOnMissingBean
    public KafkaProducer<String,String> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerProperties.getClientId());
        props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerProperties.getAcks());
        return new KafkaProducer<>(props);
    }

    @Override
    public void afterPropertiesSet() throws Exception {

    }

    public static class ProducerRegister implements BeanFactoryAware, ImportBeanDefinitionRegistrar {

        private BeanFactory beanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = beanFactory;
        }

        @Override
        public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {
            try {
                if (!AutoConfigurationPackages.has(this.beanFactory)) {
                    return;
                }
                ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
                // SimpleMetadataReader is not a public class, so readers are obtained through the factory
                MetadataReaderFactory readerFactory = new SimpleMetadataReaderFactory();

                // Scan each base package separately; a comma-joined list is not a valid search path
                for (String basePackage : AutoConfigurationPackages.get(this.beanFactory)) {
                    String packageSearchPath = "classpath*:" + basePackage.replace('.', '/') + "/**/*.class";

                    for (Resource resource : resourcePatternResolver.getResources(packageSearchPath)) {
                        MetadataReader metadataReader = readerFactory.getMetadataReader(resource);
                        // Read the annotation from class-file metadata instead of loading every class
                        if (!metadataReader.getAnnotationMetadata().hasAnnotation(Producer.class.getName())) continue;
                        ScannedGenericBeanDefinition beanDefinition = new ScannedGenericBeanDefinition(metadataReader);
                        String beanName = Introspector.decapitalize(ClassUtils.getShortName(beanDefinition.getBeanClassName()));
                        beanDefinition.setSource(resource);
                        beanDefinition.setScope("singleton");
                        // Pass the interface name to the KafkaProducerFactoryBean constructor
                        beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanDefinition.getBeanClassName());
                        beanDefinition.setBeanClass(KafkaProducerFactoryBean.class);
                        registry.registerBeanDefinition(beanName, beanDefinition);
                    }
                }
            } catch (IOException e) {
                // Fail startup instead of silently skipping producer registration
                throw new IllegalStateException("Failed to scan for @Producer interfaces", e);
            }
        }

    }

    @Configuration
    @Import(ProducerRegister.class)
    public static class MapperScannerRegistrarConfiguration implements InitializingBean {

        @Override
        public void afterPropertiesSet() throws Exception {

        }
    }

}
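Two small string transformations in the registrar are easy to get wrong, so here they are in isolation, using only the JDK. `toSearchPaths` and `toBeanName` are hypothetical helper names, and `com.example.app` is a placeholder package. The first helper builds one classpath pattern per base package; joining several packages with commas before replacing the dots would produce a single pattern that matches nothing. The second derives the bean name the same way the registrar does, except that it takes the short name with a plain `substring` instead of Spring's `ClassUtils.getShortName`.

```java
import java.beans.Introspector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ScanPathSketch {

    // Builds one classpath pattern per base package
    static List<String> toSearchPaths(List<String> basePackages) {
        List<String> paths = new ArrayList<>();
        for (String basePackage : basePackages) {
            paths.add("classpath*:" + basePackage.replace('.', '/') + "/**/*.class");
        }
        return paths;
    }

    // Derives a bean name from a fully qualified class name
    static String toBeanName(String className) {
        String shortName = className.substring(className.lastIndexOf('.') + 1);
        return Introspector.decapitalize(shortName);
    }

    public static void main(String[] args) {
        // com.example.app is a placeholder package
        List<String> packages = Arrays.asList("com.ssn.mq", "com.example.app");
        for (String path : toSearchPaths(packages)) {
            System.out.println(path);
        }
        System.out.println(toBeanName("com.example.app.OrderProducer")); // prints "orderProducer"
    }
}
```

The resulting bean name is what the container uses when it injects OrderProducer into other beans by name.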

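Create a META-INF directory under resources, then create a spring.factories file in it that lists the auto-configuration class. Spring Boot 2.7 and later can also read auto-configuration classes from META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports, and Spring Boot 3 no longer reads the EnableAutoConfiguration key from spring.factories.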

properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.ssn.mq.config.MqAutoConfiguration

Verifying the feature

Add the custom starter built above as a dependency.

Configure the producer parameters in application.yml

mq:
  kafka:
    producer:
      bootstrap-servers: 10.13.3.21:9092
      acks: 1
      clientId: kafka-client-test

Defining the interface

java
@Producer
public interface OrderProducer {

    RecordMetadata send(String topic, String userInfo);

}

Using the interface

java
@Slf4j
@Component
public class AppInit implements CommandLineRunner {


    private final OrderProducer orderProducer;


    public AppInit(OrderProducer orderProducer) {

        this.orderProducer = orderProducer;
    }

    @Override
    public void run(String... args) throws Exception {
        UserInfo info = new UserInfo();
        info.setId(1);
        info.setUsername("lcd");
        RecordMetadata metadata = orderProducer.send("1", JSON.toJSONString(info));
        log.info("send record metadata {}",metadata);
    }
}

The console prints output like the following, in the form topic-partition@offset:

text
send record metadata 1-0@3

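Summary

FactoryBean is a powerful tool in the Spring framework that gives developers flexible control over how beans are created. Hopefully this article has given you a clearer picture of what FactoryBean is for and how to implement one, so that you can apply it in your own projects.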

Released under the MIT License.