博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot2.0源码分析(三):整合RabbitMQ分析
阅读量:5934 次
发布时间:2019-06-19

本文共 9411 字,大约阅读时间需要 31 分钟。

SpringBoot具体整合rabbitMQ可参考:

RabbitMQ自动注入

当项目中存在org.springframework.amqp.rabbit.core.RabbitTemplatecom.rabbitmq.client.Channel着两个类时,SpringBoot将RabbitMQ需要使用到的对象注册为Bean,供项目注入使用。一起看一下RabbitAutoConfiguration类。

@Configuration@ConditionalOnClass({ RabbitTemplate.class, Channel.class })@EnableConfigurationProperties(RabbitProperties.class)@Import(RabbitAnnotationDrivenConfiguration.class)public class RabbitAutoConfiguration {	@Configuration	@ConditionalOnMissingBean(ConnectionFactory.class)	protected static class RabbitConnectionFactoryCreator {		@Bean		public CachingConnectionFactory rabbitConnectionFactory(				RabbitProperties properties,				ObjectProvider
connectionNameStrategy) throws Exception { PropertyMapper map = PropertyMapper.get(); CachingConnectionFactory factory = new CachingConnectionFactory( getRabbitConnectionFactoryBean(properties).getObject()); ...... return factory; } } @Configuration @Import(RabbitConnectionFactoryCreator.class) protected static class RabbitTemplateConfiguration { private final ObjectProvider
messageConverter; private final RabbitProperties properties; ...... @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); ...... return template; } ......复制代码

RabbitAutoConfiguration使和RabbitMQ相关的配置生效,并根据相关属性创建和RabbitMQ的连接,并依赖连接工厂创建rabbitTemplateRabbitAutoConfiguration同时导入了RabbitAnnotationDrivenConfiguration,注入了rabbitListenerContainerFactory

RabbitMQ的Queue,Exchange和Routing Key关系创建

先看一个例子:

@Bean    Binding bindingExchangeMessage(Queue queue, TopicExchange exchange) {        // 将队列1绑定到名为topicKey.A的routingKey        return BindingBuilder.bind(queue).to(exchange).with("routingKey");    }复制代码

将queue通过"routingKey"绑定到exchange。 RabbitAdmin类的initialize方法会拿出注入的Exchange,Queue和Binding进行声明。

public void initialize() {	    ......		Collection
contextExchanges = new LinkedList
( this.applicationContext.getBeansOfType(Exchange.class).values()); Collection
contextQueues = new LinkedList
( this.applicationContext.getBeansOfType(Queue.class).values()); Collection
contextBindings = new LinkedList
( this.applicationContext.getBeansOfType(Binding.class).values()); ...... final Collection
exchanges = filterDeclarables(contextExchanges); final Collection
queues = filterDeclarables(contextQueues); final Collection
bindings = filterDeclarables(contextBindings); ...... this.rabbitTemplate.execute(channel -> { declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; }); }复制代码

消息发送

以下面的发送为例:

rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "from key1");复制代码

这个方法会先对消息进行转换,预处理,最终通过调用ChannelN的basicPublish方法提交一个AMQCommand,由AMQCommand完成最终的消息发送。

public void transmit(AMQChannel channel) throws IOException {        int channelNumber = channel.getChannelNumber();        AMQConnection connection = channel.getConnection();        synchronized (assembler) {            Method m = this.assembler.getMethod();            connection.writeFrame(m.toFrame(channelNumber));            if (m.hasContent()) {                byte[] body = this.assembler.getContentBody();                connection.writeFrame(this.assembler.getContentHeader()                        .toFrame(channelNumber, body.length));                int frameMax = connection.getFrameMax();                int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax                        - EMPTY_FRAME_SIZE;                for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {                    int remaining = body.length - offset;                    int fragmentLength = (remaining < bodyPayloadMax) ? remaining                            : bodyPayloadMax;                    Frame frame = Frame.fromBodyFragment(channelNumber, body,                            offset, fragmentLength);                    connection.writeFrame(frame);                }            }        }        connection.flush();    }复制代码

消息接收

先看一个消费的事例:

@Componentpublic class TopicConsumer {    @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1)    @RabbitHandler    public void process1(String message) {        System.out.println("queue:topic.message1,message:" + message);    }}复制代码

SpringBoot解析@RabbitListener@RabbitHandler两个注解的是RabbitListenerAnnotationBeanPostProcessor中的buildMetadata方法:

