636 lines
18 KiB
JavaScript
636 lines
18 KiB
JavaScript
const assert = require('node:assert/strict');
|
|
const crypto = require('node:crypto');
|
|
const net = require('node:net');
|
|
const test = require('node:test');
|
|
|
|
const {
|
|
CLIENT_EVENT_TYPES,
|
|
PROTOCOL_PACKAGE_NAME,
|
|
SERVER_EVENT_TYPES
|
|
} = require('@vela/protocol');
|
|
|
|
const { buildServer } = require('../src/index.js');
|
|
|
|
/**
 * Encodes `payload` as a single, final (FIN) masked text frame, as a client
 * must send it per RFC 6455.
 *
 * Supports all three payload-length encodings: 7-bit (< 126 bytes), 16-bit
 * (126 marker, up to 65535 bytes) and 64-bit (127 marker) for larger bodies.
 *
 * @param {string|Buffer} payload - Text to send; strings are UTF-8 encoded.
 * @returns {Buffer} Header + 4-byte random masking key + masked body.
 */
function encodeMaskedTextFrame(payload) {
  const body = Buffer.from(payload);
  const mask = crypto.randomBytes(4);
  const length = body.length;
  let header;

  if (length < 126) {
    header = Buffer.from([0x81, 0x80 | length]);
  } else if (length <= 0xffff) {
    header = Buffer.alloc(4);
    header[0] = 0x81;
    header[1] = 0x80 | 126;
    header.writeUInt16BE(length, 2);
  } else {
    // 16 bits cannot hold this length; writeUInt16BE would throw a RangeError.
    header = Buffer.alloc(10);
    header[0] = 0x81;
    header[1] = 0x80 | 127;
    header.writeBigUInt64BE(BigInt(length), 2);
  }

  // Mask a separate copy so the XOR below never writes into memory the caller may share.
  const maskedBody = Buffer.from(body);

  for (let index = 0; index < maskedBody.length; index += 1) {
    maskedBody[index] ^= mask[index % 4];
  }

  return Buffer.concat([header, mask, maskedBody]);
}
|
|
|
|
/**
 * Builds the client close frame used by the tests: FIN + opcode 0x8, mask bit
 * set, zero-length payload and an all-zero masking key (6 bytes total).
 *
 * @returns {Buffer} A freshly allocated close frame.
 */
function encodeMaskedCloseFrame() {
  // Buffer.alloc zero-fills, which supplies the empty masking key.
  const frame = Buffer.alloc(6);
  frame[0] = 0x80 | 0x08;
  frame[1] = 0x80;
  return frame;
}
|
|
|
|
/**
 * Splits accumulated server-to-client WebSocket bytes into complete frames.
 *
 * Only the opcode and payload length are interpreted. FIN/RSV bits are
 * ignored, so a fragmented message surfaces as separate entries. Any
 * incomplete trailing frame is returned in `remaining` so the caller can
 * prepend it to the next chunk.
 *
 * @param {Buffer} buffer - Bytes received from the server so far.
 * @returns {{ messages: Array<{opcode: number, payload: string}>, remaining: Buffer }}
 *   Decoded frames (payload as UTF-8) and the unconsumed tail.
 * @throws {Error} If a frame is masked (servers must not mask), or its 64-bit
 *   length exceeds Number.MAX_SAFE_INTEGER.
 */
