目录
- 前言
- 父子进程间通信
- 负载均衡
- 句柄传递
- 集群
- 子进程事件
- 自动重启
- 总结
前言
上节我们讲到,通过 fork()
或者其他API,创建子进程之后,可以通过 send()
和 process.on('message')
进行父子进程间的通信。这样就实现了主进程代理请求到工作进程,实现了 Nodejs集群
:
父子进程间通信
负载均衡
通过代理,可以避免端口不能重复监听的问题,甚至可以在代理进程上做适当的负载均衡,使得每个子进程可以较为均衡地执行任务。下面我们构建了一个简单的 Web 服务器,并实现在两个工作进程之间做简单的负载均衡。
主进程,负责代理到对应进程中:
// main.js
const { fork } = require('child_process');
const normal = fork('subprocess.js', ['normal']);
const special = fork('subprocess.js', ['special']);
// Open up the server and send sockets to child. Use pauseOnConnect to prevent
// 套接字在发送给子进程之前不会被读取
const server = require('net').createServer({ pauseOnConnect: true });
let flag = 0;
server.on('connection', (socket) => {
flag++;
// this is special priority.
if (flag % 2 === 0) {
special.send('socket', socket);
return;
}
// This is normal priority.
normal.send('socket', socket);
});
server.listen(1337);
这是工作进程,接收socket对象并做出响应:
// subprocess.js
process.on('message', (m, socket) => {
if (m === 'socket') {
// Check that the client socket exists.
// It is possible for the socket to be closed between the time it is
if (socket) {
// console.log(`Request handled with ${process.argv[2]} priority`);
socket.end(`Request handled with ${process.argv[2]} priority, running on ${process.pid}`);
}
}
});
然后我又编写了一个 Nodejs 脚本,来发出十个 HTTP 请求:
const cp = require("child_process");
for (let i = 0; i < 10; i++) {
cp.exec(`curl --http0.9 "http://127.0.0.1:1337"`, (err, stdout, stderr) => {
console.log(`finished: ${i}, and received: `, stdout);
})
}
最后运行结果如下:
句柄传递
在使用 send()
方法时,我们注意到,除了能通过IPC发送数据外,还能发送句柄。第二个可选参数就是一个句柄:
child.send(message, [sendHandle]);
💡 句柄是一种可以用来标识资源的引用,它的内部包含了指向对象的文件描述符。比如句柄可以用来标识一个服务器端socket对象、一个客户端socket对象、一个UDP套接字、一个管道等。
在主进程将句柄发送给子进程之后,工作模型就从主进程响应用户请求变成了子进程监听用户活动:
进程对象send()方法可以发送的句柄类型包括如下几种:
- net.Socket。TCP套接字。
- net.Server。TCP服务器,任意建立在TCP服务上的应用层服务都可以享受到它带来的好处。
- net.Native。C++层面的TCP套接字或IPC管道。
- dgram.Socket。UDP套接字。
- dgram.Native。C++层面的UDP套接字。
💡 另外要注意,send()方法能发送消息和句柄并不意味着它能发送任意对象,message
参数和文件句柄都要先通过 JSON.stringfy()
进行序列化后再放入IPC通道中:
集群
通过 child_process模块
,我们完成了父子进程的创建和通信,已经初步搭建了一个Node集群。还有一些问题需要考虑:
- 性能问题。
- 多个工作进程的存活状态管理。
- 工作进程的平滑重启。
- 配置或者静态数据的动态重新载入。
- 其他细节。
这其中最重要的便是集群的稳定性,这决定了该服务模型能否真正用于实践生成中。虽然我们创建了很多工作进程,但每个工作进程依然是在单线程上执行的,它的稳定性还不能得到完全的保障。我们需要建立起一个健全的机制来保障Node应用的健壮性。
子进程事件
父进程能监听到的,与子进程相关的事件:
- error:当子进程无法被复制创建、无法被杀死、无法发送消息时会触发该事件。
- exit:子进程退出时触发该事件。如果是正常退出,这个事件的第一个参数为退出码,否则为null。如果进程是通过kill()方法被杀死的,会得到第二个参数,它表示杀死进程时的信号。
- close:在子进程的标准输入输出流中止时触发该事件,参数与exit相同。
- disconnect:在父进程或子进程中调用disconnect()方法时触发该事件,在调用该方法时将关闭监听IPC通道。
除了 send()
外,还能通过 kill()
方法给子进程发送消息。kill() 方法并不能真正地将通过IPC相连的子进程杀死,它只是给子进程发送了一个系统信号。默认情况下,父进程将通过 kill() 方法给子进程发送一个 SIGTERM信号
。
// 子进程
child.kill([signal]);
// 当前进程
process.kill(pid, [signal]);
// 监听
process.on(signal, callback)
💡 在POSIX标准中,有一套完备的信号系统,在命令行中执行kill -l可以看到详细的信号列表,如下所示:
而 Node 提供了这些信号对应的信号事件,每个进程都可以监听这些信号事件。这些信号事件是用来通知进程的,每个信号事件有不同的含义,进程在收到响应信号时,应当做出约定的行为:
process.on('SIGTERM', () => {
console.log("got sigterm, exiting...");
process.exit(1);
});
console.log("process running on: ", process.pid);
process.kill(process.pid, "SIGTERM");
自动重启
有了父子进程之间的相关事件之后,就可以在这些关系之间创建出需要的机制了,至少我们能够通过监听子进程的 exit事件
来获知其退出的信息。接着前文的多进程架构,我们在主进程上要加入一些子进程管理的机制,比如重新启动一个工作进程来继续服务:
主进程代码:
// master.js
// master.js
const { fork } = require('child_process');
const cpus = require('os').cpus();
const server = require('net').createServer();
server.listen(1337);
const workers = {};
// process.on('uncaughtException', function (err) {
// console.log(`Master uncaughtException:\r\n`);
// console.log(err);
// });
const createWorker = () => {
const worker = fork('./worker.js');
// 收到信号后立即重启新进程
worker.on('message', function (message) {
if (message.act === 'suicide') {
createWorker();
}
});
// 某个进程终止时重新启动新的进程
worker.on('exit', () => {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
// createWorker();
});
// 句柄转发
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};
for (let i = 0; i < cpus.length; i++) {
createWorker();
}
// server.close();
// 进程自己退出时,让所有工作进程退出
process.on('exit', () => {
for (let pid in workers) {
workers[pid].kill();
}
});
子进程代码:
// worker.js
const http = require('http');
const server = http.createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('handled by child, pid is ' + process.pid + '\n');
// 抛出异常,捕获后终止进程
throw new Error('throw exception');
});
var worker;
process.on('message', (m, tcp) => {
if (m === 'server') {
worker = tcp;
worker.on('connection', (socket) => {
server.emit('connection', socket);
});
}
});
// 捕获异常后终止进程
process.on('uncaughtException', (err) => {
// 主动发出信号,避免等待连接断开时收到新请求而缺少进程无法响应
process.send({
act: 'suicide'
});
// 停止接收新的连接
worker.close(function () {
// 所有已有连接断开后,退出进程
process.exit(1);
});
// 避免长连接请求长时间无法终止,5s后自动终止
setTimeout(() => {
process.exit(1);
}, 5000)
});
运行父进程 master.js
,控制台中会打印出开启的进程 PID
:
在 Linux 中,你可以直接使用 kill -9 [pid]
来终止进程。在 Windows 中,你需要打开任务管理器,找到 node.exe 的进程,终止其中某个。此时命令行会显示该进程被终止了,然后重新开启一个新的进程。
当然,你也可以使用我们之前写的 run.js
脚本,每发起一个请求,子进程响应请求之后会抛出一个异常,异常在捕获之后会终止该进程。
💡 我们之前写的 run.js 脚本是并行执行的,此时会存在多个请求被分配到同一个 socket ,即分配到同一个进程中执行。那么就会存在互斥的问题,即某个请求结束后就终止该进程,导致其他请求无法获得响应而终止。此时你需要将 exec 方法改为同步方法:
const cp = require("child_process");
const cpus = require("os").cpus();
const sleep = (delay) => {
const now = Date.now();
while (Date.now() - now < delay);
return;
}
for (let i = 0; i < cpus.length; i++) {
const out = cp.execSync(`curl --http0.9 "http://127.0.0.1:1337"`);
sleep(1000);
console.log(out.toString());
}
该模型一旦有异常出现,主进程会创建新的工作进程来为用户服务,旧的进程一旦处理完已有连接就自动断开。整个过程使得我们的应用的稳定性和健壮性大大提高:
总结
至此,我们完成了一个简单的基于父子进程通信、具备异常重启进程功能的 Web服务器 就已经搭建完成了。对于 Nodejs 多进程编程你也有了初步的了解。接下来我们将介绍 cluster模块
,并介绍一下在 Nodejs 中进行多线程编程。