spring boot集成rabbitmq的实例教程

一、RabbitMQ的介绍  

RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache).

消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下:

从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

对于RabbitMQ来说,除了这三个基本模块以外,还添加了一个模块,即交换机(Exchange).它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.那么RabitMQ的工作流程如下所示:

紧接着说一下交换机.交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略. 

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

1、direct

直连模式,用于实例间的任务分发

2、topic

话题模式,通过可配置的规则分发给绑定在该exchange上的队列

3、headers

适用规则复杂的分发,用headers里的参数表达规则

4、fanout

分发给所有绑定到该exchange上的队列,忽略routing key

安装

单机版安装很简单,大概步骤如下:

# 安装erlang包

yum install erlang

# 安装socat

yum install socat

# 安装rabbit

rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm

# 启动服务

rabbitmq-server start

# 增加管理控制功能

rabbitmq-plugins enable rabbitmq_management

# 增加用户:

sudo rabbitmqctl add_user root password

rabbitmqctl set_user_tags root administrator

rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安装,可参考这篇文章:

     rabbitmq集群安装

以上就是rabbitmq的介绍,下面开始本文的正文:spring boot 集成rabbitmq ,本人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

二、springboot配置

废话少说直接上代码:

配置参数

application.yml:

spring:

rabbitmq:

addresses: 192.168.1.1:5672

username: username

password: password

publisher-confirms: true

virtual-host: /

java config读取参数

/**

* RabbitMq配置文件读取类

*

* @author chenhf

* @create 2017-10-23 上午9:31

**/

@Configuration

@ConfigurationProperties(prefix = "spring.rabbitmq")

public class RabbitMqConfig {

@Value("${spring.rabbitmq.addresses}")

private String addresses;

@Value("${spring.rabbitmq.username}")

private String username;

@Value("${spring.rabbitmq.password}")

private String password;

@Value("${spring.rabbitmq.publisher-confirms}")

private Boolean publisherConfirms;

@Value("${spring.rabbitmq.virtual-host}")

private String virtualHost;

// 构建mq实例工厂

@Bean

public ConnectionFactory connectionFactory(){

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

connectionFactory.setAddresses(addresses);

connectionFactory.setUsername(username);

connectionFactory.setPassword(password);

connectionFactory.setPublisherConfirms(publisherConfirms);

connectionFactory.setVirtualHost(virtualHost);

return connectionFactory;

}

@Bean

public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){

return new RabbitAdmin(connectionFactory);

}

@Bean

@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

public RabbitTemplate rabbitTemplate(){

RabbitTemplate template = new RabbitTemplate(connectionFactory());

return template;

}

}

三、rabbitmq生产者配置

主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。

/**

* 用于配置交换机和队列对应关系

* 新增消息队列应该按照如下步骤

* 1、增加queue bean,参见queueXXXX方法

* 2、增加queue和exchange的binding

* @author chenhf

* @create 2017-10-23 上午10:33

**/

@Configuration

@AutoConfigureAfter(RabbitMqConfig.class)

public class RabbitMqExchangeConfig {

/** logger */

private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

/**

* @Author:chenhf

* @Description: 主题型交换机

* @Date:下午5:49 2017/10/23

* @param

* @return

*/

@Bean

TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){

TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());

rabbitAdmin.declareExchange(contractTopicExchange);

logger.debug("完成主题型交换机bean实例化");

return contractTopicExchange;

}

/**

* 直连型交换机

*/

@Bean

DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {

DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());

rabbitAdmin.declareExchange(contractDirectExchange);

logger.debug("完成直连型交换机bean实例化");

return contractDirectExchange;

}

//在此可以定义队列

@Bean

Queue queueTest(RabbitAdmin rabbitAdmin){

Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());

rabbitAdmin.declareQueue(queue);

logger.debug("测试队列实例化完成");

return queue;

}

//topic 1

@Bean

Queue queueTopicTest1(RabbitAdmin rabbitAdmin){

Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());

rabbitAdmin.declareQueue(queue);

logger.debug("话题测试队列1实例化完成");

return queue;

}

//topic 2

@Bean

Queue queueTopicTest2(RabbitAdmin rabbitAdmin){

Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());

rabbitAdmin.declareQueue(queue);

logger.debug("话题测试队列2实例化完成");

return queue;

}

//在此处完成队列和交换机绑定

@Bean

Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){

Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());

rabbitAdmin.declareBinding(binding);

logger.debug("测试队列与直连型交换机绑定完成");

return binding;

}

//topic binding1

@Bean

Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){

Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());

rabbitAdmin.declareBinding(binding);

logger.debug("测试队列与话题交换机1绑定完成");

return binding;

}

//topic binding2

@Bean

Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){

Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());

rabbitAdmin.declareBinding(binding);

logger.debug("测试队列与话题交换机2绑定完成");

return binding;

}

}

