Node多进程服务模型代码实践
引子
提到Node多进程管理,大家肯定会想到PM2,Forever,Egg等相关项目,这篇文章的目的并不是进行源码解读,而是通过代码实践的方式帮助大家快速理解多进程管理的原理。内容主要围绕 进程管理,进程守护,日志收集等几个方面展开。
进程管理
nodeJS单线程模型原生只能利用单核的cpu,如果线程崩溃之后,整个web程序就会崩溃, 这并不适合服务端大规模生产环境的需求。为了解决这个问题, Node提供了cluster模块,它可以fork新的进程,并通过IPC进行通信,从而充分利用cpu的多核性能。
const cluster = require('cluster'); //集群模块
const http = require('http');
const os = require('os');
// 路由请求管理
const appHandler = (request, response) => {
if (request.url == '/') {
response.writeHead(200, { "Content-Type": "text/plain" });
response.end("Home Page!\n");
} else if (request.url == '/about') {
response.writeHead(200, { "Content-Type": "text/plain" });
response.end("About Page!\n");
} else {
response.writeHead(404, { "Content-Type": "text/plain" });
response.end("404 Not Found!\n");
}
}
// 进程管理
if (cluster.isMaster) {
console.log(`master主进程 ${process.pid} 启动运行`);
// 获取CPU数目
const numCPUs = os.cpus().length;
// 根据CPU数目fork相应数量worker进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
console.log(`worker进程 ${process.pid} 启动运行`);
// 工作进程可以共享任何 TCP 连接, 这里虽然表面上创建了多个 http 服务, 但是本质上共享的是一个 HTTP 服务器
http.createServer(appHandler).listen(8009);
}
上面的代码基于node的cluster模块,http模块等实现的简易master-worker服务模型,master进程处于listening状态,负责监听8009端口,然后把Established状态的连接分配给worker进程, worker进程只需处理请求,cluster默认采用round-robin调度策略。appHandler是处理请求的响应函数,我们熟悉的koa,express等框架可以理解成是一个复杂的appHandler函数。
进程守护
上面是一个最简单的master-worker服务模型,实际运行过程中会存在很多问题,比如:
(1)master如何得知worker进程工作状态?worker进程异常退出如何通知master fork新进程?
(2)如何监控master进程的工作状态?master进程异常整个服务就会垮掉?如何重启服务?
我们需要完善一下我们的模型,让其具备进程守护能力。
(1)开启进程心跳检测,循环监管子进程存活情况。
(2)新增monitor进程,监控master进程运行情况。
我们把项目拓展为5个文件,方便分模块让大家理解,分别为index.js,worker.js,monitor.js,admin.js,config.js
index.js
master主进程及相关核心逻辑
const cluster = require('cluster'); //集群模块
const config = require('./config.js');
const os = require('os');
const { fork } = require('child_process'); //child_process模块
const path = require('path');
let RequestDoneNum =0; //总请求个数
//fork新的worker子进程
const restartWorker = (worker) =>{
if (worker.hasRestart) {
return;
}
cluster.fork();
closeWorker(worker);
worker.hasRestart = true;
}
// 关闭worker
const closeWorker =(worker) => {
worker.kill(9);
}
// 处理子进程的心跳消息
const heartBeat = function (m) {
const worker = this;
const now = new Date().getTime();
worker.lastMessage = m;
worker.lastLiveTime = now;
};
//master监控worker进程相关事件处理
const masterEventHandler = () => {
//master主进程名称
process.title = 'master';
//master主进程异常事件监听处理
process.on('uncaughtException', function (e) {
console.log(e && e.stack);
});
//master主进程监听自定义reload事件,重新fork全部worker子进程
process.on('reload', function () {
for (const id in cluster.workers) {
const worker =cluster.workers[id];
restartWorker(worker);
}
console.log('reload done');
});
//master主进程监听自定义showRuningWorkers事件,展示进程池中正常工作的worker子进程
process.on('showRuningWorkers', function () {
let str ='进程池中正常服务中的worker子进程有:';
for (const id in cluster.workers) {
const pid =cluster.workers[id].process.pid;
str = str+'worker:id='+id+'&pid='+pid+' ';
}
console.log(str);
});
// 监听worker子进程是否fork成功
cluster.on('fork', function (currWorker) {
console.log('worker进程 ' + currWorker.process.pid + ' 创建成功')
//监听worker子进程发来的消息
currWorker.on('message', function (args) {
if (args.cmd =='heartBeat'){
heartBeat.apply(this,args);
}else if(args.cmd =='dealRequest'){
RequestDoneNum++;
console.log('master-worker服务体系共处理了 '+RequestDoneNum+' 个请求');
}
});
//通知worker子进程开始监听
currWorker.send({
from: 'master',
cmd: 'listen'
});
});
// 监听子进程退出时做的处理
cluster.on('disconnect', function (worker) {
restartWorker(worker);
});
// 监听子进程被杀死时做的处理
cluster.on('exit', function (worker) {
restartWorker(worker);
});
}
//定时检测子进程存活,15秒未响应的采取措施
const checkWorkerAlive = () =>{
const checkWorkerAliveTimeout = 5000;
setInterval(function () {
const nowDate = new Date();
const now = nowDate.getTime();
for (const id in cluster.workers) {
const worker =cluster.workers[id];
worker.lastLiveTime = worker.lastLiveTime || now;
if (!worker.startTime) {
worker.startTime = now;
}
// 无响应进程处理
if (now - worker.lastLiveTime > checkWorkerAliveTimeout * 3) {
console.log('worker:id='+worker.id+'&pid='+worker.process.pid+'的子进程无响应,kill后自动fork新子进程代替它')
restartWorker(worker);
continue;
}
}
}, checkWorkerAliveTimeout);
}
//master起一个admin httpserver,对外提供管理master的API,限本机调用
const startAdmin = () => {
require('./admin.js').start();
}
//启动一个监控master进程的monitor进程
const startMasterMonitor = () => {
console.info('start master monitor....');
fork(path.resolve(__dirname, './monitor.js'), [process.pid], {
silent: false
});
}
//进程管理
const startServer =() =>{
cluster.schedulingPolicy = cluster.SCHED_RR;
if (cluster.isMaster) {
console.log(`master主进程 ${process.pid} 启动运行`);
//获取CPU数目
let numCPUs = os.cpus().length;
if (config.runAtThisCpu && config.runAtThisCpu !='auto'){
numCPUs = config.runAtThisCpu;
}
// 衍生工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
masterEventHandler();
checkWorkerAlive();
startAdmin();
startMasterMonitor();
} else {
// console.log(`worker进程 ${process.pid} 启动运行`);
// 工作进程可以共享任何 TCP 连接, 这里虽然表面上创建了多个 http 服务, 但是本质上共享的是一个 HTTP 服务器。
require('./worker.js');
}
}
startServer(); //总入口函数
monitor.js
monitor监控进程的相关逻辑,启动后主要负责监控master进程运行情况的监控,同时在master异常后负责重启master进程,保持服务的稳定性。
'use strict';
const http = require('http');
const config = require('./config.js');
const PING_RETRY_TIME = 2;
const PING_TIMEOUT = 10000;
const PING_INSTERVALS = 30000; //30s就ping一次
let masterPid;
let retry = 0;
function ping() {
return new Promise((resolve, reject) => {
const req = http.request({
method: 'GET',
hostname: config.httpAdminAddress,
port: config.httpAdminPort,
path: '/ping',
timeout: PING_TIMEOUT
});
req.on('response', (res) => {
console.log('ping master正常');
resolve();
});
req.on('error', (e) => {
console.log('ping error');
reject(new Error('Master response error'));
});
req.on('timeout', () => {
console.log('ping timeout');
reject(new Error('Master Timeout'));
});
req.end();
});
}
async function run() {
try {
await ping();
retry = 0;
setTimeout(run, PING_INSTERVALS);
} catch (e) {
if (retry >= PING_RETRY_TIME) {
console.log('ping master fail. restart master...');
restartMaster();
} else {
retry++;
console.log('ping master fail.retry:'+retry);
setTimeout(run, PING_INSTERVALS);
}
}
}
function restartMaster() {
console.log('重启master');
}
function startMonitor() {
masterPid = process.argv[2];
console.log('master monitor 进程启动 pid:'+masterPid);
process.title = 'worker/master-monitor';
if (!masterPid) {
console.log('master pid is empty! exit monitor');
return;
}
run();
}
startMonitor();
worker.js
worker子进程的相关逻辑,主要由路由管理,业务请求处理及响应,上报worker进程心跳给master主进程等逻辑组成。
const http = require('http');
const config =require('./config.js');
let isStartHeartBeat = false;
//拓展app,路由管理
const appHandler = (request, response) => {
if (request.url == '/') {
response.writeHead(200, { "Content-Type": "text/plain" });
response.end("Home Page!\n");
} else if (request.url == '/about') {
response.writeHead(200, { "Content-Type": "text/plain" });
response.end("About Page!\n");
} else {
response.writeHead(404, { "Content-Type": "text/plain" });
response.end("404 Not Found!\n");
}
console.log('worker pid: '+process.pid+' 处理了一个请求');
process.send({
cmd: 'dealRequest',
status: 'done'
});
}
const httpServer =http.createServer(appHandler);
//监控未捕获的worker进程错误
process.on('uncaughtException', function(e) {
console.log(e && e.stack);
});
//监控master发送的消息
process.on('message', function(args) {
if (!args) {
return;
}
if (!args.cmd) {
return;
}
if(args.from =='master' && args.cmd =='listen'){
console.log('worker' +process.pid+ 'listen');
listen();
}
});
const heartBeat = () =>{
//发送心跳包,通知master进程
process.connected && process.send && process.send({
cmd: 'heartBeat',
memoryUsage: process.memoryUsage()
});
}
const startHeartBeat = () =>{
if (isStartHeartBeat) {
return;
}
isStartHeartBeat = true;
setInterval(heartBeat, 5000);
}
const listen =()=>{
httpServer.listen({
host: config.httpAddress,
port: config.httpPort,
exclusive: false
},function(){
console.log('worker pid: '+ process.pid +' start heartbeat');
startHeartBeat();
});
}
日志管理
到这里,我们基本就完整的实现了master-worker服务模型,它可以正常跑起来,并自己照顾自己了。
我们需要在worker.js里面appHandler函数最开始的地方初始化domain对象,其实就是为了在每个请求生命周期最开始的时候便初始化一个domain对象,然后后续请求及逻辑处理整个周期中的错误我们就都可以捕获到了,不仅如此,一个请求生命周期中的关键逻辑节点,我们可以输出一些日志,同样可以挂在domain对象上,这样一个请求内的错误,日志,全局变量等信息都可以通过domain来整合起来,也不会串台,因为一个请求周期就对应一个domain对象。当然考虑到内存泄露或者以防一些未知错误,我们记得在请求处理完毕时,日志记录上报完成后记得清理domian对象(正常来讲:domian对象在一个请求结束,不存在调用的话是会被垃圾回收的,但是为了保险,我们还是自己要清理一下)。
在每个请求结束生命周期前,domain对象都是可以“全局”访问的,所以一些关键节点日志,错误日志,请求req,res相关的信息我们都可以保存在domian对象下,日志可按照配置文件设定的等级进行本地记录即可。
下面是具体实现代码,拓展一下我们上面的worker.js
const http = require('http');
const config =require('./config.js');
const domain = require('domain');
let isStartHeartBeat = false;
//拓展app,路由管理
const handleRequest =(request, response) => {
if (request.url == '/') {
response.writeHead(200, { "Content-Type": "text/plain" });
response.end("Home Page!\n");
} else if (request.url == '/about') {
response.writeHead(200, { "Content-Type": "text/plain" });
response.end("About Page!\n");
} else {
response.writeHead(404, { "Content-Type": "text/plain" });
response.end("404 Not Found!\n");
}
console.log('worker pid: '+process.pid+' 处理了一个请求');
process.send({
cmd: 'dealRequest',
status: 'done'
});
}
//初始化请求生命周期domain对象
const appHandler = (req, res) => {
//判断号码是否添加测试环境,是否需要反向代理转发
if(req.url == '/test'){
proxyToTest(req,res);
return;
}
// 为每一个请求,创建了一个domain实例
let d = domain.create();// 创建domian实例
console.log('请求生命周期对象domain已创建')
// handleRequest执行过程中,所有的同步,异步的error都会被当前的domain.error捕获到
d.on('error', (er) => {
console.log(er); //打印错误
// console.log(req); //打印请求对象req
// console.log(process.domain); //打印domain对象
//记录到domain对象上
});
d.add(req);
d.add(res);
d.currentContext = {};//空对象
d.currentContext.log = {}; // 挂载在domain实例上的日志对象
d.currentContext.request = req;
d.currentContext.response = res;
// 清理domain对象
let clear = function(){
res.removeAllListeners('finish');
d.remove(req);
d.remove(res);
if (d.currentContext) {
if (d.currentContext) {
d.currentContext.request = null;
d.currentContext.response = null;
}
d.currentContext.log = null;
d.currentContext = null;
}
d = null;
req = null;
res = null;
console.log('请求生命周期对象domain已销毁')
};
// 监听响应回包的finish事件
res.once('finish', function() {
// 记录日志
console.log('---> path :' + d.currentContext.request.url)
console.log('---> method :' + d.currentContext.request.method)
console.log('---> statusCode :' + d.currentContext.response.statusCode)
clear(); //再清理domian
});
// 调用handleRequest函数内,所有的异步/同步错误都会被domain的error事件捕获
d.run(() => {
handleRequest(req, res);
});
}
const httpServer =http.createServer(appHandler);
//监控未捕获的worker进程错误
process.on('uncaughtException', function(e) {
console.log(e && e.stack);
});
//监控master发送的消息
process.on('message', function(args) {
if (!args) {
return;
}
if (!args.cmd) {
return;
}
if(args.from =='master' && args.cmd =='listen'){
console.log('worker' +process.pid+ 'listen');
listen();
}
});
const heartBeat = () =>{
//发送心跳包,通知master进程
process.connected && process.send && process.send({
cmd: 'heartBeat',
memoryUsage: process.memoryUsage()
});
}
const startHeartBeat = () =>{
if (isStartHeartBeat) {
return;
}
isStartHeartBeat = true;
setInterval(heartBeat, 5000);
}
const listen =()=>{
httpServer.listen({
host: config.httpAddress,
port: config.httpPort,
exclusive: false
},function(){
console.log('worker pid: '+ process.pid +' start heartbeat');
startHeartBeat();
});
}
服务管理
admin.js,主要涉及master进程启动的一个httpserver的相关逻辑,提供管理调度master进程的对外API。实现上述几个部分,基本可以实现对异常worker的检测,重新fork,另外针对外部指令导致的进程销毁也可以及时的重新fork,另外通过admin 这个httpserver,monitor进程可以了解master的运行情况,另外我们也可以利用curl 127.0.0.1:12701/xxx 等方式实现对master进程管理等运维化调度操作
'use strict';
const config = require('./config.js');
const http = require('http');
const admin = (request, response) => {
if (request.url == '/ping') {
response.writeHead(200);
response.end('ok');
} else if(request.url == '/reload'){
response.writeHead(200);
response.end('reload ok');
process.emit('reload');
} else if(request.url == '/showRuningWorkers'){
response.writeHead(200);
response.end('showRuningWorkers ok');
process.emit('showRuningWorkers');
} else {
response.writeHead(404, { "Content-Type": "text/plain" });
response.end("404 Not Found!\n");
}
}
const server = http.createServer(admin);
server.on('error', serverError);
function serverError(err) {
console.log('exit with ' + err.stack);
setTimeout(function () {
process.exit(1);
}, 500);
}
this.start = function () {
// 管理进程开启info日志
console.log('start admin...');
server.listen(config.httpAdminPort, config.httpAdminAddress, function () {
console.log('admin listen ok');
});
};
``
实际运行效果
- 根目录执行 node index.js,启动master-worker服务模型,8009端口是http请求服务的监听端口,12701是我们管理master的私有端口。
说明:master启动成功,fork worker进程,通知worker进程listen,启动admin httpserver服务,master的监控进程monitor进程启动成功,两个worker子进程开始向master进程上报心跳,“ping master正常”代表master进程服务正常,monitor会定时监控询问它的运行情况。
- 浏览器访问 http://127.0.0.1:8009/
说明:表示的是一次正常访问,全息日志收集并且上报的关键日志输出
3. 浏览器访问 http://127.0.0.1:8009/about
说明:表示一次正常访问,全息日志收集并且上报的关键日志输出
5. 私有API http://127.0.0.1:12701/showRuningWorkers
说明:获取当前worker进程池中正在提供服务的worker进程信息
7. 业务代码有更新, 或者想重启服务,私有API http://127.0.0.1:12701/reload
说明:销毁并重启所有的worker子进程,柔性重启
源代码
转载自:https://juejin.cn/post/7202796308608319547