SpringbBoot整合RabbitMQ入门

/ 技术 / 无站内评论 / 358浏览

前言:

    MQ 是什么?队列是什么,MQ 我们可以理解为消息队列,队列我们可以理解为管道。以管道的方式做消息传递。

场景:

    1.其实我们在双11的时候,当我们凌晨大量的秒杀和抢购商品,然后去结算的时候,就会发现,界面会提醒我们,让我们稍等,以及一些友好的图片文字提醒。而不是像前几年的时代,动不动就页面卡死,报错等来呈现给用户。

    在这业务场景中,我们就可以采用队列的机制来处理,因为同时结算就只能达到这么多。

    2.在我们平时的超市中购物也是一样,当我们在结算的时候,并不会一窝蜂一样涌入收银台,而是排队结算。这也是队列机制。

对,就是排队。一个接着一个的处理,不能插队。

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。

RabbitMQ 概念太多了,这个博客讲的不错,这里就不过多赘述。

https://www.sojson.com/blog/48.html

RabbitMQ 选型和对比

1.从社区活跃度

按照目前网络上的资料,RabbitMQ 、activeM 、ZeroMQ 三者中,综合来看,RabbitMQ 是首选。 

2.持久化消息比较

ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。

3.综合技术实现

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。

RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。

4.高并发

毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。

5.比较关注的比较, RabbitMQ 和 Kafka

RabbitMq 比Kafka 成熟,在可用性上,稳定性上,可靠性上,  RabbitMq  胜于  Kafka  (理论上)。

另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq 。

还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多。

选型总结:

如果我们系统中已经有选择  Kafka  ,或者   RabbitMq  ,并且完全可以满足现在的业务,建议就不用重复去增加和造轮子。

可以在  Kafka  和   RabbitMq  中选择一个适合自己团队和业务的,这个才是最重要的。但是毋庸置疑现阶段,综合考虑没有第三选择。

安装

安装环境

RabbitMQ是基于Erlang开发的,所以在安装rabbitMQ之前,需要先安装Erlang 。 alt

https://pan.baidu.com/s/1c2826rA

我使用的是otp_win64_18.1 ,需要其他版本或者32位系统的,可以去官网下载。

全部点击“下一步”就行。

有的选择其他的安装方式,可能需要添加一下系统环境变量(自行百度)

安装RabbitMQ

下载运行rabbitmq-server-3.6.5 ,需要其他版本或者32位系统的,可以去官网下载。

依旧可以不改变默认进行安装。

需要注意:默认安装的RabbitMQ 监听端口是5672

配置

激活 RabbitMQ's Management Plugin 使用RabbitMQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态。 如图 alt

打开命令窗口:

输入命令:

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat enable rabbitmq_management

这样,就安装好插件了,是不是能使用了呢?别急,需要重启服务才行,使用命令(管理员权限):

net stop RabbitMQ && net start RabbitMQ

这时候就可以打开http://localhost:15672/#/进行观察了。 默认帐号密码均为:guest

安装具体步骤可以参考:https://www.cnblogs.com/ericli-ericli/p/5902270.html

alt

代码

目录

alt

Rabbitmq依赖

<!-- ampq 依赖包 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

application.yml 配置

spring:
  rabbitmq:
        addresses: 127.0.0.1:5672
        username: guest
        password: guest
        publisher-confirms: true
        virtual-host: /

使用注解配置

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * @Auther: sanii
 * @Date: 2018/5/9 10:43
 * @Description:
 * 增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息
 */
@Configuration
public class AmqpConfig {

    public static final String FOO_EXCHANGE   = "callback.exchange.foo";
    public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
    public static final String FOO_QUEUE      = "callback.queue.foo";
    
    //获取yml文件中的配置
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;
    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        /** 如果要进行消息回调,则这里必须要设置为true */
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    @Bean
    /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

}

Spring中为了减少xml中配置,可以声明一个配置类(例如SpringConfig)来对bean进行配置。 用@Configuration注解该类,等价 与XML中配置beans;用@Bean标注方法等价于XML中配置bean。

消息生产者



import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;
/**
 * @Auther: sanii
 * @Date: 2018/5/8 17:23
 * @Description:
 * RabbitMQ生产者
 */
@Component
public class Sender implements RabbitTemplate.ConfirmCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public Sender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    public void send(String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        LOGGER.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE, AmqpConfig.FOO_ROUTINGKEY, msg, correlationData);
    }

    /**
     * @author sanii
     * @date 2018/5/9 11:03
     * @Description:
     * 回调方法,确认消息是否被消费
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        LOGGER.info("confirm: " + correlationData.getId());
    }
}

消息消费者



import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Payload;
/**
 * @author sanii
 * @date 2018/5/9 10:05
 * @Description:
 * RabbitMQ消费者
 */
@Configuration
@RabbitListener(queues = AmqpConfig.FOO_QUEUE)
public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    /** 设置交换机类型  */
    @Bean
    public DirectExchange defaultExchange() {
        /**
         * DirectExchange:按照routingkey分发到指定队列
         * TopicExchange:多关键字匹配
         * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
         * HeadersExchange :通过添加属性key-value匹配
         */
        return new DirectExchange(AmqpConfig.FOO_EXCHANGE);
    }

    @Bean
    public Queue fooQueue() {
        return new Queue(AmqpConfig.FOO_QUEUE);
    }

    @Bean
    public Binding binding() {
        /** 将队列绑定到交换机 */
        return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);
    }

    @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

测试Controller



import cn.sanii.hello.mq.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;

@RestController
public class RabbitmqController {
    private static final Logger logger=LoggerFactory.getLogger(RabbitmqController.class);
    @Autowired
    private Sender sender;
    
     /**
     * @author shouliang.wang
     * @date 2018/5/10 17:30
     * @param msg 发现的消息体 count测试循环次数
     * @return 
     * @Description: 
     * 发送消息
     */
    @GetMapping("/send")
    public String send(HttpServletRequest request, String msg, int count) {
        long currentTimeMillis = System.currentTimeMillis();
        while (count >= 0) {
            sender.send(msg+count);
            count -= 1;
        }
        logger.info("总耗时:{}",(System.currentTimeMillis()-currentTimeMillis));
        return "Send OK 总耗时:"+(System.currentTimeMillis()-currentTimeMillis);
    }
}

效果

postman进行测试

alt

控制台

alt

分别是:发送者、监听者、确认消费成功

参考

https://blog.csdn.net/zl18310999566/article/details/54341057

https://www.cnblogs.com/yufeng218/p/8075941.html

https://www.cnblogs.com/ericli-ericli/p/5902270.html

https://blog.csdn.net/vvhesj/article/details/47661001

召唤蕾姆
琼ICP备18000156号

鄂公网安备 42011502000211号