hugh 的个人博客

vuePress-theme-reco hugh 的个人博客    2021
hugh 的个人博客

Choose mode

  • dark
  • auto
  • light
Home
分类
  • 前端
  • fe-robot
  • 前端监控
标签
专题
  • femonitor
  • jsby
  • fe-robot
TimeLine
工具
全版

hugh 的个人博客

154

Article

324

Tag

Home
分类
  • 前端
  • fe-robot
  • 前端监控
标签
专题
  • femonitor
  • jsby
  • fe-robot
TimeLine
工具
全版
  • 数据收集篇

  • 数据分析篇

  • 数据监控篇

  • 采坑篇

    • 奇怪的input_-前端监控之采坑篇
      • 大坑描述
      • 问题分析
      • 解决方案
    • comment类型的节点引发的还原布局错误-前端监控之采坑篇
      • 大坑描述
      • 问题分析
      • 问题解决
    • 利用外部mq中间件解决单实例的简单消息队列重启丢失的问题
      • 1. 解决node中耗时操作对请求的影响
      • 问题
      • 解决
    • mapreduce使用的坑-mac_reduce对数据集进行汇总计算时,出现数据样本不全的问题
      • 1. mapreduce
      • 2. 问题案例
      • 3. 原因
      • 4. 解决方案

利用外部mq中间件解决单实例的简单消息队列重启丢失的问题

vuePress-theme-reco hugh 的个人博客    2021

利用外部mq中间件解决单实例的简单消息队列重启丢失的问题


hugh 的个人博客 2019-11-23 23:33:59 nodemqrabbitmq前端

# 1. 解决node中耗时操作对请求的影响

通过fork方法创建一个子进制,进行操作 详情

# 问题

一旦单个实例被异常重启, 会导致数据丢失

# 解决

采用外部mq中间件, 一般mq中间件都支持集群部署、容灾处理。 这里选用rabbitmq做示例

# 可以使用docker 安装rabbitmq作为测试消息队列

# 安装

docker pull rabbitmq:management
1

安装带管理端的rabbitmq镜像

docker run -d --name amqp.test  -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
1

启动一个mq实例, -p为映射容器端口到主机端口 启动成功之后,可以在浏览器中访问

http://127.0.0.1:15672
1

这是mq的管理客户端

# 连接

我们使用node作为客户端链接,

npm install  amqplib
1

连接到服务器分3步

  • 创建连接
const amqp = require("amqplib")
this.conn = amqp.connect(this.rabbitMqConfig)
1
2
  • 创建channel
 this.channel = this.conn.then(function(conn) {
               return conn.createChannel();
           }, function(e) {
               logger.error("rabbitmq conn close" + e.message);
           })
1
2
3
4
5
  • 收发消息
// 发消息 sendToQueue
 this._getChannel().then(function(ch) {
            return ch.assertQueue(name).then(function(ok) {
                ch.sendToQueue(name, Buffer.from(JSON.stringify(obj)));
            });
        }).catch(logger.error);
1
2
3
4
5
6
// 收消息
this._getChannel().then(function(ch) {
            ch.on('close', function() {
                logger.error("rabbitmq channel close, ready to reconn");
                self.conn = null
                self.channel = null
                self.channelFlag = false
                let timer = setInterval(function() {
                    if(self.channelFlag) {
                        clearInterval(timer)
                        logger.info("rabbitmq channel reconn success");
                        return;
                    }
                    self.conn = null
                    self.channel = null
                    self.popFormQueue(name)
                }, 1000)
            })
            self.channelFlag = true;
            return ch.assertQueue(name).then(function(ok) {
                return ch.consume(name, function(msg) {
                    if (msg !== null) {
                        // logger.info(msg.content.toString());
                        try{
                            if(self.ruleService) {
                                let curMsg = JSON.parse(msg.content.toString());
                                if(curMsg.item){
                                    curMsg.item.isFromMQ = true
                                } 
                                self.ruleService.doInterceptor(curMsg.item)
                            }
                        }catch(e) {
                            logger.error("rabbitmq fail:" + msg.content.toString())
                        }
                        ch.ack(msg);
                    }
                });
            });
        }, function(e) {
            logger.error("rabbitmq channel create error 1" + e,message)
        }).then(null, function(e) {
            logger.error("rabbitmq channel create error 2" + e,message)
        });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

接收消息,会复用同一个channel, 需要明确处理channel的error事件, 在链接失败时, 主动尝试重连。