RabbitMQ消息中间件

引入

生产者大批量的产生数据,而消费者无法在短时间快速消费,那么就需要一个中间层,保存这些数据,RabbitMQ主要实现系统之间的双向解耦。

一. 优点:

  1. 底层采用Erlang语言编写
  2. 开源、性能优秀,稳定性保障
  3. 与SpringAMQP完美的整合、API丰富

二. 介绍:

  1. 集群模式丰富、表达式配置、HA模式、镜像队列模式
  2. 保证数据不丢失的前提做到高可靠性、可用性

相关概念:
1、Server: Broker,接受客户端的连接,实现AMQP实体服务
2、Connection: 连接,应用程序与Broker的网络连接
3、Channel: 网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务。
4、Message: 消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性; Body则就是消息体内容。
5、Virtual host: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。
6、Exchange: 交换机,接收消息,根据路由键转发消息到绑定的队列。
7、Binding: Exchange和Queue之间的虚拟连接,Binding中可以包含routing key。
8、Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息
9、Queue: 消息队列,保存消息并将它们转发给消费者。

三. 消息中间件工作流程:

Alt text
1、生产者往RabbitMQ中发送消息;
2、消息中间件包含了交换机、队列,(虚拟主机为抽象出来用于控制各组用户访问指定的交换机,默认虚拟主机为:/)
3、交换机:用于转发消息的,若交换机没有与队列做绑定的话,传过来的消息会被丢弃掉。
交换机有四种类型:Direct,消息的routing_key匹配到队列时才会被投到对应的队列中去;Topic,转发消息主要是通过通配符匹配(.表示一个词,#表示零个或多个);Headers,设置header attribute参数类型的交换机;Fanout:转发消息到所有绑定的队列中

四. spring boot集成使用

spring boot框架集成了消息中间件,我们使用中间件时,有两个模板可以调用,amqptemplate 和 rabbittemplate,amqptemplate是Spring AMQP提供的一个消息发送和接收的操作模板,定义了一些发送和接收消息等基本功能,而rabbittemplate是amqptemplate的一个实现,支持消息的确认、返回,习惯上使用amqptemplate

4.1、一对一发送

(1)配置新建hello队列

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}

(2)接受者 监听hello队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void process(String content){
log.info("==> ==> MQReceiver:" + content);
}
}

(3)生产者方法,供调用

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class MQSender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(){
String content = "hello " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date());
amqpTemplate.convertAndSend("hello", content);
}
}

(4)启动项目后,使用单元测试调用生产者方法,向中间件中发送消息

1
2
3
4
5
6
7
8
9
10
11
12
@RunWith(SpringRunner.class)
@SpringBootTest
public class NycloudExampleAmqpApplicationTests {

@Autowired
private MQSender mqSender;

@Test
public void contextLoads() {
mqSender.send();
}
}

4.2、一对多发送

添加一个消费者同样监听与上面相同的hello队列

1
2
3
4
5
6
7
8
9
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class MQReceiver_2 {
@RabbitHandler
public void process(String content){
log.info("==> ==> MQReceiver 2 :" + content);
}
}

测试发现消费者均匀的接收到了生产者发送过来的消息

4.3、多对多发送

注入两个生产者bean,调用数10次发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RunWith(SpringRunner.class)
@SpringBootTest
public class NycloudExampleAmqpApplicationTests {
@Autowired
private MQSender mqSender_1;

@Autowired
private MQSender mqSender_2;

@Test
public void contextLoads() {
for(int i=0;i<10;i++){
mqSender_1.send("1---"+i);
mqSender_2.send("2---"+i);
}
}
}

测试发现消费者均匀的接收到了生产者发送过来的消息

4.4、发送对象

SysUser类,即发送的对象,需要序列化,实现Serializable

1
2
3
4
5
6
@Data
public class SysUser implements Serializable {
private int userid;
private String name;
private int age;
}

生产者方法改写

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class MQSender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(Object sysUser){
amqpTemplate.convertAndSend("hello", sysUser);
}
}

消费者方法改写

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {

@RabbitHandler
public void process(SysUser sysUser){
log.info("==> ==> MQReceiver:" + sysUser.toString());
}
}

接收到的数据正常

4.5、topic类型测试

该模式下可以根据routing_key来绑定不同的队列

新建两个队列topic.msg1、topic.msg2,一个交换机exchange,路由键topic.msg1绑定到topic.msg1,路由键topic.*绑定到topic.msg2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class TopicRabbitConfig {
public final static String msg1 = "topic.msg1";
public final static String msg2 = "topic.msg2";

@Bean
public Queue queueMsg1(){
return new Queue(msg1);
}

@Bean
public Queue queueMsg2(){
return new Queue(msg2);
}

@Bean
public TopicExchange exchange(){
return new TopicExchange("exchange");
}

@Bean
Binding bindExchangeMsg1(Queue queueMsg1,TopicExchange exchange){
return BindingBuilder.bind(queueMsg1).to(exchange).with("topic.msg1");
}

@Bean
Binding bindExchangeMsg2(Queue queueMsg2,TopicExchange exchange){
return BindingBuilder.bind(queueMsg2).to(exchange).with("topic.*");
}

}

