From 85919ff914067bb49a14ed08473125f9973446e0 Mon Sep 17 00:00:00 2001 From: Richard Hansen Date: Tue, 16 Nov 2021 02:51:36 -0500 Subject: [PATCH] PadMessageHandler: Replace `channels` package with async-friendly class --- src/node/handler/PadMessageHandler.js | 42 +++++++++++++++++++++++---- src/package-lock.json | 5 ---- src/package.json | 1 - 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/node/handler/PadMessageHandler.js b/src/node/handler/PadMessageHandler.js index ca706e325..1b327a92d 100644 --- a/src/node/handler/PadMessageHandler.js +++ b/src/node/handler/PadMessageHandler.js @@ -34,11 +34,9 @@ const messageLogger = log4js.getLogger('message'); const accessLogger = log4js.getLogger('access'); const _ = require('underscore'); const hooks = require('../../static/js/pluginfw/hooks.js'); -const channels = require('channels'); const stats = require('../stats'); const assert = require('assert').strict; const {RateLimiterMemory} = require('rate-limiter-flexible'); -const util = require('util'); const webaccess = require('../hooks/express/webaccess'); let rateLimiter; @@ -84,11 +82,44 @@ stats.gauge('activePads', () => { return padIds.size; }); +/** + * Processes one task at a time per channel. + */ +class Channels { + /** + * @param {(ch, task) => any} [exec] - Task executor. If omitted, tasks are assumed to be + * functions that will be executed with the channel as the only argument. + */ + constructor(exec = (ch, task) => task(ch)) { + this._exec = exec; + this._promiseChains = new Map(); + } + + /** + * Schedules a task for execution. The task will be executed once all previously enqueued tasks + * for the named channel have completed. + * + * @param {any} ch - Identifies the channel. + * @param {any} task - The task to give to the executor. + * @returns {Promise} The value returned by the executor. + */ + async enqueue(ch, task) { + const p = (this._promiseChains.get(ch) || Promise.resolve()).then(() => this._exec(ch, task)); + const pc = p + .catch(() => {}) // Prevent rejections from halting the queue. + .then(() => { + // Clean up this._promiseChains if there are no more tasks for the channel. + if (this._promiseChains.get(ch) === pc) this._promiseChains.delete(ch); + }); + this._promiseChains.set(ch, pc); + return await p; + } +} + /** * A changeset queue per pad that is processed by handleUserChanges() */ -const padChannels = - new channels.channels(({socket, message}, cb) => handleUserChangesCb(socket, message, cb)); +const padChannels = new Channels((ch, {socket, message}) => handleUserChanges(socket, message)); /** * This Method is called by server.js to tell the message handler on which socket it should send @@ -265,7 +296,7 @@ exports.handleMessage = async (socket, message) => { messageLogger.warn('Dropped message, COLLABROOM for readonly pad'); } else if (message.data.type === 'USER_CHANGES') { stats.counter('pendingEdits').inc(); - padChannels.emit(message.padId, {socket, message}); // add to pad queue + await padChannels.enqueue(message.padId, {socket, message}); } else if (message.data.type === 'USERINFO_UPDATE') { await handleUserInfoUpdate(socket, message); } else if (message.data.type === 'CHAT_MESSAGE') { @@ -690,7 +721,6 @@ const handleUserChanges = async (socket, message) => { stopWatch.end(); }; -const handleUserChangesCb = util.callbackify(handleUserChanges); exports.updatePadClients = async (pad) => { // skip this if no-one is on this pad diff --git a/src/package-lock.json b/src/package-lock.json index 3565f4b40..037648d44 100644 --- a/src/package-lock.json +++ b/src/package-lock.json @@ -1233,11 +1233,6 @@ "supports-color": "^5.3.0" } }, - "channels": { - "version": "0.0.4", - "resolved": "https://registry.npmjs.org/channels/-/channels-0.0.4.tgz", - "integrity": "sha1-G+4yPt6hUrue8E9BvG5rD1lIqUE=" - }, "character-entities-html4": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/character-entities-html4/-/character-entities-html4-1.1.4.tgz", diff --git a/src/package.json b/src/package.json index 3dba9c9db..2fd5e985e 100644 --- a/src/package.json +++ b/src/package.json @@ -31,7 +31,6 @@ ], "dependencies": { "async": "^3.2.1", - "channels": "0.0.4", "clean-css": "^5.2.1", "cookie-parser": "1.4.5", "cross-spawn": "^7.0.3",