diff --git a/src/node/utils/ImportEtherpad.js b/src/node/utils/ImportEtherpad.js index 6afc0bea3..a028ee12f 100644 --- a/src/node/utils/ImportEtherpad.js +++ b/src/node/utils/ImportEtherpad.js @@ -18,7 +18,7 @@ const AttributePool = require('../../static/js/AttributePool'); const {Pad} = require('../db/Pad'); -const async = require('async'); +const Stream = require('./Stream'); const authorManager = require('../db/AuthorManager'); const db = require('../db/DB'); const hooks = require('../../static/js/pluginfw/hooks'); @@ -49,10 +49,6 @@ exports.setPadRaw = async (padId, r, authorId = '') => { if (originalPadId !== padId) throw new Error('unexpected pad ID in record'); }; - // Limit the number of in-flight database queries so that the queries do not time out when - // importing really large files. - const q = async.queue(async (task) => await task(), 100); - // First validate and transform values. Do not commit any records to the database yet in case // there is a problem with the data. @@ -61,7 +57,7 @@ exports.setPadRaw = async (padId, r, authorId = '') => { const padDb = new ueberdb.Database('memory', {data}); await padDb.init(); try { - await Promise.all(Object.entries(records).map(([key, value]) => q.pushAsync(async () => { + const processRecord = async (key, value) => { if (!value) return; const keyParts = key.split(':'); const [prefix, id] = keyParts; @@ -98,7 +94,9 @@ exports.setPadRaw = async (padId, r, authorId = '') => { return; } await padDb.set(key, value); - }))); + }; + const readOps = new Stream(Object.entries(records)).map(([k, v]) => processRecord(k, v)); + for (const op of readOps.batch(100).buffer(99)) await op; const pad = new Pad(padId, padDb); await pad.init(null, authorId);