feat(vela-gateway): add websocket session skeleton

This commit is contained in:
2026-04-08 18:30:21 +02:00
parent 4fd27db11e
commit fa5a458003
8 changed files with 655 additions and 7 deletions

View File

@@ -0,0 +1,328 @@
const assert = require('node:assert/strict');
const crypto = require('node:crypto');
const net = require('node:net');
const test = require('node:test');
const {
CLIENT_EVENT_TYPES,
PROTOCOL_PACKAGE_NAME,
SERVER_EVENT_TYPES
} = require('@vela/protocol');
const { buildServer } = require('../src/index.js');
function encodeMaskedTextFrame(payload) {
const body = Buffer.from(payload);
const mask = crypto.randomBytes(4);
const length = body.length;
let header;
if (length < 126) {
header = Buffer.from([0x81, 0x80 | length]);
} else {
header = Buffer.alloc(4);
header[0] = 0x81;
header[1] = 0x80 | 126;
header.writeUInt16BE(length, 2);
}
const maskedBody = Buffer.from(body);
for (let index = 0; index < maskedBody.length; index += 1) {
maskedBody[index] ^= mask[index % 4];
}
return Buffer.concat([header, mask, maskedBody]);
}
function encodeMaskedCloseFrame() {
return Buffer.from([0x88, 0x80, 0x00, 0x00, 0x00, 0x00]);
}
function decodeServerFrames(buffer) {
const messages = [];
let offset = 0;
while (offset + 2 <= buffer.length) {
const firstByte = buffer[offset];
const secondByte = buffer[offset + 1];
const opcode = firstByte & 0x0f;
let payloadLength = secondByte & 0x7f;
let headerLength = 2;
if ((secondByte & 0x80) !== 0) {
throw new Error('server frames must not be masked');
}
if (payloadLength === 126) {
if (offset + 4 > buffer.length) {
break;
}
payloadLength = buffer.readUInt16BE(offset + 2);
headerLength = 4;
} else if (payloadLength === 127) {
throw new Error('64-bit payloads are not supported in tests');
}
const frameLength = headerLength + payloadLength;
if (offset + frameLength > buffer.length) {
break;
}
const payload = buffer.subarray(offset + headerLength, offset + frameLength);
messages.push({ opcode, payload: payload.toString('utf8') });
offset += frameLength;
}
return {
messages,
remaining: buffer.subarray(offset)
};
}
async function startServer() {
const app = buildServer();
await app.listen({ port: 0, host: '127.0.0.1' });
const address = app.server.address();
return {
app,
port: address.port,
async close() {
await app.close();
}
};
}
function waitForCondition(check, timeout = 1000) {
return new Promise((resolve, reject) => {
const start = Date.now();
const poll = () => {
if (check()) {
resolve();
return;
}
if (Date.now() - start >= timeout) {
reject(new Error('timed out waiting for condition'));
return;
}
setTimeout(poll, 10);
};
poll();
});
}
async function connectWebSocket(port) {
const socket = net.createConnection({ host: '127.0.0.1', port });
let handshakeComplete = false;
let handshakeBuffer = Buffer.alloc(0);
let frameBuffer = Buffer.alloc(0);
const queue = [];
const waiters = [];
const pushMessage = (message) => {
if (waiters.length > 0) {
waiters.shift()(message);
return;
}
queue.push(message);
};
socket.on('data', (chunk) => {
let data = chunk;
if (!handshakeComplete) {
handshakeBuffer = Buffer.concat([handshakeBuffer, chunk]);
const separatorIndex = handshakeBuffer.indexOf('\r\n\r\n');
if (separatorIndex === -1) {
return;
}
handshakeComplete = true;
const response = handshakeBuffer.subarray(0, separatorIndex).toString('utf8');
assert.match(response, /101 Switching Protocols/);
data = handshakeBuffer.subarray(separatorIndex + 4);
handshakeBuffer = Buffer.alloc(0);
}
if (data.length === 0) {
return;
}
frameBuffer = Buffer.concat([frameBuffer, data]);
const decoded = decodeServerFrames(frameBuffer);
frameBuffer = decoded.remaining;
for (const message of decoded.messages) {
if (message.opcode === 0x1) {
pushMessage(JSON.parse(message.payload));
} else {
pushMessage({ opcode: message.opcode, payload: message.payload });
}
}
});
await new Promise((resolve, reject) => {
socket.once('connect', resolve);
socket.once('error', reject);
socket.write(
[
'GET /ws HTTP/1.1',
'Host: 127.0.0.1',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Version: 13',
`Sec-WebSocket-Key: ${crypto.randomBytes(16).toString('base64')}`,
'\r\n'
].join('\r\n')
);
});
return {
socket,
nextMessage(timeout = 1000) {
if (queue.length > 0) {
return Promise.resolve(queue.shift());
}
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
const waiterIndex = waiters.indexOf(onMessage);
if (waiterIndex >= 0) {
waiters.splice(waiterIndex, 1);
}
reject(new Error('timed out waiting for websocket message'));
}, timeout);
const onMessage = (message) => {
clearTimeout(timer);
resolve(message);
};
waiters.push(onMessage);
});
},
sendJson(message) {
socket.write(encodeMaskedTextFrame(JSON.stringify(message)));
},
sendText(payload) {
socket.write(encodeMaskedTextFrame(payload));
},
async close() {
await new Promise((resolve) => {
socket.once('close', resolve);
socket.write(encodeMaskedCloseFrame());
});
}
};
}
test('http routes stay available and preserve the root response contract', async () => {
const server = await startServer();
try {
const rootResponse = await server.app.inject({ method: 'GET', url: '/' });
assert.equal(rootResponse.statusCode, 200);
assert.deepEqual(rootResponse.json(), {
service: 'vela-gateway',
status: 'ok',
transport: 'http',
next: 'websocket session skeleton',
protocol: {
package: PROTOCOL_PACKAGE_NAME,
clientEventCount: CLIENT_EVENT_TYPES.length,
serverEventCount: SERVER_EVENT_TYPES.length
}
});
const healthResponse = await server.app.inject({ method: 'GET', url: '/health' });
assert.equal(healthResponse.statusCode, 200);
assert.deepEqual(healthResponse.json(), { status: 'ok' });
const wsDocResponse = await server.app.inject({ method: 'GET', url: '/ws' });
assert.equal(wsDocResponse.statusCode, 426);
assert.equal(wsDocResponse.json().route, '/ws');
} finally {
await server.close();
}
});
test('websocket connect creates and cleans up an ephemeral session', async () => {
const server = await startServer();
try {
const client = await connectWebSocket(server.port);
const readyMessage = await client.nextMessage();
assert.equal(readyMessage.type, 'session.ready');
assert.equal(typeof readyMessage.payload.sessionId, 'string');
assert.equal(server.app.websocketSessions.size, 1);
const stateMessage = await client.nextMessage();
assert.deepEqual(stateMessage, {
type: 'session.state',
payload: { value: 'idle' }
});
await client.close();
await waitForCondition(() => server.app.websocketSessions.size === 0);
assert.equal(server.app.websocketSessions.size, 0);
} finally {
await server.close();
}
});
test('websocket handles valid and invalid client messages safely', async () => {
const server = await startServer();
try {
const client = await connectWebSocket(server.port);
await client.nextMessage();
await client.nextMessage();
client.sendJson({ type: 'input_audio.append', payload: { chunk: 'Zm9v' } });
assert.deepEqual(await client.nextMessage(), {
type: 'session.state',
payload: { value: 'listening' }
});
client.sendJson({ type: 'input_audio.commit', payload: {} });
assert.deepEqual(await client.nextMessage(), {
type: 'session.state',
payload: { value: 'idle' }
});
client.sendText('{bad json');
assert.deepEqual(await client.nextMessage(), {
type: 'error',
payload: {
code: 'invalid_json',
message: 'Message must be valid JSON.',
retryable: true
}
});
client.sendJson({ type: 'unknown.event', payload: {} });
assert.deepEqual(await client.nextMessage(), {
type: 'error',
payload: {
code: 'invalid_message',
message: 'unsupported client event type: unknown.event',
retryable: true
}
});
assert.equal(server.app.websocketSessions.size, 1);
await client.close();
} finally {
await server.close();
}
});