From 9246a1de26cdf94238dd9296be060b64e3d9f346 Mon Sep 17 00:00:00 2001 From: Ray Bellis Date: Wed, 30 Jan 2019 13:55:49 +0000 Subject: [PATCH] PadMessageHandler.js: further conversion --- src/node/handler/PadMessageHandler.js | 397 +++++++++----------------- 1 file changed, 135 insertions(+), 262 deletions(-) diff --git a/src/node/handler/PadMessageHandler.js b/src/node/handler/PadMessageHandler.js index 1e9496cff..25f9879bc 100644 --- a/src/node/handler/PadMessageHandler.js +++ b/src/node/handler/PadMessageHandler.js @@ -1274,7 +1274,7 @@ async function handleClientReady(client, message) /** * Handles a request for a rough changeset, the timeslider client needs it */ -function handleChangesetRequest(client, message) +async function handleChangesetRequest(client, message) { // check if all ok if (message.data == null) { @@ -1308,309 +1308,182 @@ function handleChangesetRequest(client, message) return; } - var granularity = message.data.granularity; - var start = message.data.start; - var end = start + (100 * granularity); - var padIds; + let granularity = message.data.granularity; + let start = message.data.start; + let end = start + (100 * granularity); - async.series([ - function(callback) { - readOnlyManager.getIds(message.padId, function(err, value) { - if (ERR(err, callback)) return; + let padIds = await readOnlyManager.getIds(message.padId); - padIds = value; - callback(); - }); - }, - - function(callback) { - // build the requested rough changesets and send them back - getChangesetInfo(padIds.padId, start, end, granularity, function(err, changesetInfo) { - if (err) return console.error('Error while handling a changeset request for ' + padIds.padId, err, message.data); - - var data = changesetInfo; - data.requestID = message.data.requestID; - - client.json.send({ type: "CHANGESET_REQ", data: data }); - }); - } - ]); + // build the requested rough changesets and send them back + try { + let data = await getChangesetInfo(padIds.padId, start, end, granularity); + data.requestID = message.data.requestID; + client.json.send({ type: "CHANGESET_REQ", data }); + } catch (err) { + console.error('Error while handling a changeset request for ' + padIds.padId, err, message.data); + } } /** * Tries to rebuild the getChangestInfo function of the original Etherpad * https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L144 */ -let getChangesetInfo = thenify(function getChangesetInfo(padId, startNum, endNum, granularity, callback) +async function getChangesetInfo(padId, startNum, endNum, granularity) { - var forwardsChangesets = []; - var backwardsChangesets = []; - var timeDeltas = []; - var apool = new AttributePool(); - var pad; - var composedChangesets = {}; - var revisionDate = []; - var lines; - var head_revision = 0; + let pad = await padManager.getPad(padId); + let head_revision = pad.getHeadRevisionNumber(); - async.series([ - // get the pad from the database - function(callback) { - padManager.getPad(padId, function(err, _pad) { - if (ERR(err, callback)) return; + // calculate the last full endnum + if (endNum > head_revision + 1) { + endNum = head_revision + 1; + } + endNum = Math.floor(endNum / granularity) * granularity; - pad = _pad; - head_revision = pad.getHeadRevisionNumber(); - callback(); - }); - }, + let compositesChangesetNeeded = []; + let revTimesNeeded = []; - function(callback) { - // calculate the last full endnum - var lastRev = pad.getHeadRevisionNumber(); - if (endNum > lastRev + 1) { - endNum = lastRev + 1; - } + // figure out which composite Changeset and revTimes we need, to load them in bulk + for (let start = startNum; start < endNum; start += granularity) { + let end = start + granularity; - endNum = Math.floor(endNum / granularity) * granularity; + // add the composite Changeset we needed + compositesChangesetNeeded.push({ start, end }); - var compositesChangesetNeeded = []; - var revTimesNeeded = []; + // add the t1 time we need + revTimesNeeded.push(start == 0 ? 0 : start - 1); - // figure out which composite Changeset and revTimes we need, to load them in bulk - var compositeStart = startNum; - while (compositeStart < endNum) { - var compositeEnd = compositeStart + granularity; + // add the t2 time we need + revTimesNeeded.push(end - 1); + } - // add the composite Changeset we needed - compositesChangesetNeeded.push({ start: compositeStart, end: compositeEnd }); + // get all needed db values parallel - no await here since + // it would make all the lookups run in series - // add the t1 time we need - revTimesNeeded.push(compositeStart == 0 ? 0 : compositeStart - 1); + // get all needed composite Changesets + let composedChangesets = {}; + let p1 = Promise.all(compositesChangesetNeeded.map(item => { + return composePadChangesets(padId, item.start, item.end).then(changeset => { + composedChangesets[item.start + "/" + item.end] = changeset; + }); + })); - // add the t2 time we need - revTimesNeeded.push(compositeEnd - 1); + // get all needed revision Dates + let revisionDate = []; + let p2 = Promise.all(revTimesNeeded.map(revNum => { + return pad.getRevisionDate(revNum).then(revDate => { + revisionDate[revNum] = Math.floor(revDate / 1000); + }); + })); - compositeStart += granularity; - } - - // get all needed db values parallel - async.parallel([ - function(callback) { - // get all needed composite Changesets - async.forEach(compositesChangesetNeeded, function(item, callback) { - composePadChangesets(padId, item.start, item.end, function(err, changeset) { - if (ERR(err, callback)) return; - - composedChangesets[item.start + "/" + item.end] = changeset; - callback(); - }); - }, callback); - }, - - function(callback) { - // get all needed revision Dates - async.forEach(revTimesNeeded, function(revNum, callback) { - pad.getRevisionDate(revNum, function(err, revDate) { - if (ERR(err, callback)) return; - - revisionDate[revNum] = Math.floor(revDate/1000); - callback(); - }); - }, callback); - }, - - // get the lines - function(callback) { - getPadLines(padId, startNum-1, function(err, _lines) { - if (ERR(err, callback)) return; - - lines = _lines; - callback(); - }); - } - ], callback); - }, - - // don't know what happens here exactly :/ - function(callback) { - var compositeStart = startNum; - - while (compositeStart < endNum) { - var compositeEnd = compositeStart + granularity; - if (compositeEnd > endNum || compositeEnd > head_revision+1) { - break; - } - - var forwards = composedChangesets[compositeStart + "/" + compositeEnd]; - var backwards = Changeset.inverse(forwards, lines.textlines, lines.alines, pad.apool()); - - Changeset.mutateAttributionLines(forwards, lines.alines, pad.apool()); - Changeset.mutateTextLines(forwards, lines.textlines); - - var forwards2 = Changeset.moveOpsToNewPool(forwards, pad.apool(), apool); - var backwards2 = Changeset.moveOpsToNewPool(backwards, pad.apool(), apool); - - var t1, t2; - if (compositeStart == 0) { - t1 = revisionDate[0]; - } else { - t1 = revisionDate[compositeStart - 1]; - } - - t2 = revisionDate[compositeEnd - 1]; - - timeDeltas.push(t2 - t1); - forwardsChangesets.push(forwards2); - backwardsChangesets.push(backwards2); - - compositeStart += granularity; - } - - callback(); - } - ], - function(err) { - if (ERR(err, callback)) return; - - callback(null, {forwardsChangesets: forwardsChangesets, - backwardsChangesets: backwardsChangesets, - apool: apool.toJsonable(), - actualEndNum: endNum, - timeDeltas: timeDeltas, - start: startNum, - granularity: granularity }); + // get the lines + let lines; + let p3 = getPadLines(padId, startNum - 1).then(_lines => { + lines = _lines; }); -}); + + // wait for all of the above to complete + await Promise.all([p1, p2, p3]); + + // doesn't know what happens here exactly :/ + let timeDeltas = []; + let forwardsChangesets = []; + let backwardsChangesets = []; + let apool = new AttributePool(); + + for (let compositeStart = startNum; compositeStart < endNum; compositeStart += granularity) { + let compositeEnd = compositeStart + granularity; + if (compositeEnd > endNum || compositeEnd > head_revision + 1) { + break; + } + + let forwards = composedChangesets[compositeStart + "/" + compositeEnd]; + let backwards = Changeset.inverse(forwards, lines.textlines, lines.alines, pad.apool()); + + Changeset.mutateAttributionLines(forwards, lines.alines, pad.apool()); + Changeset.mutateTextLines(forwards, lines.textlines); + + let forwards2 = Changeset.moveOpsToNewPool(forwards, pad.apool(), apool); + let backwards2 = Changeset.moveOpsToNewPool(backwards, pad.apool(), apool); + + let t1 = (compositeStart == 0) ? revisionDate[0] : revisionDate[compositeStart - 1]; + let t2 = revisionDate[compositeEnd - 1]; + + timeDeltas.push(t2 - t1); + forwardsChangesets.push(forwards2); + backwardsChangesets.push(backwards2); + } + + return { forwardsChangesets, backwardsChangesets, + apool: apool.toJsonable(), actualEndNum: endNum, + timeDeltas, start: startNum, granularity }; +} /** * Tries to rebuild the getPadLines function of the original Etherpad * https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L263 */ -let getPadLines = thenify(function getPadLines(padId, revNum, callback) +async function getPadLines(padId, revNum) { - var atext; - var result = {}; - var pad; + let pad = padManager.getPad(padId); - async.series([ - // get the pad from the database - function(callback) { - padManager.getPad(padId, function(err, _pad) { - if (ERR(err, callback)) return; + // get the atext + let atext; - pad = _pad; - callback(); - }); - }, + if (revNum >= 0) { + atext = await pad.getInternalRevisionAText(revNum); + } else { + atext = Changeset.makeAText("\n"); + } - // get the atext - function(callback) { - if (revNum >= 0) { - pad.getInternalRevisionAText(revNum, function(err, _atext) { - if (ERR(err, callback)) return; - - atext = _atext; - callback(); - }); - } else { - atext = Changeset.makeAText("\n"); - callback(null); - } - }, - - function(callback) { - result.textlines = Changeset.splitTextLines(atext.text); - result.alines = Changeset.splitAttributionLines(atext.attribs, atext.text); - callback(null); - } - ], - - function(err) { - if (ERR(err, callback)) return; - - callback(null, result); - }); -}); + return { + textlines: Changeset.splitTextLines(atext.text), + alines: Changeset.splitAttributionLines(atext.attribs, atext.text) + }; +} /** * Tries to rebuild the composePadChangeset function of the original Etherpad * https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L241 */ -let composePadChangesets = thenify(function(padId, startNum, endNum, callback) +async function composePadChangesets (padId, startNum, endNum) { - var pad; - var changesets = {}; - var changeset; + let pad = await padManager.getPad(padId); - async.series([ - // get the pad from the database - function(callback) { - padManager.getPad(padId, function(err, _pad) { - if (ERR(err, callback)) return; + // fetch all changesets we need + let headNum = pad.getHeadRevisionNumber(); + endNum = Math.min(endNum, headNum + 1); + startNum = Math.max(startNum, 0); - pad = _pad; - callback(); - }); - }, + // create an array for all changesets, we will + // replace the values with the changeset later + let changesetsNeeded = []; + for (let r = startNum ; r < endNum; r++) { + changesetsNeeded.push(r); + } - // fetch all changesets we need - function(callback) { - var changesetsNeeded=[]; + // get all changesets + let changesets = {}; + await Promise.all(changesetsNeeded.map(revNum => { + return pad.getRevisionChangeset(revNum).then(changeset => changesets[revNum] = changeset); + })); - var headNum = pad.getHeadRevisionNumber(); - if (endNum > headNum + 1) { - endNum = headNum + 1; - } + // compose Changesets + try { + let changeset = changesets[startNum]; + let pool = pad.apool(); - if (startNum < 0) { - startNum = 0; - } - - // create an array for all changesets, we will - // replace the values with the changeset later - for (var r = startNum; r < endNum; r++) { - changesetsNeeded.push(r); - } - - // get all changesets - async.forEach(changesetsNeeded, function(revNum,callback) { - pad.getRevisionChangeset(revNum, function(err, value) { - if (ERR(err, callback)) return; - - changesets[revNum] = value; - callback(); - }); - },callback); - }, - - // compose Changesets - function(callback) { - changeset = changesets[startNum]; - var pool = pad.apool(); - - try { - for (var r = startNum + 1; r < endNum; r++) { - var cs = changesets[r]; - changeset = Changeset.compose(changeset, cs, pool); - } - } catch(e) { - // r-1 indicates the rev that was build starting with startNum, applying startNum+1, +2, +3 - console.warn("failed to compose cs in pad:", padId, " startrev:", startNum, " current rev:", r); - return callback(e); - } - - callback(null); + for (let r = startNum + 1; r < endNum; r++) { + let cs = changesets[r]; + changeset = Changeset.compose(changeset, cs, pool); } - ], + return changeset; - // return err and changeset - function(err) { - if (ERR(err, callback)) return; - - callback(null, changeset); - }); -}); + } catch (e) { + // r-1 indicates the rev that was build starting with startNum, applying startNum+1, +2, +3 + console.warn("failed to compose cs in pad:", padId, " startrev:", startNum," current rev:", r); + throw e; + } +} function _getRoomClients(padID) { var roomClients = [];