我正在尝试使用Redis构建消息队列。每当客户端发送新数据时,它们就会被添加到列表中。
这是它的代码
$client->lPush("my_queue", $data);
现在有一个单独的脚本 slave.php ,它将弹出新到达的数据并进行处理。slave.php的代码
while (true) { list($queue, $message) = $client->brPop(["my_queue"], 0); /* Logic to process the data */ }
我已经修改了apache启动脚本,以便slave.php应该以apache开始和停止。它运作良好。但是在等待几分钟后,brPop停止监听并显示以下错误消息:
Uncaught exception 'Predis\Connection\ConnectionException' with message 'Error while reading line from the server [tcp://127.0.0.1:6379]' in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php:139 Stack trace: #0 /var/www/api/lib/predis-0.8/lib/Predis/Connection/StreamConnection.php(205): Predis\Connection\AbstractConnection->onConnectionError('Error while rea...') #1 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(128): Predis\Connection\StreamConnection->read() #2 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(120): Predis\Connection\AbstractConnection->readResponse(Object(Predis\Command\ListPopLastBlocking)) #3 /var/www/api/lib/predis-0.8/lib/Predis/Client.php(227): Predis\Connection\AbstractConnection->executeCommand(Object(Predis\Command\ListPopLastBlocking)) #4 /var/www/api/lib/slave.php(7): Predis\Client->__call('brPop', Array) #5 /var/www/api/lib/slave.php(7): Predis\Client->brPop(Array, 0) #6 {main} thrown in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php on line 139
根据文档,如果list为空,则BLPOP / BRPOP会阻止连接,直到另一个客户端对其中一个键执行LPUSH或RPUSH操作。但这对我而言并没有发生。就我而言,一旦brpop阻止了连接,即使新数据到达列表中,它也不会再次侦听。
我应该进行哪些更改才能使其正常工作?
它现在对我有用,但是我不确定这是否是正确的方法。现在,我正在捕获错误并在连接失败的情况下递归调用该函数。我的新slave.php看起来像这样:
function process_data() { try { $client = new \Predis\Client(); require_once("logger.php"); while (true) { list($queue, $message) = $client->brPop(["bookmark_queue"], 0); // logic } } catch (Exception $ex) { $error = $ex->getMessage(); log_error($error, "slave.php"); process_data(); // call the function recursively if connection fails } } process_data(); // call the function