循环10次发送到两个队列中,测试发送的时候可以将接收的部分代码注释掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RunWith(SpringRunner.class)
@SpringBootTest
public class NycloudExampleAmqpApplicationTests {

@Autowired
private MQSender mqSender;

@Test
public void contextLoads() {
for(int i=0;i<10;i++){
SysUser sysUser = new SysUser();
sysUser.setAge(i);
sysUser.setName("send 1");
sysUser.setUserid(i+1);
mqSender.send_1(sysUser);
sysUser.setName("send 2");
mqSender.send_2(sysUser);
}
}
}


@Component
@Slf4j
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate;

public void send_1(Object sysUser){
amqpTemplate.convertAndSend("exchange", "topic.msg1", sysUser);
}

public void send_2(Object sysUser){
amqpTemplate.convertAndSend("exchange", "topic.msg2", sysUser);
}
}

结果topic.msg1队列有10条数据,topic.msg2队列有20条数据,分析不难发现,由于第一个路由键规则为topic.msg1,所以只能接收到路由键值为topic.msg1的消息,而第二个路由键规则为topic.*(若规则为topic.#可以接收到topic.xxx.xxx.xxx…..的消息),会接收路由键为topic.xxx的消息,所以会接收20条

五. python集成使用

python使用rabbitMQ模块有以下几个工具模块可供选择:
主流的为:pika,还有Celery、Haigha、aioamqp,等,官网:http://www.rabbitmq.com/devtools.html

安装pika模块 : pip install pika(测试机器为python3.6.3,pika版本0.12.0)

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pika

def callback(ch, method, properties, body):
print("接收到:",body)

if __name__ == '__main__':
credentials = pika.PlainCredentials('guest', 'guest')
Parameter = pika.ConnectionParameters('127.0.0.1', 5672, '/', credentials, heartbeat_interval=0)
connection = pika.BlockingConnection(Parameter)
channel = connection.channel()

channel.queue_declare(queue='topic.msg1',durable=True)
channel.basic_consume(callback,
queue='topic.msg1',
no_ack=True)
channel.start_consuming()

生产者

1
2
3
4
5
6
7
8
9
10
11
12
if __name__ == '__main__':
credentials = pika.PlainCredentials('guest', 'guest')
Parameter = pika.ConnectionParameters('127.0.0.1', 5672, '/', credentials, heartbeat_interval=0) # 创建一个连接
connection = pika.BlockingConnection(Parameter) # 创建通道
channel = connection.channel()

channel.queue_declare(queue='topic.msg1',durable=True) # 把消息队列的名字为hello

channel.basic_publish(exchange='exchange',
routing_key='topic.msg1',
body='Hello World!') # 设置routing_key(消息队列的名称)和body(发送的内容)
connection.close()

当然在实际使用的过程中,我们需要应用支持断线重连,接收到队列消息后,确认收到,经过一番处理,将结果写入另外一个队列中,来段示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class CollectImgInfo:
def __init__(self,a,b):
super().__init__()
self.a = a
self.b = b

@staticmethod
def convertObj(self):
return CollectImgInfo(self.a,self.b)


# 发送消息至队列并持久化 properties=pika.BasicProperties(delivery_mode=2) - 配置消息持久化
def sendInfo(channel,content):
channel.basic_publish(exchange='',routing_key='RESULT_QUEUE',body=content,properties=pika.BasicProperties(delivery_mode=2))

# 消息回调
def callback(ch, method,properties, body):
try:
# byte转字符串
content = str(body, encoding="utf8")
#json字符串解析成对象
obj = json.loads(content,object_hook=cinfo.CollectImgInfo.convertObj)
# 一系列的业务操作
...
...
...
expert Exception as ex:
print(str(ex))
sendInfo(ch, json.dumps(obj.__dict__)


# 初始化mq
def initMq():
try:
pika.PlainCredentials(username,password)
pika.BlockingConnection(pika.ConnectionParameters(ip, port,virtualhost,credentials))
channel = connection.channel()
# durable=True - 持久化
#channel.queue_declare(queue='TASK_QUEUE',durable=True)
#channel.queue_declare(queue='RESULT_QUEUE',durable=True)
# no_ack=True 开启自动应答
channel.basic_consume(callback,queue='TASK_QUEUE',no_ack=False)
channel.start_consuming()
except Exception as ex:
time.sleep(reConnectTime)
initMq()

if __name__ == '__main__':
initMq()