function decodeServerFrames(buffer) {
  const messages = [];
  let offset = 0;

  while (offset + 2 <= buffer.length) {
    const firstByte = buffer[offset];
    const secondByte = buffer[offset + 1];
    const opcode = firstByte & 0x0f;
    let payloadLength = secondByte & 0x7f;
    let headerLength = 2;

    if ((secondByte & 0x80) !== 0) {
      throw new Error('server frames must not be masked');
    }

    if (payloadLength === 126) {
      headerLength = 4;

      // Extended 16-bit length not fully received yet.
      if (offset + headerLength > buffer.length) {
        break;
      }

      payloadLength = buffer.readUInt16BE(offset + 2);
    } else if (payloadLength === 127) {
      headerLength = 10;

      // Extended 64-bit length not fully received yet.
      if (offset + headerLength > buffer.length) {
        break;
      }

      const longLength = buffer.readBigUInt64BE(offset + 2);

      if (longLength > BigInt(Number.MAX_SAFE_INTEGER)) {
        throw new Error('websocket payload length exceeds Number.MAX_SAFE_INTEGER');
      }

      payloadLength = Number(longLength);
    }

    const frameLength = headerLength + payloadLength;

    // Payload still arriving; leave the whole frame in `remaining`.
    if (offset + frameLength > buffer.length) {
      break;
    }

    const payload = buffer.subarray(offset + headerLength, offset + frameLength);
    messages.push({ opcode, payload: payload.toString('utf8') });
    offset += frameLength;
  }

  return {
    messages,
    remaining: buffer.subarray(offset)
  };
}
|
|
|
|
/**
 * Builds the gateway app and starts it listening on 127.0.0.1 with port 0,
 * letting the OS choose a free port.
 *
 * @returns {Promise<{app: object, port: number, close: () => Promise<void>}>}
 *   The running app, the port it bound to, and a helper that shuts it down.
 */
async function startServer() {
  const app = buildServer();
  await app.listen({ port: 0, host: '127.0.0.1' });

  const { port } = app.server.address();

  const close = async () => {
    await app.close();
  };

  return { app, port, close };
}
|
|
|
|
/**
 * Polls `check` until it returns a truthy value or `timeout` ms elapse.
 *
 * @param {() => unknown} check - Predicate evaluated immediately, then every `interval` ms.
 * @param {number} [timeout=1000] - Maximum time to wait, in milliseconds.
 * @param {number} [interval=10] - Delay between polls, in milliseconds.
 * @returns {Promise<void>} Resolves once `check()` returns a truthy value.
 *   Rejects with 'timed out waiting for condition' on timeout, or with
 *   whatever `check` throws.
 */
function waitForCondition(check, timeout = 1000, interval = 10) {
  return new Promise((resolve, reject) => {
    const start = Date.now();

    const poll = () => {
      let satisfied;

      // Later polls run inside a setTimeout callback; without this guard a throwing
      // predicate would become an uncaught exception instead of a rejection.
      try {
        satisfied = check();
      } catch (error) {
        reject(error);
        return;
      }

      if (satisfied) {
        resolve();
        return;
      }

      if (Date.now() - start >= timeout) {
        reject(new Error('timed out waiting for condition'));
        return;
      }

      setTimeout(poll, interval);
    };

    poll();
  });
}
|
|
|
|
/**
 * Opens a raw TCP connection to 127.0.0.1:`port`, sends a WebSocket upgrade
 * request for `/ws`, and returns a minimal test client.
 *
 * Server frames are decoded with decodeServerFrames. Text frames (opcode 0x1)
 * are JSON-parsed; any other frame is surfaced as `{ opcode, payload }`.
 * Decoded messages are queued until nextMessage() consumes them.
 *
 * @param {number} port - Port the gateway listens on.
 * @returns {Promise<{socket: import('node:net').Socket,
 *   nextMessage: (timeout?: number) => Promise<object>,
 *   sendJson: (message: unknown) => void,
 *   sendText: (payload: string) => void,
 *   close: () => Promise<void>}>} Resolves once the TCP connection is established.
 */
