SocketIORouter: Add acknowledgement support

pull/5171/head
Richard Hansen 2021-09-06 05:42:47 -04:00
parent 9f9adb369b
commit bc9cdd6957
2 changed files with 31 additions and 19 deletions

View File

@ -68,7 +68,7 @@ exports.setSocketIO = (_io) => {
components[i].handleConnect(socket); components[i].handleConnect(socket);
} }
socket.on('message', async (message) => { socket.on('message', (message, ack = () => {}) => {
if (message.protocolVersion && message.protocolVersion !== 2) { if (message.protocolVersion && message.protocolVersion !== 2) {
logger.warn(`Protocolversion header is not correct: ${JSON.stringify(message)}`); logger.warn(`Protocolversion header is not correct: ${JSON.stringify(message)}`);
return; return;
@ -78,11 +78,12 @@ exports.setSocketIO = (_io) => {
return; return;
} }
logger.debug(`from ${socket.id}: ${JSON.stringify(message)}`); logger.debug(`from ${socket.id}: ${JSON.stringify(message)}`);
try { (async () => await components[message.component].handleMessage(socket, message))().then(
await components[message.component].handleMessage(socket, message); (val) => ack(null, val),
} catch (err) { (err) => {
logger.error(`Error while handling message from ${socket.id}: ${err.stack || err}`); logger.error(`Error while handling message from ${socket.id}: ${err.stack || err}`);
} ack({name: err.name, message: err.message});
});
}); });
socket.on('disconnect', (reason) => { socket.on('disconnect', (reason) => {

View File

@ -481,23 +481,34 @@ describe(__filename, function () {
assert.deepEqual(await got, want); assert.deepEqual(await got, want);
}); });
it('handleMessage (error)', async function () { const tx = async (socket, message = {}) => await new Promise((resolve, reject) => {
let receive; message = Object.assign({protocolVersion: 2}, message);
const received = new Promise((resolve) => receive = resolve); const AckErr = class extends Error {
constructor(name, ...args) { super(...args); this.name = name; }
};
socket.send(message,
(errj, val) => errj != null ? reject(new AckErr(errj.name, errj.message)) : resolve(val));
});
it('handleMessage with ack (success)', async function () {
const want = 'value';
socketIoRouter.addComponent(this.test.fullTitle(), new class extends Module { socketIoRouter.addComponent(this.test.fullTitle(), new class extends Module {
handleMessage(socket, message) { handleMessage(socket, msg) { return want; }
if (message.throw) throw new Error('injected');
receive();
}
}()); }());
socket = await connect(); socket = await connect();
const tx = (msg = {}) => { const got = await tx(socket, {component: this.test.fullTitle()});
msg = Object.assign({component: this.test.fullTitle(), protocolVersion: 2}, msg); assert.equal(got, want);
socket.send(msg); });
it('handleMessage with ack (error)', async function () {
const InjectedError = class extends Error {
constructor() { super('injected test error'); this.name = 'InjectedError'; }
}; };
tx({throw: true}); socketIoRouter.addComponent(this.test.fullTitle(), new class extends Module {
tx(); handleMessage(socket, msg) { throw new InjectedError(); }
await received; }());
socket = await connect();
await assert.rejects(tx(socket, {component: this.test.fullTitle()}), new InjectedError());
}); });
}); });
}); });