private TypeMetadata buildMetadata(Class
targetClass) { Collection
classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List
methods = new ArrayList<>(); final List
multiMethods = new ArrayList<>(); ReflectionUtils.doWithMethods(targetClass, method -> { Collection
listenerAnnotations = findListenerAnnotations(method); if (listenerAnnotations.size() > 0) { methods.add(new ListenerMethod(method, listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()]))); } if (hasClassLevelListeners) { RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class); if (rabbitHandler != null) { multiMethods.add(method); } } }, ReflectionUtils.USER_DECLARED_METHODS); if (methods.isEmpty() && multiMethods.isEmpty()) { return TypeMetadata.EMPTY; } return new TypeMetadata( methods.toArray(new ListenerMethod[methods.size()]), multiMethods.toArray(new Method[multiMethods.size()]), classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()])); }复制代码

@RabbitListener可以放在方法上,也可以放在类上。对于Class级别的Listener,会配合RabbitHandle进行绑定。对于method级别的Listener则不会。 解析完成后,由processListener方法对处理Listener。

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,			Object adminTarget, String beanName) {		endpoint.setBean(bean);		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);		endpoint.setId(getEndpointId(rabbitListener));		endpoint.setQueueNames(resolveQueues(rabbitListener));		endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));		endpoint.setBeanFactory(this.beanFactory);		endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));		String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");		if (StringUtils.hasText(errorHandlerBeanName)) {			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));		}		String group = rabbitListener.group();		if (StringUtils.hasText(group)) {			Object resolvedGroup = resolveExpression(group);			if (resolvedGroup instanceof String) {				endpoint.setGroup((String) resolvedGroup);			}		}		String autoStartup = rabbitListener.autoStartup();		if (StringUtils.hasText(autoStartup)) {			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));		}		endpoint.setExclusive(rabbitListener.exclusive());		String priority = resolve(rabbitListener.priority());		if (StringUtils.hasText(priority)) {			try {				endpoint.setPriority(Integer.valueOf(priority));			}			catch (NumberFormatException ex) {				throw new BeanInitializationException("Invalid priority value for " +						rabbitListener + " (must be an integer)", ex);			}		}		String rabbitAdmin = resolve(rabbitListener.admin());		if (StringUtils.hasText(rabbitAdmin)) {			Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");			try {				endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));			}			catch (NoSuchBeanDefinitionException ex) {				throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +						adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +						rabbitAdmin + "' was found in the application context", ex);			}		}		RabbitListenerContainerFactory
factory = null; String containerFactoryBeanName = resolve(rabbitListener.containerFactory()); if (StringUtils.hasText(containerFactoryBeanName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); try { factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class); } catch (NoSuchBeanDefinitionException ex) { throw new BeanInitializationException("Could not register rabbit listener endpoint on [" + adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" + containerFactoryBeanName + "' was found in the application context", ex); } } this.registrar.registerEndpoint(endpoint, factory); }复制代码

先设置endpoint的相关属性,再获取rabbitListenerContainerFactory,根据endpoint和rabbitListenerContainerFactory创建一个messageListenerContainer,并创建endpoint的id到messageListenerContainer的映射。 当收到消息时,SpringBoot会调用绑定好的方法。


本篇到此结束,如果读完觉得有收获的话,欢迎点赞、关注、加公众号【贰级天災】,查阅更多精彩历史!!!

转载地址:http://cnntx.baihongyu.com/

你可能感兴趣的文章
poj - 1860 Currency Exchange
查看>>
chgrp命令
查看>>
Java集合框架GS Collections具体解释
查看>>
洛谷 P2486 BZOJ 2243 [SDOI2011]染色
查看>>
linux 笔记本的温度提示
查看>>
数值积分中的辛普森方法及其误差估计
查看>>
Web service (一) 原理和项目开发实战
查看>>
跑带宽度多少合适_跑步机选购跑带要多宽,你的身体早就告诉你了
查看>>
广平县北方计算机第一届PS设计大赛
查看>>
深入理解Java的接口和抽象类
查看>>
java与xml
查看>>
Javascript异步数据的同步处理方法
查看>>
iis6 zencart1.39 伪静态规则
查看>>
SQL Server代理(3/12):代理警报和操作员
查看>>
基于事件驱动的DDD领域驱动设计框架分享(附源代码)
查看>>
Linux备份ifcfg-eth0文件导致的网络故障问题
查看>>
2018年尾总结——稳中成长
查看>>
JFreeChart开发_用JFreeChart增强JSP报表的用户体验
查看>>
度量时间差
查看>>
apache prefork模式优化错误
查看>>