async function connectWebSocket(port) {
  const socket = net.createConnection({ host: '127.0.0.1', port });
  let handshakeComplete = false;
  let handshakeBuffer = Buffer.alloc(0);
  let frameBuffer = Buffer.alloc(0);
  let socketError = null;
  let socketClosed = false;
  // Decoded messages that no nextMessage() call has claimed yet.
  const queue = [];
  // Pending nextMessage() calls, oldest first: { deliver, fail }.
  const waiters = [];

  const pushMessage = (message) => {
    const waiter = waiters.shift();

    if (waiter) {
      waiter.deliver(message);
      return;
    }

    queue.push(message);
  };

  // Persistent listener: without it, an 'error' emitted after the connect phase
  // (e.g. a reset while the server shuts down) would crash the process.
  // Record it and fail anyone currently waiting for a message.
  socket.on('error', (error) => {
    socketError = error;

    for (const waiter of waiters.splice(0)) {
      waiter.fail(error);
    }
  });

  socket.on('close', () => {
    socketClosed = true;
  });

  socket.on('data', (chunk) => {
    let data = chunk;

    if (!handshakeComplete) {
      // Accumulate until the blank line that ends the HTTP response headers.
      handshakeBuffer = Buffer.concat([handshakeBuffer, chunk]);
      const separatorIndex = handshakeBuffer.indexOf('\r\n\r\n');

      if (separatorIndex === -1) {
        return;
      }

      handshakeComplete = true;
      const response = handshakeBuffer.subarray(0, separatorIndex).toString('utf8');
      assert.match(response, /101 Switching Protocols/);
      // Bytes after the header terminator already belong to WebSocket frames.
      data = handshakeBuffer.subarray(separatorIndex + 4);
      handshakeBuffer = Buffer.alloc(0);
    }

    if (data.length === 0) {
      return;
    }

    frameBuffer = Buffer.concat([frameBuffer, data]);
    const decoded = decodeServerFrames(frameBuffer);
    // Keep any partial trailing frame for the next chunk.
    frameBuffer = decoded.remaining;

    for (const message of decoded.messages) {
      if (message.opcode === 0x1) {
        pushMessage(JSON.parse(message.payload));
      } else {
        pushMessage({ opcode: message.opcode, payload: message.payload });
      }
    }
  });

  await new Promise((resolve, reject) => {
    // Detach whichever listener did not fire so nothing lingers after this phase.
    const onConnect = () => {
      socket.off('error', onError);
      resolve();
    };
    const onError = (error) => {
      socket.off('connect', onConnect);
      reject(error);
    };

    socket.once('connect', onConnect);
    socket.once('error', onError);
    socket.write(
      [
        'GET /ws HTTP/1.1',
        'Host: 127.0.0.1',
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Version: 13',
        `Sec-WebSocket-Key: ${crypto.randomBytes(16).toString('base64')}`,
        // Joining with this trailing element ends the header block with a blank line.
        '\r\n'
      ].join('\r\n')
    );
  });

  return {
    socket,

    /**
     * Resolves with the next decoded server message. Rejects with
     * 'timed out waiting for websocket message' after `timeout` ms, or with
     * the socket error if one occurred.
     */
    nextMessage(timeout = 1000) {
      if (queue.length > 0) {
        return Promise.resolve(queue.shift());
      }

      if (socketError) {
        return Promise.reject(socketError);
      }

      return new Promise((resolve, reject) => {
        const timer = setTimeout(() => {
          const waiterIndex = waiters.indexOf(waiter);

          if (waiterIndex >= 0) {
            waiters.splice(waiterIndex, 1);
          }

          reject(new Error('timed out waiting for websocket message'));
        }, timeout);

        const waiter = {
          deliver: (message) => {
            clearTimeout(timer);
            resolve(message);
          },
          fail: (error) => {
            clearTimeout(timer);
            reject(error);
          }
        };

        waiters.push(waiter);
      });
    },

    sendJson(message) {
      socket.write(encodeMaskedTextFrame(JSON.stringify(message)));
    },

    sendText(payload) {
      socket.write(encodeMaskedTextFrame(payload));
    },

    /**
     * Sends a close frame and resolves when the socket's 'close' event fires.
     * Resolves immediately if the socket is already closed.
     */
    async close() {
      // Waiting for a 'close' event that already happened would hang forever.
      if (socketClosed) {
        return;
      }

      await new Promise((resolve) => {
        socket.once('close', resolve);
        socket.write(encodeMaskedCloseFrame());
      });
    }
  };
}
|
|
|
|
// Root, health and the plain-HTTP /ws route must keep their documented responses.
test('http routes stay available and preserve the root response contract', async () => {
  const server = await startServer();
  const get = (url) => server.app.inject({ method: 'GET', url });

  try {
    const root = await get('/');
    assert.equal(root.statusCode, 200);
    assert.deepEqual(root.json(), {
      service: 'vela-gateway',
      status: 'ok',
      transport: 'http',
      next: 'websocket session skeleton',
      protocol: {
        package: PROTOCOL_PACKAGE_NAME,
        clientEventCount: CLIENT_EVENT_TYPES.length,
        serverEventCount: SERVER_EVENT_TYPES.length
      }
    });

    const health = await get('/health');
    assert.equal(health.statusCode, 200);
    assert.deepEqual(health.json(), { status: 'ok' });

    // A non-upgrade GET on /ws answers 426 rather than switching protocols.
    const wsDoc = await get('/ws');
    assert.equal(wsDoc.statusCode, 426);
    assert.equal(wsDoc.json().route, '/ws');
  } finally {
    await server.close();
  }
});
|
|
|
|
// Pins the exact list (and order) of client event types exported by @vela/protocol.
test('shared protocol only exposes currently supported client event types', () => {
  const supportedClientEvents = [
    'session.start',
    'input_audio.append',
    'input_audio.commit',
    'response.cancel'
  ];

  assert.deepEqual(CLIENT_EVENT_TYPES, supportedClientEvents);
});
|
|
|
|
// Connecting registers exactly one session; closing the socket removes it.
test('websocket connect creates and cleans up an ephemeral session', async () => {
  const server = await startServer();
  let client;

  try {
    client = await connectWebSocket(server.port);

    const readyMessage = await client.nextMessage();
    assert.equal(readyMessage.type, 'session.ready');
    assert.equal(typeof readyMessage.payload.sessionId, 'string');
    assert.equal(server.app.websocketSessions.size, 1);

    assert.deepEqual(await client.nextMessage(), {
      type: 'session.state',
      payload: { value: 'idle' }
    });

    await client.close();
    await waitForCondition(() => server.app.websocketSessions.size === 0);
    assert.equal(server.app.websocketSessions.size, 0);
  } finally {
    // If an assertion failed before client.close(), release the raw socket so
    // server.close() is not left waiting on it; a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|
|
|
|
// A full push-to-talk turn succeeds, then malformed and unknown messages yield
// retryable errors without tearing down the session.
test('websocket handles valid and invalid client messages safely', async () => {
  const server = await startServer();
  let client;
  const expectNext = async (expected) => {
    assert.deepEqual(await client.nextMessage(), expected);
  };
  const state = (value) => ({ type: 'session.state', payload: { value } });

  try {
    client = await connectWebSocket(server.port);
    // Drain the session.ready / initial session.state pair sent on connect.
    await client.nextMessage();
    await client.nextMessage();

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'Zm9v' } });
    await expectNext(state('listening'));
    await expectNext({
      type: 'transcript.partial',
      payload: { text: '[mocked partial] Placeholder push-to-talk transcript in progress.' }
    });

    client.sendJson({ type: 'input_audio.commit', payload: {} });
    await expectNext({
      type: 'transcript.final',
      payload: { text: '[mocked final] Placeholder push-to-talk transcript completed from 1 appended chunk.' }
    });
    await expectNext(state('thinking'));
    await expectNext(state('speaking'));
    await expectNext({ type: 'response.text.delta', payload: { text: '[mocked assistant] ' } });
    await expectNext({
      type: 'response.text.delta',
      payload: { text: 'This is a deterministic mocked response from the gateway vertical slice.' }
    });
    await expectNext({ type: 'response.completed', payload: {} });
    await expectNext(state('idle'));

    client.sendText('{bad json');
    await expectNext({
      type: 'error',
      payload: {
        code: 'invalid_json',
        message: 'Message must be valid JSON.',
        retryable: true
      }
    });

    client.sendJson({ type: 'unknown.event', payload: {} });
    await expectNext({
      type: 'error',
      payload: {
        code: 'invalid_message',
        message: 'unsupported client event type: unknown.event',
        retryable: true
      }
    });

    assert.equal(server.app.websocketSessions.size, 1);
    await client.close();
  } finally {
    // Release the raw socket if the test bailed out before client.close();
    // a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|
|
|
|
// After one complete append/commit turn returns to idle, the same socket can
// start another turn (and the chunk counter starts over).
test('websocket accepts repeated placeholder input cycles on the same socket', async () => {
  const server = await startServer();
  let client;
  const expectNext = async (expected) => {
    assert.deepEqual(await client.nextMessage(), expected);
  };
  const state = (value) => ({ type: 'session.state', payload: { value } });
  const firstPartial = {
    type: 'transcript.partial',
    payload: { text: '[mocked partial] Placeholder push-to-talk transcript in progress.' }
  };

  try {
    client = await connectWebSocket(server.port);
    // Drain the session.ready / initial session.state pair sent on connect.
    await client.nextMessage();
    await client.nextMessage();

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'placeholder-control-shell-chunk' } });
    await expectNext(state('listening'));
    await expectNext(firstPartial);

    client.sendJson({ type: 'input_audio.commit', payload: {} });
    await expectNext({
      type: 'transcript.final',
      payload: { text: '[mocked final] Placeholder push-to-talk transcript completed from 1 appended chunk.' }
    });
    await expectNext(state('thinking'));
    await expectNext(state('speaking'));
    await expectNext({ type: 'response.text.delta', payload: { text: '[mocked assistant] ' } });
    await expectNext({
      type: 'response.text.delta',
      payload: { text: 'This is a deterministic mocked response from the gateway vertical slice.' }
    });
    await expectNext({ type: 'response.completed', payload: {} });
    await expectNext(state('idle'));

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'placeholder-control-shell-chunk-2' } });
    await expectNext(state('listening'));
    await expectNext(firstPartial);

    await client.close();
  } finally {
    // Release the raw socket if the test bailed out before client.close();
    // a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|
|
|
|
// Covers multi-chunk partials, rejecting a commit while a turn is in flight,
// and committing with no appended audio.
test('websocket emits deterministic partials for repeated appends and a deterministic final for commit without append', async () => {
  const server = await startServer();
  let client;
  const expectNext = async (expected) => {
    assert.deepEqual(await client.nextMessage(), expected);
  };
  const state = (value) => ({ type: 'session.state', payload: { value } });
  // Tail of every mocked turn, from 'speaking' back to 'idle'.
  const expectMockedResponse = async () => {
    await expectNext(state('speaking'));
    await expectNext({ type: 'response.text.delta', payload: { text: '[mocked assistant] ' } });
    await expectNext({
      type: 'response.text.delta',
      payload: { text: 'This is a deterministic mocked response from the gateway vertical slice.' }
    });
    await expectNext({ type: 'response.completed', payload: {} });
    await expectNext(state('idle'));
  };

  try {
    client = await connectWebSocket(server.port);
    // Drain the session.ready / initial session.state pair sent on connect.
    await client.nextMessage();
    await client.nextMessage();

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'chunk-1' } });
    await expectNext(state('listening'));
    await expectNext({
      type: 'transcript.partial',
      payload: { text: '[mocked partial] Placeholder push-to-talk transcript in progress.' }
    });

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'chunk-2' } });
    await expectNext({
      type: 'transcript.partial',
      payload: { text: '[mocked partial] Placeholder push-to-talk transcript in progress (2 chunks).' }
    });

    client.sendJson({ type: 'input_audio.commit', payload: {} });
    await expectNext({
      type: 'transcript.final',
      payload: { text: '[mocked final] Placeholder push-to-talk transcript completed from 2 appended chunks.' }
    });
    await expectNext(state('thinking'));

    // A second commit while the mocked turn is still running is rejected.
    client.sendJson({ type: 'input_audio.commit', payload: {} });
    await expectNext({
      type: 'error',
      payload: {
        code: 'mocked_turn_in_flight',
        message: 'Wait for the mocked turn to finish before committing input.',
        retryable: true
      }
    });

    await expectMockedResponse();

    client.sendJson({ type: 'input_audio.commit', payload: {} });
    await expectNext({
      type: 'transcript.final',
      payload: { text: '[mocked final] Placeholder push-to-talk transcript completed without appended audio.' }
    });
    await expectNext(state('thinking'));
    await expectMockedResponse();

    await client.close();
  } finally {
    // Release the raw socket if the test bailed out before client.close();
    // a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|
|
|
|
// The retired mocked.turn.trigger event yields exactly one error and nothing else.
test('websocket rejects the retired mocked.turn.trigger path deterministically', async () => {
  const server = await startServer();
  let client;

  try {
    client = await connectWebSocket(server.port);
    // Drain the session.ready / initial session.state pair sent on connect.
    await client.nextMessage();
    await client.nextMessage();

    client.sendJson({ type: 'mocked.turn.trigger', payload: {} });

    assert.deepEqual(await client.nextMessage(), {
      type: 'error',
      payload: {
        code: 'unsupported_mocked_turn_trigger',
        message:
          'mocked.turn.trigger is no longer supported; use input_audio.append and input_audio.commit instead.',
        retryable: true
      }
    });
    // No follow-up messages (e.g. a mocked turn) may arrive after the error.
    await assert.rejects(() => client.nextMessage(150), /timed out waiting for websocket message/);

    await client.close();
  } finally {
    // Release the raw socket if the test bailed out before client.close();
    // a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|
|
|
|
// response.cancel during 'thinking' returns to idle with no further output,
// and a new turn can start afterwards.
test('websocket cancel stops a push-to-talk commit response and allows another turn', async () => {
  const server = await startServer();
  let client;
  const expectNext = async (expected) => {
    assert.deepEqual(await client.nextMessage(), expected);
  };
  const state = (value) => ({ type: 'session.state', payload: { value } });
  const firstPartial = {
    type: 'transcript.partial',
    payload: { text: '[mocked partial] Placeholder push-to-talk transcript in progress.' }
  };

  try {
    client = await connectWebSocket(server.port);
    // Drain the session.ready / initial session.state pair sent on connect.
    await client.nextMessage();
    await client.nextMessage();

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'chunk-1' } });
    await expectNext(state('listening'));
    await expectNext(firstPartial);

    client.sendJson({ type: 'input_audio.commit', payload: {} });
    await expectNext({
      type: 'transcript.final',
      payload: { text: '[mocked final] Placeholder push-to-talk transcript completed from 1 appended chunk.' }
    });
    await expectNext(state('thinking'));

    client.sendJson({ type: 'response.cancel', payload: {} });
    await expectNext(state('idle'));
    // The cancelled turn must not emit any late response events.
    await assert.rejects(() => client.nextMessage(150), /timed out waiting for websocket message/);

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'chunk-2' } });
    await expectNext(state('listening'));
    await expectNext(firstPartial);

    await client.close();
  } finally {
    // Release the raw socket if the test bailed out before client.close();
    // a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|
|
|
|
// Cancelling while idle is silently ignored and does not block the next turn.
test('websocket safely accepts cancel when no turn is active', async () => {
  const server = await startServer();
  let client;

  try {
    client = await connectWebSocket(server.port);
    // Drain the session.ready / initial session.state pair sent on connect.
    await client.nextMessage();
    await client.nextMessage();

    client.sendJson({ type: 'response.cancel', payload: {} });
    await assert.rejects(() => client.nextMessage(150), /timed out waiting for websocket message/);

    client.sendJson({ type: 'input_audio.append', payload: { chunk: 'chunk-1' } });
    assert.deepEqual(await client.nextMessage(), {
      type: 'session.state',
      payload: { value: 'listening' }
    });
    assert.deepEqual(await client.nextMessage(), {
      type: 'transcript.partial',
      payload: { text: '[mocked partial] Placeholder push-to-talk transcript in progress.' }
    });

    await client.close();
  } finally {
    // Release the raw socket if the test bailed out before client.close();
    // a no-op after a normal close.
    if (client) {
      client.socket.destroy();
    }

    await server.close();
  }
});
|