用这个来扯掉我的头发。。。有人能将Socket.IO扩展到Node.js 集群模块产生的多个“工作”进程吗?
可以说我在 四个 工作进程(伪)上具有以下内容:
// on the server var express = require('express'); var server = express(); var socket = require('socket.io'); var io = socket.listen(server); // socket.io io.set('store', new socket.RedisStore); // set-up connections... io.sockets.on('connection', function(socket) { socket.on('join', function(rooms) { rooms.forEach(function(room) { socket.join(room); }); }); socket.on('leave', function(rooms) { rooms.forEach(function(room) { socket.leave(room); }); }); }); // Emit a message every second function send() { io.sockets.in('room').emit('data', 'howdy'); } setInterval(send, 1000);
在浏览器上
// on the client socket = io.connect(); socket.emit('join', ['room']); socket.on('data', function(data){ console.log(data); });
问题: 由于四个单独的工作进程发送消息,因此我每秒收到 四条 消息。
如何确保邮件仅发送一次?
编辑: 在Socket.IO 1.0+中,现在可以使用更简单的Redis适配器模块,而不是通过多个Redis客户端设置存储。
var io = require('socket.io')(3000); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 }));
下面显示的示例看起来更像这样:
var cluster = require('cluster'); var os = require('os'); if (cluster.isMaster) { // we create a HTTP server, but we do not use listen // that way, we have a socket.io server that doesn't accept connections var server = require('http').createServer(); var io = require('socket.io').listen(server); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); setInterval(function() { // all workers will receive this in Redis, and emit io.emit('data', 'payload'); }, 1000); for (var i = 0; i < os.cpus().length; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } if (cluster.isWorker) { var express = require('express'); var app = express(); var http = require('http'); var server = http.createServer(app); var io = require('socket.io').listen(server); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); io.on('connection', function(socket) { socket.emit('data', 'connected to worker: ' + cluster.worker.id); }); app.listen(80); }
如果您有一个需要发布到其他Socket.IO进程的主节点,但本身不接受套接字连接,请使用socket.io- emitter而不是socket.io- redis。
如果您在扩展方面遇到问题,请使用来运行Node应用程序DEBUG=*。Socket.IO现在实现调试,该调试还将打印出Redis适配器调试消息。输出示例:
DEBUG=*
socket.io:server initializing namespace / +0ms socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms socket.io:server attaching client serving req handler +2ms socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms socket.io-redis ignore same uid +0ms
如果您的主进程和子进程都显示相同的解析器消息,则您的应用程序正在正确扩展。
如果您是从单个工作人员中解雇,则设置应该没有问题。您正在做的事情是从所有四个工作人员发出的,并且由于Redis的发布/订阅,消息不是重复的,而是按您要求应用程序执行的那样写了四次。这是Redis的简单示意图:
Client <-- Worker 1 emit --> Redis Client <-- Worker 2 <----------| Client <-- Worker 3 <----------| Client <-- Worker 4 <----------|
如您所见,当您从某个工作程序发出时,它会将发布内容发布到Redis,并将从其他已订阅Redis数据库的工作程序中进行镜像。这也意味着您可以使用连接到同一实例的多个套接字服务器,并且一台服务器上的发射将在所有连接的服务器上触发。
使用群集时,当客户端连接时,它将连接到您的四个工作线程之一,而不是全部四个。这也意味着您从该工作人员发出的任何内容只会显示给客户端一次。因此,是的,该应用程序正在扩展,但是您的操作方式是,您要从所有四个工作程序中释放资源,而Redis数据库正在像在单个工作程序上调用它四次一样对其进行处理。如果客户端实际上连接到您的所有四个套接字实例,则它们每秒将接收十六条消息,而不是四条。
套接字处理的类型取决于您将要拥有的应用程序的类型。如果您要分别处理客户端,那么应该没有问题,因为连接事件仅对每个客户端触发一个工作线程。如果需要全局“心跳”,则可以在主进程中使用套接字处理程序。由于工作人员在主进程死亡时死亡,因此您应该抵消主进程的连接负载,并让子进程处理连接。这是一个例子:
var cluster = require('cluster'); var os = require('os'); if (cluster.isMaster) { // we create a HTTP server, but we do not use listen // that way, we have a socket.io server that doesn't accept connections var server = require('http').createServer(); var io = require('socket.io').listen(server); var RedisStore = require('socket.io/lib/stores/redis'); var redis = require('socket.io/node_modules/redis'); io.set('store', new RedisStore({ redisPub: redis.createClient(), redisSub: redis.createClient(), redisClient: redis.createClient() })); setInterval(function() { // all workers will receive this in Redis, and emit io.sockets.emit('data', 'payload'); }, 1000); for (var i = 0; i < os.cpus().length; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } if (cluster.isWorker) { var express = require('express'); var app = express(); var http = require('http'); var server = http.createServer(app); var io = require('socket.io').listen(server); var RedisStore = require('socket.io/lib/stores/redis'); var redis = require('socket.io/node_modules/redis'); io.set('store', new RedisStore({ redisPub: redis.createClient(), redisSub: redis.createClient(), redisClient: redis.createClient() })); io.sockets.on('connection', function(socket) { socket.emit('data', 'connected to worker: ' + cluster.worker.id); }); app.listen(80); }
在该示例中,有五个Socket.IO实例,一个是主实例,四个是子实例。主服务器从不调用,listen()因此该进程没有连接开销。但是,如果您在主进程上调用发射,它将被发布到Redis,并且四个工作进程将在其客户端上执行发射。这抵消了对工作人员的连接负载,并且如果工作人员死亡,则主应用程序中的主要应用程序逻辑将保持不变。
listen()
请注意,使用Redis,所有发射,即使是在名称空间或房间中,也将由其他工作进程处理,就像您从该进程触发了发射一样。换句话说,如果您有两个带有一个Redis实例的Socket.IO实例,则emit()在第一个工作线程中调用套接字将向其客户端发送数据,而第二个工作线程将执行与您从该工作线程调用emit相同的操作。
emit()