PadMessageHandler: Replace `channels` package with async-friendly class

pull/5285/head
Richard Hansen 2021-11-16 02:51:36 -05:00
parent 5c1177a3d9
commit 85919ff914
3 changed files with 36 additions and 12 deletions

View File

@ -34,11 +34,9 @@ const messageLogger = log4js.getLogger('message');
const accessLogger = log4js.getLogger('access');
const _ = require('underscore');
const hooks = require('../../static/js/pluginfw/hooks.js');
const channels = require('channels');
const stats = require('../stats');
const assert = require('assert').strict;
const {RateLimiterMemory} = require('rate-limiter-flexible');
const util = require('util');
const webaccess = require('../hooks/express/webaccess');
let rateLimiter;
@ -84,11 +82,44 @@ stats.gauge('activePads', () => {
return padIds.size;
});
/**
* Processes one task at a time per channel.
*/
class Channels {
/**
* @param {(ch, task) => any} [exec] - Task executor. If omitted, tasks are assumed to be
* functions that will be executed with the channel as the only argument.
*/
constructor(exec = (ch, task) => task(ch)) {
this._exec = exec;
this._promiseChains = new Map();
}
/**
* Schedules a task for execution. The task will be executed once all previously enqueued tasks
* for the named channel have completed.
*
* @param {any} ch - Identifies the channel.
* @param {any} task - The task to give to the executor.
* @returns {Promise<any>} The value returned by the executor.
*/
async enqueue(ch, task) {
const p = (this._promiseChains.get(ch) || Promise.resolve()).then(() => this._exec(ch, task));
const pc = p
.catch(() => {}) // Prevent rejections from halting the queue.
.then(() => {
// Clean up this._promiseChains if there are no more tasks for the channel.
if (this._promiseChains.get(ch) === pc) this._promiseChains.delete(ch);
});
this._promiseChains.set(ch, pc);
return await p;
}
}
/**
* A changeset queue per pad that is processed by handleUserChanges()
*/
const padChannels =
new channels.channels(({socket, message}, cb) => handleUserChangesCb(socket, message, cb));
const padChannels = new Channels((ch, {socket, message}) => handleUserChanges(socket, message));
/**
* This Method is called by server.js to tell the message handler on which socket it should send
@ -265,7 +296,7 @@ exports.handleMessage = async (socket, message) => {
messageLogger.warn('Dropped message, COLLABROOM for readonly pad');
} else if (message.data.type === 'USER_CHANGES') {
stats.counter('pendingEdits').inc();
padChannels.emit(message.padId, {socket, message}); // add to pad queue
await padChannels.enqueue(message.padId, {socket, message});
} else if (message.data.type === 'USERINFO_UPDATE') {
await handleUserInfoUpdate(socket, message);
} else if (message.data.type === 'CHAT_MESSAGE') {
@ -690,7 +721,6 @@ const handleUserChanges = async (socket, message) => {
stopWatch.end();
};
const handleUserChangesCb = util.callbackify(handleUserChanges);
exports.updatePadClients = async (pad) => {
// skip this if no-one is on this pad

5
src/package-lock.json generated
View File

@ -1233,11 +1233,6 @@
"supports-color": "^5.3.0"
}
},
"channels": {
"version": "0.0.4",
"resolved": "https://registry.npmjs.org/channels/-/channels-0.0.4.tgz",
"integrity": "sha1-G+4yPt6hUrue8E9BvG5rD1lIqUE="
},
"character-entities-html4": {
"version": "1.1.4",
"resolved": "https://registry.npmjs.org/character-entities-html4/-/character-entities-html4-1.1.4.tgz",

View File

@ -31,7 +31,6 @@
],
"dependencies": {
"async": "^3.2.1",
"channels": "0.0.4",
"clean-css": "^5.2.1",
"cookie-parser": "1.4.5",
"cross-spawn": "^7.0.3",