利用外部mq中间件解决单实例的简单消息队列重启丢失的问题
hugh 的个人博客 2019-11-23 23:33:59 nodemqrabbitmq前端
# 1. 解决node中耗时操作对请求的影响
通过fork方法创建一个子进制,进行操作 详情
# 问题
一旦单个实例被异常重启, 会导致数据丢失
# 解决
采用外部mq中间件, 一般mq中间件都支持集群部署、容灾处理。 这里选用rabbitmq做示例
# 可以使用docker 安装rabbitmq作为测试消息队列
# 安装
docker pull rabbitmq:management
1
安装带管理端的rabbitmq镜像
docker run -d --name amqp.test -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
1
启动一个mq实例, -p为映射容器端口到主机端口 启动成功之后,可以在浏览器中访问
http://127.0.0.1:15672
1
这是mq的管理客户端
# 连接
我们使用node作为客户端链接,
npm install amqplib
1
连接到服务器分3步
- 创建连接
const amqp = require("amqplib")
this.conn = amqp.connect(this.rabbitMqConfig)
1
2
2
- 创建channel
this.channel = this.conn.then(function(conn) {
return conn.createChannel();
}, function(e) {
logger.error("rabbitmq conn close" + e.message);
})
1
2
3
4
5
2
3
4
5
- 收发消息
// 发消息 sendToQueue
this._getChannel().then(function(ch) {
return ch.assertQueue(name).then(function(ok) {
ch.sendToQueue(name, Buffer.from(JSON.stringify(obj)));
});
}).catch(logger.error);
1
2
3
4
5
6
2
3
4
5
6
// 收消息
this._getChannel().then(function(ch) {
ch.on('close', function() {
logger.error("rabbitmq channel close, ready to reconn");
self.conn = null
self.channel = null
self.channelFlag = false
let timer = setInterval(function() {
if(self.channelFlag) {
clearInterval(timer)
logger.info("rabbitmq channel reconn success");
return;
}
self.conn = null
self.channel = null
self.popFormQueue(name)
}, 1000)
})
self.channelFlag = true;
return ch.assertQueue(name).then(function(ok) {
return ch.consume(name, function(msg) {
if (msg !== null) {
// logger.info(msg.content.toString());
try{
if(self.ruleService) {
let curMsg = JSON.parse(msg.content.toString());
if(curMsg.item){
curMsg.item.isFromMQ = true
}
self.ruleService.doInterceptor(curMsg.item)
}
}catch(e) {
logger.error("rabbitmq fail:" + msg.content.toString())
}
ch.ack(msg);
}
});
});
}, function(e) {
logger.error("rabbitmq channel create error 1" + e,message)
}).then(null, function(e) {
logger.error("rabbitmq channel create error 2" + e,message)
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
接收消息,会复用同一个channel, 需要明确处理channel的error事件, 在链接失败时, 主动尝试重连。