ImportEtherpad: Process records in batches
This enables ueberdb to combine multiple queries into a single message to the database backend.pull/5513/head
parent
10117bc988
commit
c1652fd695
|
@ -18,7 +18,7 @@
|
||||||
|
|
||||||
const AttributePool = require('../../static/js/AttributePool');
|
const AttributePool = require('../../static/js/AttributePool');
|
||||||
const {Pad} = require('../db/Pad');
|
const {Pad} = require('../db/Pad');
|
||||||
const async = require('async');
|
const Stream = require('./Stream');
|
||||||
const authorManager = require('../db/AuthorManager');
|
const authorManager = require('../db/AuthorManager');
|
||||||
const db = require('../db/DB');
|
const db = require('../db/DB');
|
||||||
const hooks = require('../../static/js/pluginfw/hooks');
|
const hooks = require('../../static/js/pluginfw/hooks');
|
||||||
|
@ -49,10 +49,6 @@ exports.setPadRaw = async (padId, r, authorId = '') => {
|
||||||
if (originalPadId !== padId) throw new Error('unexpected pad ID in record');
|
if (originalPadId !== padId) throw new Error('unexpected pad ID in record');
|
||||||
};
|
};
|
||||||
|
|
||||||
// Limit the number of in-flight database queries so that the queries do not time out when
|
|
||||||
// importing really large files.
|
|
||||||
const q = async.queue(async (task) => await task(), 100);
|
|
||||||
|
|
||||||
// First validate and transform values. Do not commit any records to the database yet in case
|
// First validate and transform values. Do not commit any records to the database yet in case
|
||||||
// there is a problem with the data.
|
// there is a problem with the data.
|
||||||
|
|
||||||
|
@ -61,7 +57,7 @@ exports.setPadRaw = async (padId, r, authorId = '') => {
|
||||||
const padDb = new ueberdb.Database('memory', {data});
|
const padDb = new ueberdb.Database('memory', {data});
|
||||||
await padDb.init();
|
await padDb.init();
|
||||||
try {
|
try {
|
||||||
await Promise.all(Object.entries(records).map(([key, value]) => q.pushAsync(async () => {
|
const processRecord = async (key, value) => {
|
||||||
if (!value) return;
|
if (!value) return;
|
||||||
const keyParts = key.split(':');
|
const keyParts = key.split(':');
|
||||||
const [prefix, id] = keyParts;
|
const [prefix, id] = keyParts;
|
||||||
|
@ -98,7 +94,9 @@ exports.setPadRaw = async (padId, r, authorId = '') => {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
await padDb.set(key, value);
|
await padDb.set(key, value);
|
||||||
})));
|
};
|
||||||
|
const readOps = new Stream(Object.entries(records)).map(([k, v]) => processRecord(k, v));
|
||||||
|
for (const op of readOps.batch(100).buffer(99)) await op;
|
||||||
|
|
||||||
const pad = new Pad(padId, padDb);
|
const pad = new Pad(padId, padDb);
|
||||||
await pad.init(null, authorId);
|
await pad.init(null, authorId);
|
||||||
|
|
Loading…
Reference in New Issue