博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
搭建前端监控系统(五)Nodejs + RabbitMq 搭建消息队列,处理高并发问题
阅读量:6087 次
发布时间:2019-06-20

本文共 3754 字,大约阅读时间需要 12 分钟。

  背景:市面上的监控系统有很多,大多收费,对于小型前端项目来说,必然是痛点。另一点主要原因是,功能虽然通用,却未必能够满足我们自己的需求, 所以我们自给自足也许是个不错的办法。

  这是搭建前端监控系统的第五章,主要是介绍如何处理日志高并发上传的情况,跟着我一步步做,你也能搭建出一个属于自己的前端监控系统。

 

  请移步线上: 

 

  随着监控日志搜集的内容越来越多,终于,由于公司的一波推文,导致了日志的瞬间流量达到历史新高,以至于mysql无法处理如此多的连接,系统崩溃。 当然,作为日志上传的服务器,这个是必然会发生的情况,只是早晚的问题。 既然出现了并发问题,那么我们就着手来处理吧。日志上传如何缓解高并发的情况呢?我们分为三个小点来处理。

  •   增加日志上传的时间间隔

  正如我们所知,日志上传的时间间隔越长,用户在这个间隔内离开的几率就会越大,日志的漏传量就会增加,然后会导致日志的准确度降低。因为我们的探针是安插在浏览器内的,用户随时都有可能关掉,所以,理论上讲间隔越短越好,但这并不现实。所以这个需要在服务器的承受能力和日志的准确率之间做个权衡。由具体情况而定

  •   移除探针代码里冗余的参数,缩短参数名字的长度 

  另外一点,每台服务器的硬盘有限,带宽有限,如果参数名字太长,参数内容冗余,对服务器的硬盘和带宽都是一种极大的浪费。虽然每条日志都不起眼,但是日志起量了以后,就是会是一笔非常庞大的开销。

  •   Nodejs + RabbitMq 搭建消息队列,缓解瞬间并发量  

  对于一个前端来说,要把消息队列搭建起来还确实费了一番周折。 

  1)  ubantu16 安装RabbitMQ服务软件包,很多教程都要求安装erlang, 但是更新apt以后,直接执行安装命令,会自动安装erlang的核心组件的。(erlang始终无法成功安装,真心累。)

$ apt-get update$ apt-get install rabbitmq-server

  正常情况下是直接成功的,直接访问ip端口号就可以打开了 http://IP:15672, 如下图:

  2)现在我们需要一个有效的登录名和密码,执行如下命令

$ rabbitmqctl add_user username password  // 设置用户名密码$ rabbitmqctl set_user_tags username administrator  // 设置为管理员身份$ rabbitmqctl set_permissions -p / username ".*" ".*" ".*"  //为用户设置读写等权限

  OK, 现在我们登录进来就是这样的界面,如此消息队列服务我们算是搭建完成了。

  3)消息服务启动了,那么如何存消息,如何取消息呢?如下图所示:

  

  我能够接触到的关于消息队列的应用场景实在有限,所以不能介绍更复杂的内容,大致的思维逻辑如上图1:有消息进来,先存入消息队列里,另一端再从队列去取出来,完成接下来的工作。从代码的角度来看如上图2:就是一个生产者和消费者的模式,生产者不停的向消息队列里生产消息,消费者在有需要的时候,从消息队列里取消息, 一旦完成消费,队列里便移除这个消息。消息的生产者和消费者互相没有感知,生产者产生过剩的消息都存放在消息队列里,由消费者慢慢消耗。以此来削峰填谷,达到处理高并发的目的。当然这都是我的浅显理解,但是也足以满足目前日志上传的需求了。

  OK、理论说完了,具体如何实现呢?

  

let amqp = require('amqplib');module.exports = class RabbitMQ {  constructor() {    this.hosts = ["amqp://localhost"];     this.index = 0;    this.length = this.hosts.length;    this.open = amqp.connect(this.hosts[this.index]);  } // 消息生产者  sendQueueMsg(queueName, msg, errCallBack) {    let self = this;    self.open      .then(function (conn) {        return conn.createChannel();      })      .then(function (channel) {        return channel.assertQueue(queueName).then(function (ok) {          return channel.sendToQueue(queueName, new Buffer.from(msg), {            persistent: true          });        })          .then(function (data) {            if (data) {              errCallBack && errCallBack("success");              channel.close();            }          })          .catch(function () {            setTimeout(() => {              if (channel) {                channel.close();              }            }, 500)          });      })      .catch(function () {      // 这里尝试备用连接,我就一个,所以就处理了      });  }    // 消息消费者  receiveQueueMsg(queueName, receiveCallBack, errCallBack) {    let self = this;    self.open.then(function (conn) {        return conn.createChannel();      }).then(function (channel) {        return channel.assertQueue(queueName).then(function (ok) {            return channel.consume(queueName, function (msg) {              if (msg !== null) {                let data = msg.content.toString();                channel.ack(msg);                receiveCallBack && receiveCallBack(data);              }            }).finally(function () { });          })      })      .catch(function (e) {        errCallBack(e)      });  }}

 

消息队列测试:每隔5秒发送一条消息,每隔5秒取出一条消息,成功

var mq = new RabbitMQ()setInterval(function () {  mq.sendQueueMsg("queue1", "这是一个队列消息", function (err) {    console.log(err)  })}, 5000)setInterval(function () {  mq.receiveQueueMsg("queue1", function (msg) {    console.log(msg)  }, function (error) {    console.log(error)  })}, 5000)

 

RabbitMq消息队列使用中遇到的坑:

①   var mq = new RabbitMQ() 多次创建RabbitMQ对象,导致connections, channels, memory 暴增,服务器很快挂掉

②  生产者的channel忘记close, 导致channel太多,服务器超负荷

③   消费者的channel被close掉了,永远只能接收到一条消息,消息队列很快爆掉

最后是消息队列运行的状态:

 

OK、经过了这么一番处理,我们的日志上传应该能够承受住一定量的并发了,让我们拭目以待吧。

 

上一章:

 

参考:   

转载于:https://www.cnblogs.com/warm-stranger/p/11000996.html

你可能感兴趣的文章
corosync+pacemaker高可用集群
查看>>
看完就能出去神侃,来自研发第一线的“区块链”扫盲文(一)
查看>>
比较全的 POM.xml
查看>>
7.VMware View 4.6安装与部署-connection server(View Security Server)
查看>>
Hyper-V下安装Ossim系统
查看>>
LBS营销的核心是资源共享与互换
查看>>
桌面虚拟化之GPU虚拟化
查看>>
Powershell管理系列(十六)查询最近一个月未登录的AD账号和Exchange账号
查看>>
Siri+ Wolfram Alpha……正在改变用户搜索习惯
查看>>
SDN落地需要方法论
查看>>
客户端自动升级的一个代码例子【C/S】
查看>>
对FrameBuffer的一夜hack
查看>>
Oracle数据库需要修改默认的Profiles,避免用户密码过期
查看>>
函数指针&&typedef
查看>>
C.消息队列(转载)
查看>>
实例讲解遗传算法——基于遗传算法的自动组卷系统【理论篇】
查看>>
C# 拖放操作源码详解1
查看>>
无法在web服务器上启动调试。调试失败,因为没有启用集成windows身份验证
查看>>
Oracle控制文件详解
查看>>
Delphi调用JavaScript解析JSON
查看>>