Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
362 views
in Technique[技术] by (71.8m points)

node.js - NodeJS, promises, streams - processing large CSV files

I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).

So, I start with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as to not create memory pressures.
  2. The function passed as the processor param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my ahead around how to mix stream event handlers with these promise methods. Right now, I return a promise in the handler for readable section after each read(), which means I create a huge amount of promised database operations and eventually fault out because I hit a process memory limit.

Does anyone have a working example of this that I can use as a jumping point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Anyone sees a potential problem with this approach?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You might want to look at promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

Works with backpressure and everything.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...