一文弄懂nodejs的多进程开发
什么是进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
进程的概念主要有两点:
- 进程是一个实体,每一个进程都有自己的地址空间。
- 进程是一个"执行中的程序",存在嵌套关系。
我们可以通过下面的命令来查看当前电脑启动了多个进程:
查看所有进程 ps -ef
查看某个进程 ps -ef| grep pid
- UID: 用户id,在mac下root用户的uid是0,其他用户是uid是501,可以通过uid来判别用户的级别。
- PID: 进程的id
- PPID: 某个进程的父进程,表示这个进程是由父进程发起的,即存在嵌套关系。
那进程的嵌套关系是怎么样的?
- 首先是操作系统进程OS
- 其次是启动进程,所有的进程都是由
sbin/launchd
来启动的,这个其实就是一个桌面进程 - 我们打开了 webstorm/vscode 的编辑器,就相当于开启了一个webstorm/vscode的进程
- 在编辑器进程中启动了node应用,所以需要有个node进程
- 在node进程中利用
child_process
创建nodejs的子进程
child_process
当然在大部分情况下,NodeJs是不需要并发执行的,因为它是事件驱动性永不阻塞。
但是单进程必然存在一个问题,就是无法充分利用cpu等资源。
NodeJs提供了child_process
模块来实现子进程,从而实现一个广义上的多进程的模式。
通过child_process
模块,可以实现1个主进程,多个子进程的模式,主进程称为master进程,子进程又称为工作进程。
在子进程中不仅可以调用其他node程序,也可以执行非node程序以及shell命令等等,执行完子进程后,以流或者回调的形式返回。
child_process提供了4个方法,用于新建子进程,这4个方法分别为spawn
、execFile
、exec
和fork
。
exec: 执行shell脚本命令
cp.exec('ls -al | grep node_modules', function (err, stdout, stderr) {
console.log('err', err)
console.log('stdout', stdout)
console.log('stderr', stderr)
})
execFile: 执行shell脚本文件
execFile 用来执行一个shell脚本文件。
// shell脚本: test.shell
ls -al | grep package.json
echo $1
cp.execFile(
path.resolve(__dirname, 'test.shell'),
[],
function (err, stdout, stderr) {
console.log('err', err)
console.log('stdout', stdout)
console.log('stderr', stderr)
}
)
不过execFile
也能执行命令,不过配置必须放在第二个参数中,第一个参数只能是命令。
cp.execFile('ls', ['-al'], function (err, stdout, stderr) {
console.log('err', err)
console.log('stdout', stdout)
console.log('stderr', stderr)
})
其实 execFile('ls')
里面的ls其实也是一个文件,这个文件是/bin/ls,如下:
exec和execFile的options:最常用的是cwd,当前的工作目录,还有一个timeout超时时间,0表示不会超时。
spwan
返回一个子进程对象,通过这个子进程对象来获取结果。
const child = cp.spawn(path.resolve(__dirname, 'test.shell'), ['-al'])
console.log(child.pid, process.pid) // 5539 5538
可见利用cp.spawn
产生了一个子进程,它的pid是5539。从这个子进程中我们就可以拿到结果:
child.stdout.on('data', chunk => {
console.log('stdout', chunk.toString())
})
child.stderr.on('data', chunk => {
console.log('stderr', chunk.toString())
})
什么场景下使用spwan exec?
-
spwan: 耗时任务(比如: npm install), 需要不断打印日志, 因为是通过流的方式输出结果。比如在安装依赖。
-
exec比较适合开销比较小的任务,因为它是执行完之后才会调用回调把信息打印出来。
注意:这里第一个不能写成npm install,因为第一个参数即命令,其实会找到对应的一个文件(比如ls,它会去找ls这个文件),现在你写成npm install 其实是没有npm install这个文件的,只有npm这个文件。
const child = cp.spawn('npm', ['install'], {
// 安装到哪里
cwd: path.resolve(
'/Users/****/test-lib'
)
})
child.stdout.on('data', function (chunk) {
console.log(chunk.toString())
})
child.stderr.on('data', function (chunk) {
console.log(chunk.toString())
})
spawn的选项stdio: 默认是pipe管道。这里设置为inherit
的意思是通过相应的标准输入输出流传入/传出父进程,即子进程将使用父进程的标准输入输出,直接看效果:
const child = cp.spawn('npm', ['install'], {
// 安装到哪里
cwd: path.resolve(
'/Users/****/test-lib'
),
stdio: 'inherit'
})
可以看到在子进程中所有的结果,包括一些细节动画,都能在父进程中展现。如果去掉stdio: 'inherit',那么就必须通过child.stdout.on这种管道的方式来接受结果。
fork
fork的第一个参数是一个文件的路径。
// index.js
cp.fork(path.resolve(__dirname, 'child.js'))
console.log('parent pid', process.pid)
// child.js
console.log('child process')
console.log('child pid', process.pid)
我们可以看到fork的作用其实类似于require的作用,就是执行了第一个参数传递的文件。但是与require不同的是fork会创建一个新的node进程,然后启动一个独立的v8引擎去执行这个文件。即两个进程之间是一个完全独立的关系。
fork底层其实也调用了spwan,所以fork执行后也会返回一个子进程对象。通过这个对象就可以实现和主进程之间的通信。
// fork 返回一个子进程对象 node(main) -> node(child)
// 创建进程是一个异步的过程
const child = cp.fork(path.resolve(__dirname, 'child.js'))
// 主进程向子进程发送消息
child.send('hello child process', () => {
// 发送完消息后,需要关闭两个进程之间的通信,不然进程会一直等待,即光标会一直闪烁
child.disconnect()
})
child.on('message', msg => {
console.log(msg)
})
// 先执行 因为创建进程是一个异步的过程
console.log('main process', process.pid)
// child.js
console.log('child process', process.pid)
process.on('message', msg => {
console.log('msg', msg)
})
process.send('hello main process')
注意:只有fork才能使用message事件和send方法
我们看看下面代码的执行顺序:
- 首先cp.fork相当于require,所以先执行child.js的代码,但是这个过程是异步的,所以会执行主进程里面的这一行代码 console.log('main process', process.pid)
2、然后执行child.js里面的代码
3、在主进程中通过child这个子进程对象,向子进程发送消息
4、在子进程child.js里面通过process这个子进程对象,向主进程发送消息
cluster 集群
进程与进程之间的通信也可以像服务器和服务器之间走一个RPC
调用的方式,来做一个全双工通信。既然有这么全面的通信能力,那能不能把http服务分发出去呢?实现如下图的功能:
首先我们有个主进程master,用来接收浏览器发送的请求,然后fork几个子进程并把请求转发到这几个子进程中。要实现这个功能,不需要我们自己实现,node的cluster
模块提供了这个功能。
首先看一下不用cluster模块的时候,服务器的QPS(吞吐率:指的是某个并发用户数下单位时间内处理的请求数)是多少?
const fs = require('fs')
const http = require('http')
const server = http.createServer((req, res) => {
res.writeHead(200, {
'content-type': 'text/html'
})
res.end(fs.readFileSync(__dirname + '/index.htm', 'utf-8'))
})
server.listen(3000, () => {
console.log('listen at 3000')
})
利用ab
工具进行测试,运行 ab -n 100 -c 100 http://localhost:3000/
命令(其中 -n 100
表示先后发送 100 次请求,-c 100
表示一次性发送的请求数目是 100 个)。可以看到,吞吐率(Requests per second)为294.68
。
ab 全称 Apache bench ,是 Apache 自带的一个工具,因此使用 ab 必须要安装 Apache 。mac os 系统自带 Apache ,windows 用户视自己的情况进行安装。运行 ab 之前先启动 Apache ,mac os 启动方式是 sudo apachectl start 。
现在启用cluster模块:
const cluster = require('cluster')
const os = require('os')
// 主进程用来开启子进程
if (cluster.isMaster) {
console.log(os.cpus().length)
for (let i = 0; i < os.cpus().length; i++) {
cluster.fork()
}
} else {
// 子进程用来启动http服务
require('./app')
}
经过测试,吞吐率为334
。
可以看到,吞吐率并没有提高多少。如果我们只开启2个子进程,来看看效果:
if (cluster.isMaster) {
for (let i = 0; i < os.cpus().length / 2; i++) {
cluster.fork()
}
} else {
require('./app')
}
经过测试,吞吐率为360
,发现还提高了,这就非常奇怪了,为什么开启的子进程变少了反而吞吐率变高了呢?
我们知道一个node主进程,都有几个子线程来配合完成事件循环,这些子线程跑在其他cpu中,也就是说nodejs也用到了其他核的cpu,并不是一个完全单核的情况。这里在fork子进程的时候,其实没有必要每个cpu都要fork一个子进程,因为有些cpu就是为了处理事件循环的,现在你占满了整个cpu,反而让事件循环没有得到及时的处理,所以效果并不好。
同时,每次fork一个子进程,相当于主进程复制了一遍,把内存空间,代码都复制了一份,那么内存就成倍的消耗,但是带来的效果并没有成倍的增长,所以我们一般fork一半的核数:
for(let i = 0; i < os.cpus().length / 2; i++) {
cluster.fork()
}
进程守护
因为nodejs
是一门解释型语言,因此很多错误必须在执行的时候才会发现。
比如你引用了一个空的值,但是你当成了一个对象处理,获取它其中的一个key,像这样的错误在编译的时候是不会被检测到的,必须要运行的时候运行到那个地方才能知道这段代码是不是能跑。
如果这样的代码没有被检测出来,就发布到线上,同时我们nodejs
有大量异步的代码(回调),我们知道异步代码会在未来的某个事件循环里面在一个新的调用栈里面执行,刚好你的代码发生了刚刚的那个错误,但是在你的新的调用栈又没有try catch
之类的处理错误的代码把它给包裹起来,这就不单单是报错了,还会造成程序的崩溃,服务器直接就挂掉了。
所以就可以使用进程守护的能力,在主进程对子进程进行一个监控,这样就可以提高服务器进程的健康程度,从而提供nodejs程序的稳定性。
未捕获的错误
对于没有被子进程捕获的错误,会导致子进程的崩溃。
// index.js
const cluster = require('cluster')
const os = require('os')
if (cluster.isMaster) {
// 在主进程中执行
for (let i = 0; i < os.cpus().length / 2; i++) {
cluster.fork()
}
// 用来监听子进程挂掉了
cluster.on('exit', () => {
// 如果子进程一挂掉,我们又重新fork一个,当子进程一直挂掉,我们这里就不断的fork,这样会导致一直占用cpu,最后直接挂掉,所以需要延迟下再fork
// cluster.fork()
setTimeout(() => {
cluster.fork()
}, 5000)
})
} else {
// 在子进程中执行
require('./app')
// 对没有trycatch的错误进行捕获
process.on('uncaughtException', err => {
// 在这里可以对错误进行上报,或者把错误发送到微信上提醒你
console.error(err)
// 以1退出进程
process.exit(1)
})
}
// app.js
const http = require('http')
const fs = require('fs')
module.exports = http
.createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'text/html'
})
setTimeout(() => {
const result = fs.readFileSync(__dirname + '/index.htm', 'utf-8')
res.end(result)
}, 50)
})
.listen(3000, () => {
console.log('listened 3000')
})
在上面的代码中,在主进程中执行fork出子进程的命令,在子进程中执行app.js,即http服务。
如果app.js中发生了未捕获的错误,我们需要把这个子进程杀掉。因此在执行cluster的文件中进行处理:
process.on('uncaughtException', (err) => {
// 在这里可以对错误进行上报,或者把错误发送到微信上提醒你
console.error(err)
// 以1退出进程
process.exit(1)
})
使用uncaughtException事件监听未捕获的错误,然后对错误进行上报等操作,最后杀掉进程,注意code为1。杀掉进程之后,我们需要重新创建一个进程:
cluster.on('exit', () => {
// 如果子进程一挂掉,我们又重新fork一个,当子进程一直挂掉,我们这里就不断的fork,这样会导致一直占用cpu,最后直接挂掉,所以需要延迟下再fork
setTimeout(() => {
cluster.fork()
}, 5000)
})
内存泄漏
如果发生内存泄漏就会导致老生代越来越大,从而垃圾回收遍历的时候耗时越来越多,服务器越来越慢,所以对内存使用情况进行监控,一旦大于某个值就认为发生了内存泄漏,然后杀掉。
setInterval(() => {
if (process.memoryUsage().rss > 534003200) {
console.log('oom')
process.exit(1)
}
}, 5000)
僵尸进程-心跳检测
如果父进程给子进程发送消息,有操作三次子进程没有回复,说明这个子进程是一个僵尸进程。什么情况下会导致僵尸进程呢,比如子进程中有个死循环导致这个进程无法接受任何请求。
if (cluster.isMaster) {
for (let i = 0; i < 1; i++) {
const worker = cluster.fork()
// 对子进程进行心跳检查
let missedPing = 0
let inter = setInterval(() => {
// 往子进程发消息
worker.send('ping')
missedPing++
// 如果发送了三次消息,子进程没有回复,那么就认为是僵尸进程,杀掉
if (missedPing >= 3) {
clearInterval(inter)
process.kill(worker.process.pid)
}
}, 5000)
// 接收子进程传回来的消息
worker.on('message', msg => {
if (msg == 'pong') {
missedPing--
}
})
}
// 用来监听子进程挂掉了
cluster.on('exit', () => {
// 如果子进程一挂掉,我们又重新fork一个,当子进程一直挂掉,我们这里就不断的fork,这样会导致一直占用cpu,最后直接挂掉,所以需要延迟下再fork
// cluster.fork()
setTimeout(() => {
// 这里应该也要做心跳检测
cluster.fork()
}, 5000)
})
} else {
require('./app')
// 子进程监听父进程的消息
process.on('message', msg => {
if (msg == 'ping') {
process.send('pong')
}
})
}