小编典典

等待最后一个stream.on('data')事件中的异步函数回调

redis

我正在使用fast-csv使用来遍历CSV文件stream。对于CSV文件的每一行,我想在redis中创建一个作业,为此我使用kue。解析行是同步功能。整个过程看起来像这样:

var csvStream = fastCsv(_config.csvOptions)
  .on('data', function(data) {
    var stream = this;
    stream.pause();

    var payload = parseRow(data, _config);
    console.log(payload); // the payload is always printed to the console

    var job = kue.create('csv-row', {
        payload: payload
      })
      .save(function(err) {
        if (!err) console.log('Enqueued at ' + job.id);
        else console.log('Redis error ' + JSON.stringify(err));

        stream.resume();
      });
  })
  .on('end', function() {
    callback(); // this ends my parsing
  });

console.log(payload);CSV文件每一行的简单显示,但是未创建作业。即,没有save打印出回调中的输出,并且该作业不在我的Redis中。

我假设,因为它是CSV文件的最后一行,所以该流已经发出end,因此最后一个kue.create()不能在进程终止之前执行吗?

有没有办法在end完成kue之前停止流的播放?


阅读 737

收藏
2020-06-20

共1个答案

小编典典

您可以使用异步库解决此问题。您可以对任何流使用以下模式。

var AsyncLib = require('async');

var worker = function (payload, cb) {
    //do something with payload and call callback
    return cb();
};

var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);

var stream = //some readable stream;

stream.on('data', function(data) {
    //no need to pause and resume
    var payload = '//some payload';
    streamQueue.push(payload);
})
.on('end', function() {
    //register drain event on end and callback
    streamQueue.drain = function () {
        callback();
    };
});
2020-06-20