From bc9cdd695794a4149df5102a1ef483f20c3ba9fb Mon Sep 17 00:00:00 2001 From: Richard Hansen Date: Mon, 6 Sep 2021 05:42:47 -0400 Subject: [PATCH] SocketIORouter: Add acknowledgement support --- src/node/handler/SocketIORouter.js | 13 +++++----- src/tests/backend/specs/socketio.js | 37 +++++++++++++++++++---------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/node/handler/SocketIORouter.js b/src/node/handler/SocketIORouter.js index a4bfb41c4..0fa18755e 100644 --- a/src/node/handler/SocketIORouter.js +++ b/src/node/handler/SocketIORouter.js @@ -68,7 +68,7 @@ exports.setSocketIO = (_io) => { components[i].handleConnect(socket); } - socket.on('message', async (message) => { + socket.on('message', (message, ack = () => {}) => { if (message.protocolVersion && message.protocolVersion !== 2) { logger.warn(`Protocolversion header is not correct: ${JSON.stringify(message)}`); return; @@ -78,11 +78,12 @@ exports.setSocketIO = (_io) => { return; } logger.debug(`from ${socket.id}: ${JSON.stringify(message)}`); - try { - await components[message.component].handleMessage(socket, message); - } catch (err) { - logger.error(`Error while handling message from ${socket.id}: ${err.stack || err}`); - } + (async () => await components[message.component].handleMessage(socket, message))().then( + (val) => ack(null, val), + (err) => { + logger.error(`Error while handling message from ${socket.id}: ${err.stack || err}`); + ack({name: err.name, message: err.message}); + }); }); socket.on('disconnect', (reason) => { diff --git a/src/tests/backend/specs/socketio.js b/src/tests/backend/specs/socketio.js index 7a99e0e34..30ef82748 100644 --- a/src/tests/backend/specs/socketio.js +++ b/src/tests/backend/specs/socketio.js @@ -481,23 +481,34 @@ describe(__filename, function () { assert.deepEqual(await got, want); }); - it('handleMessage (error)', async function () { - let receive; - const received = new Promise((resolve) => receive = resolve); + const tx = async (socket, message = {}) => await new Promise((resolve, reject) => { + message = Object.assign({protocolVersion: 2}, message); + const AckErr = class extends Error { + constructor(name, ...args) { super(...args); this.name = name; } + }; + socket.send(message, + (errj, val) => errj != null ? reject(new AckErr(errj.name, errj.message)) : resolve(val)); + }); + + it('handleMessage with ack (success)', async function () { + const want = 'value'; socketIoRouter.addComponent(this.test.fullTitle(), new class extends Module { - handleMessage(socket, message) { - if (message.throw) throw new Error('injected'); - receive(); - } + handleMessage(socket, msg) { return want; } }()); socket = await connect(); - const tx = (msg = {}) => { - msg = Object.assign({component: this.test.fullTitle(), protocolVersion: 2}, msg); - socket.send(msg); + const got = await tx(socket, {component: this.test.fullTitle()}); + assert.equal(got, want); + }); + + it('handleMessage with ack (error)', async function () { + const InjectedError = class extends Error { + constructor() { super('injected test error'); this.name = 'InjectedError'; } }; - tx({throw: true}); - tx(); - await received; + socketIoRouter.addComponent(this.test.fullTitle(), new class extends Module { + handleMessage(socket, msg) { throw new InjectedError(); } + }()); + socket = await connect(); + await assert.rejects(tx(socket, {component: this.test.fullTitle()}), new InjectedError()); }); }); });