lint: src/node/handler/PadMessageHandler.js

pull/4667/head
John McLear 2021-01-21 21:06:52 +00:00 committed by Richard Hansen
parent 841d45cbe1
commit 532bde71f7
1 changed files with 110 additions and 80 deletions

View File

@ -1,3 +1,4 @@
'use strict';
/**
* The MessageHandler handles all Messages that comes from Socket.IO and controls the sessions
*/
@ -18,22 +19,20 @@
* limitations under the License.
*/
/* global exports, process, require */
const padManager = require('../db/PadManager');
const Changeset = require('ep_etherpad-lite/static/js/Changeset');
const AttributePool = require('ep_etherpad-lite/static/js/AttributePool');
const AttributeManager = require('ep_etherpad-lite/static/js/AttributeManager');
const Changeset = require('../../static/js/Changeset');
const AttributePool = require('../../static/js/AttributePool');
const AttributeManager = require('../../static/js/AttributeManager');
const authorManager = require('../db/AuthorManager');
const readOnlyManager = require('../db/ReadOnlyManager');
const settings = require('../utils/Settings');
const securityManager = require('../db/SecurityManager');
const plugins = require('ep_etherpad-lite/static/js/pluginfw/plugin_defs.js');
const plugins = require('../../static/js/pluginfw/plugin_defs.js');
const log4js = require('log4js');
const messageLogger = log4js.getLogger('message');
const accessLogger = log4js.getLogger('access');
const _ = require('underscore');
const hooks = require('ep_etherpad-lite/static/js/pluginfw/hooks.js');
const hooks = require('../../static/js/pluginfw/hooks.js');
const channels = require('channels');
const stats = require('../stats');
const assert = require('assert').strict;
@ -65,7 +64,9 @@ stats.gauge('totalUsers', () => Object.keys(socketio.sockets.sockets).length);
/**
* A changeset queue per pad that is processed by handleUserChanges()
*/
const padChannels = new channels.channels(({socket, message}, callback) => nodeify(handleUserChanges(socket, message), callback));
const padChannels = new channels.channels(
({socket, message}, callback) => nodeify(handleUserChanges(socket, message), callback)
);
/**
* Saves the Socket class we need to send and receive data from the client
@ -76,7 +77,7 @@ let socketio;
* This Method is called by server.js to tell the message handler on which socket it should send
* @param socket_io The Socket
*/
exports.setSocketIO = function (socket_io) {
exports.setSocketIO = (socket_io) => {
socketio = socket_io;
};
@ -94,7 +95,7 @@ exports.handleConnect = (socket) => {
/**
* Kicks all sessions from a pad
*/
exports.kickSessionsFromPad = function (padID) {
exports.kickSessionsFromPad = (padID) => {
if (typeof socketio.sockets.clients !== 'function') return;
// skip if there is nobody on this pad
@ -114,7 +115,8 @@ exports.handleDisconnect = async (socket) => {
// save the padname of this session
const session = sessioninfos[socket.id];
// if this connection was already etablished with a handshake, send a disconnect message to the others
// if this connection was already etablished with a handshake,
// send a disconnect message to the others
if (session && session.author) {
const {session: {user} = {}} = socket.client.request;
accessLogger.info(`${'[LEAVE]' +
@ -192,7 +194,8 @@ exports.handleMessage = async (socket, message) => {
const auth = thisSession.auth;
if (!auth) {
console.error('Auth was never applied to a session. If you are using the stress-test tool then restart Etherpad and the Stress test tool.');
console.error('Auth was never applied to a session. If you are using the ' +
'stress-test tool then restart Etherpad and the Stress test tool.');
return;
}
@ -234,7 +237,7 @@ exports.handleMessage = async (socket, message) => {
}
// Call handleMessage hook. If a plugin returns null, the message will be dropped.
if ((await hooks.aCallAll('handleMessage', context)).some((m) => m === null)) {
if ((await hooks.aCallAll('handleMessage', context)).some((m) => m == null)) {
return;
}
@ -283,11 +286,11 @@ exports.handleMessage = async (socket, message) => {
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
async function handleSaveRevisionMessage(socket, message) {
const handleSaveRevisionMessage = async (socket, message) => {
const {padId, author: authorId} = sessioninfos[socket.id];
const pad = await padManager.getPad(padId);
await pad.addSavedRevision(pad.head, authorId);
}
};
/**
* Handles a custom message, different to the function below as it handles
@ -296,7 +299,7 @@ async function handleSaveRevisionMessage(socket, message) {
* @param msg {Object} the message we're sending
* @param sessionID {string} the socketIO session to which we're sending this message
*/
exports.handleCustomObjectMessage = function (msg, sessionID) {
exports.handleCustomObjectMessage = (msg, sessionID) => {
if (msg.data.type === 'CUSTOM') {
if (sessionID) {
// a sessionID is targeted: directly to this sessionID
@ -314,7 +317,7 @@ exports.handleCustomObjectMessage = function (msg, sessionID) {
* @param padID {Pad} the pad to which we're sending this message
* @param msgString {String} the message we're sending
*/
exports.handleCustomMessage = function (padID, msgString) {
exports.handleCustomMessage = (padID, msgString) => {
const time = Date.now();
const msg = {
type: 'COLLABROOM',
@ -331,12 +334,12 @@ exports.handleCustomMessage = function (padID, msgString) {
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
async function handleChatMessage(socket, message) {
const handleChatMessage = async (socket, message) => {
const time = Date.now();
const text = message.data.text;
const {padId, author: authorId} = sessioninfos[socket.id];
await exports.sendChatMessageToPadClients(time, authorId, text, padId);
}
};
/**
* Sends a chat message to all clients of this pad
@ -345,7 +348,7 @@ async function handleChatMessage(socket, message) {
* @param text the text of the chat message
* @param padId the padId to send the chat message to
*/
exports.sendChatMessageToPadClients = async function (time, userId, text, padId) {
exports.sendChatMessageToPadClients = async (time, userId, text, padId) => {
// get the pad
const pad = await padManager.getPad(padId);
@ -371,7 +374,7 @@ exports.sendChatMessageToPadClients = async function (time, userId, text, padId)
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
async function handleGetChatMessages(socket, message) {
const handleGetChatMessages = async (socket, message) => {
if (message.data.start == null) {
messageLogger.warn('Dropped message, GetChatMessages Message has no start!');
return;
@ -387,7 +390,8 @@ async function handleGetChatMessages(socket, message) {
const count = end - start;
if (count < 0 || count > 100) {
messageLogger.warn('Dropped message, GetChatMessages Message, client requested invalid amount of messages!');
messageLogger.warn(
'Dropped message, GetChatMessages Message, client requested invalid amount of messages!');
return;
}
@ -405,14 +409,14 @@ async function handleGetChatMessages(socket, message) {
// send the messages back to the client
socket.json.send(infoMsg);
}
};
/**
* Handles a handleSuggestUserName, that means a user have suggest a userName for a other user
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
function handleSuggestUserName(socket, message) {
const handleSuggestUserName = (socket, message) => {
// check if all ok
if (message.data.payload.newName == null) {
messageLogger.warn('Dropped message, suggestUserName Message has no newName!');
@ -433,14 +437,15 @@ function handleSuggestUserName(socket, message) {
socket.json.send(message);
}
});
}
};
/**
* Handles a USERINFO_UPDATE, that means that a user have changed his color or name. Anyway, we get both informations
* Handles a USERINFO_UPDATE, that means that a user have changed his color or name.
* Anyway, we get both informations
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
async function handleUserInfoUpdate(socket, message) {
const handleUserInfoUpdate = async (socket, message) => {
// check if all ok
if (message.data.userInfo == null) {
messageLogger.warn('Dropped message, USERINFO_UPDATE Message has no userInfo!');
@ -463,7 +468,8 @@ async function handleUserInfoUpdate(socket, message) {
const author = session.author;
// Check colorId is a Hex color
const isColor = /(^#[0-9A-F]{6}$)|(^#[0-9A-F]{3}$)/i.test(message.data.userInfo.colorId); // for #f00 (Thanks Smamatti)
// for #f00 (Thanks Smamatti)
const isColor = /(^#[0-9A-F]{6}$)|(^#[0-9A-F]{3}$)/i.test(message.data.userInfo.colorId);
if (!isColor) {
messageLogger.warn(`Dropped message, USERINFO_UPDATE Color is malformed.${message.data}`);
return;
@ -496,7 +502,7 @@ async function handleUserInfoUpdate(socket, message) {
// Block until the authorManager has stored the new attributes.
await p;
}
};
/**
* Handles a USER_CHANGES message, where the client submits its local
@ -512,7 +518,7 @@ async function handleUserInfoUpdate(socket, message) {
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
async function handleUserChanges(socket, message) {
const handleUserChanges = async (socket, message) => {
// This one's no longer pending, as we're gonna process it now
stats.counter('pendingEdits').dec();
@ -578,7 +584,8 @@ async function handleUserChanges(socket, message) {
// + can add text with attribs
// = can change or add attribs
// - can have attribs, but they are discarded and don't show up in the attribs - but do show up in the pool
// - can have attribs, but they are discarded and don't show up in the attribs -
// but do show up in the pool
op.attribs.split('*').forEach((attr) => {
if (!attr) return;
@ -586,9 +593,11 @@ async function handleUserChanges(socket, message) {
attr = wireApool.getAttrib(attr);
if (!attr) return;
// the empty author is used in the clearAuthorship functionality so this should be the only exception
// the empty author is used in the clearAuthorship functionality so this
// should be the only exception
if ('author' === attr[0] && (attr[1] !== thisSession.author && attr[1] !== '')) {
throw new Error(`Author ${thisSession.author} tried to submit changes as author ${attr[1]} in changeset ${changeset}`);
throw new Error(`Author ${thisSession.author} tried to submit changes as author ` +
`${attr[1]} in changeset ${changeset}`);
}
});
}
@ -628,7 +637,7 @@ async function handleUserChanges(socket, message) {
if (baseRev + 1 === r && c === changeset) {
socket.json.send({disconnect: 'badChangeset'});
stats.meter('failedChangesets').mark();
throw new Error("Won't apply USER_CHANGES, because it contains an already accepted changeset");
throw new Error("Won't apply USER_CHANGES, as it contains an already accepted changeset");
}
changeset = Changeset.follow(c, changeset, false, apool);
@ -672,9 +681,9 @@ async function handleUserChanges(socket, message) {
}
stopWatch.end();
}
};
exports.updatePadClients = async function (pad) {
exports.updatePadClients = async (pad) => {
// skip this if no-one is on this pad
const roomSockets = _getRoomSockets(pad.id);
if (roomSockets.length === 0) return;
@ -682,9 +691,12 @@ exports.updatePadClients = async function (pad) {
// since all clients usually get the same set of changesets, store them in local cache
// to remove unnecessary roundtrip to the datalayer
// NB: note below possibly now accommodated via the change to promises/async
// TODO: in REAL world, if we're working without datalayer cache, all requests to revisions will be fired
// BEFORE first result will be landed to our cache object. The solution is to replace parallel processing
// via async.forEach with sequential for() loop. There is no real benefits of running this in parallel,
// TODO: in REAL world, if we're working without datalayer cache,
// all requests to revisions will be fired
// BEFORE first result will be landed to our cache object.
// The solution is to replace parallel processing
// via async.forEach with sequential for() loop. There is no real
// benefits of running this in parallel,
// but benefit of reusing cached revision object is HUGE
const revCache = {};
@ -737,7 +749,7 @@ exports.updatePadClients = async function (pad) {
/**
* Copied from the Etherpad Source Code. Don't know what this method does excatly...
*/
function _correctMarkersInPad(atext, apool) {
const _correctMarkersInPad = (atext, apool) => {
const text = atext.text;
// collect char positions of line markers (e.g. bullets) in new atext
@ -746,9 +758,11 @@ function _correctMarkersInPad(atext, apool) {
const iter = Changeset.opIterator(atext.attribs);
let offset = 0;
while (iter.hasNext()) {
var op = iter.next();
const op = iter.next();
const hasMarker = _.find(AttributeManager.lineAttributes, (attribute) => Changeset.opAttributeValue(op, attribute, apool)) !== undefined;
const hasMarker = _.find(
AttributeManager.lineAttributes,
(attribute) => Changeset.opAttributeValue(op, attribute, apool)) !== undefined;
if (hasMarker) {
for (let i = 0; i < op.chars; i++) {
@ -778,9 +792,9 @@ function _correctMarkersInPad(atext, apool) {
});
return builder.toString();
}
};
async function handleSwitchToPad(socket, message, _authorID) {
const handleSwitchToPad = async (socket, message, _authorID) => {
const currentSessionInfo = sessioninfos[socket.id];
const padId = currentSessionInfo.padId;
@ -816,10 +830,10 @@ async function handleSwitchToPad(socket, message, _authorID) {
const newSessionInfo = sessioninfos[socket.id];
createSessionInfoAuth(newSessionInfo, message);
await handleClientReady(socket, message, authorID);
}
};
// Creates/replaces the auth object in the given session info.
function createSessionInfoAuth(sessionInfo, message) {
const createSessionInfoAuth = (sessionInfo, message) => {
// Remember this information since we won't
// have the cookie in further socket.io messages.
// This information will be used to check if
@ -830,15 +844,16 @@ function createSessionInfoAuth(sessionInfo, message) {
padID: message.padId,
token: message.token,
};
}
};
/**
* Handles a CLIENT_READY. A CLIENT_READY is the first message from the client to the server. The Client sends his token
* Handles a CLIENT_READY. A CLIENT_READY is the first message from the client
* to the server. The Client sends his token
* and the pad it wants to enter. The Server answers with the inital values (clientVars) of the pad
* @param socket the socket.io Socket object for the client
* @param message the message from the client
*/
async function handleClientReady(socket, message, authorID) {
const handleClientReady = async (socket, message, authorID) => {
// check if all ok
if (!message.token) {
messageLogger.warn('Dropped message, CLIENT_READY Message has no token!');
@ -884,9 +899,11 @@ async function handleClientReady(socket, message, authorID) {
const historicalAuthorData = {};
await Promise.all(authors.map((authorId) => authorManager.getAuthor(authorId).then((author) => {
if (!author) {
messageLogger.error('There is no author for authorId: ', authorId, '. This is possibly related to https://github.com/ether/etherpad-lite/issues/2802');
messageLogger.error(`There is no author for authorId: ${authorId}. ` +
'This is possibly related to https://github.com/ether/etherpad-lite/issues/2802');
} else {
historicalAuthorData[authorId] = {name: author.name, colorId: author.colorId}; // Filter author attribs (e.g. don't send author's pads to all clients)
// Filter author attribs (e.g. don't send author's pads to all clients)
historicalAuthorData[authorId] = {name: author.name, colorId: author.colorId};
}
})));
@ -931,7 +948,8 @@ async function handleClientReady(socket, message, authorID) {
// Save the revision in sessioninfos, we take the revision from the info the client send to us
sessionInfo.rev = message.client_rev;
// During the client reconnect, client might miss some revisions from other clients. By using client revision,
// During the client reconnect, client might miss some revisions from other clients.
// By using client revision,
// this below code sends all the revisions missed during the client reconnect
const revisionsNeeded = [];
const changesets = {};
@ -987,12 +1005,13 @@ async function handleClientReady(socket, message, authorID) {
}
} else {
// This is a normal first connect
let atext;
let apool;
// prepare all values for the wire, there's a chance that this throws, if the pad is corrupted
try {
var atext = Changeset.cloneAText(pad.atext);
atext = Changeset.cloneAText(pad.atext);
const attribsForWire = Changeset.prepareForWire(atext.attribs, pad.pool);
var apool = attribsForWire.pool.toJsonable();
apool = attribsForWire.pool.toJsonable();
atext.attribs = attribsForWire.translated;
} catch (e) {
console.error(e.stack || e);
@ -1147,12 +1166,12 @@ async function handleClientReady(socket, message, authorID) {
socket.json.send(msg);
}));
}
}
};
/**
* Handles a request for a rough changeset, the timeslider client needs it
*/
async function handleChangesetRequest(socket, message) {
const handleChangesetRequest = async (socket, message) => {
// check if all ok
if (message.data == null) {
messageLogger.warn('Dropped message, changeset request has no data!');
@ -1197,15 +1216,16 @@ async function handleChangesetRequest(socket, message) {
data.requestID = message.data.requestID;
socket.json.send({type: 'CHANGESET_REQ', data});
} catch (err) {
console.error(`Error while handling a changeset request for ${padIds.padId}`, err.toString(), message.data);
console.error(`Error while handling a changeset request for ${padIds.padId}`,
err.toString(), message.data);
}
}
};
/**
* Tries to rebuild the getChangestInfo function of the original Etherpad
* https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L144
*/
async function getChangesetInfo(padId, startNum, endNum, granularity) {
const getChangesetInfo = async (padId, startNum, endNum, granularity) => {
const pad = await padManager.getPad(padId);
const head_revision = pad.getHeadRevisionNumber();
@ -1237,15 +1257,25 @@ async function getChangesetInfo(padId, startNum, endNum, granularity) {
// get all needed composite Changesets
const composedChangesets = {};
const p1 = Promise.all(compositesChangesetNeeded.map((item) => composePadChangesets(padId, item.start, item.end).then((changeset) => {
composedChangesets[`${item.start}/${item.end}`] = changeset;
})));
const p1 = Promise.all(
compositesChangesetNeeded.map(
(item) => composePadChangesets(
padId, item.start, item.end
).then(
(changeset) => {
composedChangesets[`${item.start}/${item.end}`] = changeset;
}
)
)
);
// get all needed revision Dates
const revisionDate = [];
const p2 = Promise.all(revTimesNeeded.map((revNum) => pad.getRevisionDate(revNum).then((revDate) => {
revisionDate[revNum] = Math.floor(revDate / 1000);
})));
const p2 = Promise.all(revTimesNeeded.map((revNum) => pad.getRevisionDate(revNum)
.then((revDate) => {
revisionDate[revNum] = Math.floor(revDate / 1000);
})
));
// get the lines
let lines;
@ -1288,13 +1318,13 @@ async function getChangesetInfo(padId, startNum, endNum, granularity) {
return {forwardsChangesets, backwardsChangesets,
apool: apool.toJsonable(), actualEndNum: endNum,
timeDeltas, start: startNum, granularity};
}
};
/**
* Tries to rebuild the getPadLines function of the original Etherpad
* https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L263
*/
async function getPadLines(padId, revNum) {
const getPadLines = async (padId, revNum) => {
const pad = await padManager.getPad(padId);
// get the atext
@ -1310,13 +1340,13 @@ async function getPadLines(padId, revNum) {
textlines: Changeset.splitTextLines(atext.text),
alines: Changeset.splitAttributionLines(atext.attribs, atext.text),
};
}
};
/**
* Tries to rebuild the composePadChangeset function of the original Etherpad
* https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L241
*/
async function composePadChangesets(padId, startNum, endNum) {
const composePadChangesets = async (padId, startNum, endNum) => {
const pad = await padManager.getPad(padId);
// fetch all changesets we need
@ -1333,7 +1363,9 @@ async function composePadChangesets(padId, startNum, endNum) {
// get all changesets
const changesets = {};
await Promise.all(changesetsNeeded.map((revNum) => pad.getRevisionChangeset(revNum).then((changeset) => changesets[revNum] = changeset)));
await Promise.all(changesetsNeeded.map(
(revNum) => pad.getRevisionChangeset(revNum).then((changeset) => changesets[revNum] = changeset)
));
// compose Changesets
let r;
@ -1351,9 +1383,9 @@ async function composePadChangesets(padId, startNum, endNum) {
console.warn('failed to compose cs in pad:', padId, ' startrev:', startNum, ' current rev:', r);
throw e;
}
}
};
function _getRoomSockets(padID) {
const _getRoomSockets = (padID) => {
const roomSockets = [];
const room = socketio.sockets.adapter.rooms[padID];
@ -1364,21 +1396,19 @@ function _getRoomSockets(padID) {
}
return roomSockets;
}
};
/**
* Get the number of users in a pad
*/
exports.padUsersCount = function (padID) {
return {
padUsersCount: _getRoomSockets(padID).length,
};
};
exports.padUsersCount = (padID) => ({
padUsersCount: _getRoomSockets(padID).length,
});
/**
* Get the list of users in a pad
*/
exports.padUsers = async function (padID) {
exports.padUsers = async (padID) => {
const padUsers = [];
// iterate over all clients (in parallel)