在这里用到枚举类:RabbitMqEnum

/**

* 定义rabbitMq需要的常量

*

* @author chenhf

* @create 2017-10-23 下午4:07

**/

public class RabbitMqEnum {

/**

* @param

* @Author:chenhf

* @Description:定义数据交换方式

* @Date:下午4:08 2017/10/23

* @return

*/

public enum Exchange {

CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"),

CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"),

CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点");

private String code;

private String name;

Exchange(String code, String name) {

this.code = code;

this.name = name;

}

public String getCode() {

return code;

}

public String getName() {

return name;

}

}

/**

* describe: 定义队列名称

* creat_user: chenhf

* creat_date: 2017/10/31

**/

public enum QueueName {

TESTQUEUE("TESTQUEUE", "测试队列"),

TOPICTEST1("TOPICTEST1", "topic测试队列"),

TOPICTEST2("TOPICTEST2", "topic测试队列");

private String code;

private String name;

QueueName(String code, String name) {

this.code = code;

this.name = name;

}

public String getCode() {

return code;

}

public String getName() {

return name;

}

}

/**

* describe: 定义routing_key

* creat_user: chenhf

* creat_date: 2017/10/31

**/

public enum QueueEnum {

TESTQUEUE("TESTQUEUE1", "测试队列key"),

TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"),

TESTTOPICQUEUE2("lazy.#", "topic测试队列key");

private String code;

private String name;

QueueEnum(String code, String name) {

this.code = code;

this.name = name;

}

public String getCode() {

return code;

}

public String getName() {

return name;

}

}

}

以上完成消息生产者的定义,下面封装调用接口

测试时直接调用此工具类,testUser类需自己实现

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);

rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);

rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);

/**

* rabbitmq发送消息工具类

*

* @author chenhf

* @create 2017-10-26 上午11:10

**/

@Component

public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{

/** logger */

private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

private RabbitTemplate rabbitTemplate;

@Autowired

public RabbitMqSender(RabbitTemplate rabbitTemplate) {

this.rabbitTemplate = rabbitTemplate;

this.rabbitTemplate.setConfirmCallback(this);

}

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

logger.info("confirm: " + correlationData.getId());

}

/**

* 发送到 指定routekey的指定queue

* @param routeKey

* @param obj

*/

public void sendRabbitmqDirect(String routeKey,Object obj) {

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

logger.info("send: " + correlationData.getId());

this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);

}

/**

* 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上

* @param routeKey

* @param obj

*/

public void sendRabbitmqTopic(String routeKey,Object obj) {

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

logger.info("send: " + correlationData.getId());

this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);

}

}

四、rabbitmq消费者配置

springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读

直连消费者

通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费

/**

* 消费者配置

*

* @author chenhf

* @create 2017-10-30 下午3:14

**/

@Configuration

@AutoConfigureAfter(RabbitMqConfig.class)

public class ExampleAmqpConfiguration {

@Bean("testQueueContainer")

public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.setQueueNames("TESTQUEUE");

container.setMessageListener(exampleListener());

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

return container;

}

@Bean("testQueueListener")

public ChannelAwareMessageListener exampleListener() {

return new ChannelAwareMessageListener() {

@Override

public void onMessage(Message message, Channel channel) throws Exception {

TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());

//通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费

if ("2".equals(testUser.getUserName())){

System.out.println(testUser.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

if ("1".equals(testUser.getUserName())){

System.out.println(testUser.toString());

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

}

}

};

}

}

topic消费者1

/**

* 消费者配置

*

* @author chenhf

* @create 2017-10-30 下午3:14

**/

@Configuration

@AutoConfigureAfter(RabbitMqConfig.class)

public class TopicAmqpConfiguration {

@Bean("topicTest1Container")

public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.setQueueNames("TOPICTEST1");

container.setMessageListener(exampleListener1());

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

return container;

}

@Bean("topicTest1Listener")

public ChannelAwareMessageListener exampleListener1(){

return new ChannelAwareMessageListener() {

@Override

public void onMessage(Message message, Channel channel) throws Exception {

TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());

System.out.println("TOPICTEST1:"+testUser.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

};

}

}

topic消费者2

/**

* 消费者配置

*

* @author chenhf

* @create 2017-10-30 下午3:14

**/

@Configuration

@AutoConfigureAfter(RabbitMqConfig.class)

public class TopicAmqpConfiguration2 {

@Bean("topicTest2Container")

public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.setQueueNames("TOPICTEST2");

container.setMessageListener(exampleListener());

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

return container;

}

@Bean("topicTest2Listener")

public ChannelAwareMessageListener exampleListener() {

return new ChannelAwareMessageListener() {

@Override

public void

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。

以上是 spring boot集成rabbitmq的实例教程 的全部内容, 来源链接: utcz.com/p/215328.html

回到顶部