大师网-带你快速走向大师之路 解决你在学习过程中的疑惑,带你快速进入大师之门。节省时间,提升效率

SpringBoot集成RabbitMQ-动态注入Bean方式

实现Direct,Fanout,Topic和死信转发方式实现的延迟队列

一个让处女座程序员很难受的问题:
每次申明一个队列,都需要用@Bean注解在config类里面显式的往容器里面注入一个Queue Bean和Binding Bean,十几个队列下来,那场面简直不能忍.
怎么解决呢,思路:
通过遍历枚举的方式,统一往spring容器中注入bean.废话不多说,上代码

一 使用场景说明
1.Direct
根据routekey精确匹配消费,只消费一次
2.Fanout
广播消息队列,同交换机内的所有消费者,都接收到消息
3.Topic
支持模糊匹配,可匹配到多个.配合AnonymousQueue队列可实现集群内多点同一业务集群消费.如:修改集群内所有应用内存中配置
4.TTL
延迟队列,实现消息延迟指定时间消费

二 关键代码

  1. 配置类:
import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author onlinever
 * @date 2018/09/06
 */
@Service
public class RabbitQueueBeanRegister implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;

    private BeanDefinitionRegistry beanDefinitionRegistry;

    private String adapterSuffix = "Adapter";

    private Map<RabbitQueueEnum, Queue> topicQueues = Maps.newHashMap();

    private List<TopicConsumer> topicConsumers;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        this.beanDefinitionRegistry = beanDefinitionRegistry;
        //声明交换机
        declareExchange();
        //声明队列和绑定
        declareQueueAndBinding();
        //奇怪的执行顺序
        if (haveTopicQueue()) {
            declareTopicMessageListenerAdapter();
            declareTopicMessageListenerContainer();
        }
    }

    private boolean haveTopicQueue() {
        try {
            topicConsumers = new ArrayList<>(applicationContext.getBeansOfType(TopicConsumer.class).values());
            return !topicConsumers.isEmpty();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            return false;
        }
    }

    /**
     * 声明交换机
     */
    private void declareExchange() {
        for (RabbitExchangeEnum rabbitExchangeEnum : RabbitExchangeEnum.values()) {
            switch (rabbitExchangeEnum.getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () -> (FanoutExchange) ExchangeBuilder
                            .fanoutExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());
                    break;
                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(TopicExchange.class, () -> (TopicExchange) ExchangeBuilder
                            .topicExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());
                    break;
                default:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class, () -> (DirectExchange) ExchangeBuilder
                            .directExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());
                    break;
            }
        }
    }

    /**
     * 声明队列和绑定
     */
    private void declareQueueAndBinding() {
        String bindingSuffix = "Binding";
        for (RabbitQueueEnum rabbitQueueEnum : RabbitQueueEnum.values()) {
            //注册所有队列
            beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(Queue.class, () -> {
                Queue queue;
                switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                    case TTL_QUEUE:
                        queue = QueueBuilder
                                .durable(rabbitQueueEnum.getRouteKey())
                                // 配置到期后转发的交换
                                .withArgument("x-dead-letter-exchange", rabbitQueueEnum.getRabbitQueueEnum().getExchangeName())
                                // 配置到期后转发的路由键
                                .withArgument("x-dead-letter-routing-key", rabbitQueueEnum.getRabbitQueueEnum().getRouteKey())
                                .build();
                        break;
                    case TOPIC_QUEUE:
                        queue = new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy(StringUtils.getTopicQueueNamePrefix(rabbitQueueEnum.getRouteKey())));
                        topicQueues.put(rabbitQueueEnum, queue);
                        break;
                    default:
                        queue = new Queue(rabbitQueueEnum.getRouteKey());
                        break;
                }
                return queue;
            }).getBeanDefinition());
            //注册队列与交换机的绑定
            switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), FanoutExchange.class))).getBeanDefinition());

                    break;
                case NORMAL_QUEUE:
                case TTL_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), DirectExchange.class))
                            .with(rabbitQueueEnum.getRouteKey())).getBeanDefinition());
                    break;
                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), TopicExchange.class))
                            .with(StringUtils.getTopicQueueRoute(rabbitQueueEnum.getRouteKey()))).getBeanDefinition());
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * 声明Topic消息监听适配器
     */
    private void declareTopicMessageListenerAdapter() {
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix,
                BeanDefinitionBuilder.genericBeanDefinition(MessageListenerAdapter.class, () -> new MessageListenerAdapter(topicConsumer)).getBeanDefinition()));
    }

    /**
     * 声明Topic消息监听容器
     */
    private void declareTopicMessageListenerContainer() {
        String containerSuffix = "Container";
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + containerSuffix,
                BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class, () -> {
                    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                    container.setQueues(topicQueues.get(topicConsumer.getQueueEnum()));
                    container.setConnectionFactory(applicationContext.getBean("rabbitConnectionFactory", ConnectionFactory.class));
                    container.setMessageListener(applicationContext.getBean(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix));
                    container.setRabbitAdmin(applicationContext.getBean(RabbitAdmin.class));
                    return container;
                }).getBeanDefinition()));
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
  1. 枚举类
    2.1 交换机类型枚举
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;

