我正在使用fast-csv使用来遍历CSV文件stream。对于CSV文件的每一行,我想在redis中创建一个作业,为此我使用kue。解析行是同步功能。整个过程看起来像这样:
stream
var csvStream = fastCsv(_config.csvOptions) .on('data', function(data) { var stream = this; stream.pause(); var payload = parseRow(data, _config); console.log(payload); // the payload is always printed to the console var job = kue.create('csv-row', { payload: payload }) .save(function(err) { if (!err) console.log('Enqueued at ' + job.id); else console.log('Redis error ' + JSON.stringify(err)); stream.resume(); }); }) .on('end', function() { callback(); // this ends my parsing });
console.log(payload);CSV文件每一行的简单显示,但是未创建作业。即,没有save打印出回调中的输出,并且该作业不在我的Redis中。
console.log(payload);
save
我假设,因为它是CSV文件的最后一行,所以该流已经发出end,因此最后一个kue.create()不能在进程终止之前执行吗?
end
kue.create()
有没有办法在end完成kue之前停止流的播放?
您可以使用异步库解决此问题。您可以对任何流使用以下模式。
var AsyncLib = require('async'); var worker = function (payload, cb) { //do something with payload and call callback return cb(); }; var concurrency = 5; var streamQueue = AsyncLib.queue(worker, concurrency); var stream = //some readable stream; stream.on('data', function(data) { //no need to pause and resume var payload = '//some payload'; streamQueue.push(payload); }) .on('end', function() { //register drain event on end and callback streamQueue.drain = function () { callback(); }; });