/**
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeTypeEnum {

    /**
     * 死信转发方式延迟队列
     */
    TTL_QUEUE(1, DirectExchange.class),
    /**
     * 正常队列
     */
    NORMAL_QUEUE(2, DirectExchange.class),
    /**
     * 广播队列
     */
    FANOUT_QUEUE(3, FanoutExchange.class),
    /**
     * topic队列
     */
    TOPIC_QUEUE(4, TopicExchange.class);


    /**
     * 队列routeKey
     */
    private int index;

    /**
     * 交换机class
     */
    private Class exchangeClazz;


    RabbitExchangeTypeEnum(int index, Class exchangeClazz) {
        this.index = index;
        this.exchangeClazz = exchangeClazz;
    }

    public int getIndex() {
        return index;
    }

    public Class getExchangeClazz() {
        return exchangeClazz;
    }
}

2.2 交换机枚举

/**
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeEnum {


    /**
     * rabbit交换机名称
     * 默认一个应用设置一个交换机
     * exchange.{0}.{1}
     * 0: 交换机类型 direct、topic、fanout、headers
     * 1: 应用名称
     */
    DIRECT_EXCHANGE("directExchange", "exchange.direct.gateway", RabbitExchangeTypeEnum.NORMAL_QUEUE),
    FANOUT_EXCHANGE("fanoutExchange", "exchange.fanout.gateway", RabbitExchangeTypeEnum.FANOUT_QUEUE),
    TOPIC_EXCHANGE("topicExchange", "exchange.topic.gateway", RabbitExchangeTypeEnum.TOPIC_QUEUE),;

    /**
     * 交换机beanName
     */
    private String beanName;
    /**
     * 交换机key
     */
    private String exchangeName;
    /**
     * 交换机类型
     */
    private RabbitExchangeTypeEnum rabbitExchangeTypeEnum;

    RabbitExchangeEnum(String beanName, String exchangeName, RabbitExchangeTypeEnum rabbitExchangeTypeEnum) {
        this.beanName = beanName;
        this.exchangeName = exchangeName;
        this.rabbitExchangeTypeEnum = rabbitExchangeTypeEnum;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public String getBeanName() {
        return beanName;
    }

    public RabbitExchangeTypeEnum getRabbitExchangeTypeEnum() {
        return rabbitExchangeTypeEnum;
    }
}

2.3 队列枚举

/**
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitQueueEnum {

    ;
    /**
     * 队列BeanName
     */
    private String beanName;
    /**
     * 队列routeKey
     */
    private String routeKey;
    /**
     * 交换机
     */
    private RabbitExchangeEnum exchangeEnum;

    /**
     * 死信转发到队列
     */
    private RabbitQueueEnum rabbitQueueEnum;


    RabbitQueueEnum(String beanName, String routeKey, RabbitExchangeEnum exchangeEnum, RabbitQueueEnum rabbitQueueEnum) {
        this.beanName = beanName;
        this.routeKey = routeKey;
        this.exchangeEnum = exchangeEnum;
        this.rabbitQueueEnum = rabbitQueueEnum;
    }

    public String getRouteKey() {
        return routeKey;
    }

    public RabbitExchangeEnum getExchangeEnum() {
        return exchangeEnum;
    }

    public String getExchangeName() {
        return exchangeEnum.getExchangeName();
    }

    public String getBeanName() {
        return beanName;
    }

    public RabbitQueueEnum getRabbitQueueEnum() {
        return rabbitQueueEnum;
    }
}
  1. Topic消费者接口
/**
 * topic队列消费者
 *
 * @author onlinever
 * @date 2018/8/17
 */
public interface TopicConsumer {
    /**
     * 消费的队列
     *
     * @return 队列
     */
    RabbitQueueEnum getQueueEnum();

    /**
     * 具体消费者的实现
     *
     * @param message 消息
     */
    void handleMessage(String message);
}
  1. 其他消费者使用@RabbitListener方式