437 lines
12 KiB
JavaScript
437 lines
12 KiB
JavaScript
const Fastify = require('fastify');
|
|
const crypto = require('node:crypto');
|
|
const {
|
|
CLIENT_EVENT_TYPES,
|
|
PROTOCOL_PACKAGE_NAME,
|
|
SERVER_EVENT_TYPES,
|
|
createMessageEnvelope,
|
|
isClientEventType,
|
|
isMessageEnvelope
|
|
} = require('@vela/protocol');
|
|
|
|
// Path of the session endpoint: upgrade requests here become WebSocket sessions,
// plain GETs receive a 426 response describing how to connect.
const WEBSOCKET_ROUTE = '/ws';

// Fixed GUID from RFC 6455 that is appended to the client's Sec-WebSocket-Key before
// hashing to produce the Sec-WebSocket-Accept handshake header.
const WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

// Canned assistant reply streamed (split into two deltas) by every mocked turn.
const MOCKED_ASSISTANT_RESPONSE =
  '[mocked assistant] This is a deterministic mocked response from the gateway vertical slice.';
|
|
|
|
/**
 * Builds the mocked partial transcript sent after each appended push-to-talk audio chunk.
 *
 * @param {number} audioChunkCount - Chunks appended so far in the current utterance.
 * @returns {string} Placeholder text; the chunk count is spelled out unless it is exactly 1.
 */
function createPlaceholderPartialTranscript(audioChunkCount) {
  if (audioChunkCount !== 1) {
    return `[mocked partial] Placeholder push-to-talk transcript in progress (${audioChunkCount} chunks).`;
  }

  return '[mocked partial] Placeholder push-to-talk transcript in progress.';
}
|
|
|
|
/**
 * Builds the mocked final transcript emitted when push-to-talk input is committed.
 *
 * @param {number} audioChunkCount - Chunks appended before the commit (may be 0).
 * @returns {string} Placeholder text with distinct wording for zero, one, and many chunks.
 */
function createPlaceholderFinalTranscript(audioChunkCount) {
  switch (audioChunkCount) {
    case 0:
      return '[mocked final] Placeholder push-to-talk transcript completed without appended audio.';
    case 1:
      return '[mocked final] Placeholder push-to-talk transcript completed from 1 appended chunk.';
    default:
      return `[mocked final] Placeholder push-to-talk transcript completed from ${audioChunkCount} appended chunks.`;
  }
}
|
|
|
|
/**
 * Creates the mutable per-connection record for a newly upgraded WebSocket session.
 *
 * @returns {{id: string, connectedAt: string, state: string, audioChunkCount: number,
 *   started: boolean, mockedTurnInFlight: boolean, mockedTurnTimers: Array,
 *   activeMockedTurnId: (string|null)}} A fresh record with a random UUID id and an ISO-8601 timestamp.
 */
function createSessionRecord() {
  const id = crypto.randomUUID();
  const connectedAt = new Date().toISOString();

  return {
    id,
    connectedAt,
    // Current phase, announced to the client via `session.state` messages.
    state: 'idle',
    // Audio chunks appended since the last commit or cancel.
    audioChunkCount: 0,
    // Flipped to true when the client sends `session.start`.
    started: false,
    // Mocked-turn bookkeeping: in-flight flag, pending step timers, id of the running turn.
    mockedTurnInFlight: false,
    mockedTurnTimers: [],
    activeMockedTurnId: null
  };
}
|
|
|
|
/**
 * Cancels every pending mocked-turn step and marks the session as having no active turn.
 *
 * Other session fields (state, audio chunk count) are left untouched.
 *
 * @param {object} session - Session record from createSessionRecord().
 */
function clearMockedTurn(session) {
  session.mockedTurnTimers.forEach((pendingTimer) => clearTimeout(pendingTimer));

  Object.assign(session, {
    mockedTurnTimers: [],
    mockedTurnInFlight: false,
    activeMockedTurnId: null
  });
}
|
|
|
|
/**
 * Schedules one step of a mocked turn and tracks its timer on the session.
 *
 * When the timer fires it removes itself from `session.mockedTurnTimers`. It then runs
 * `callback` only if a turn is still in flight and that turn is `turnId`, so steps left
 * over from a cleared or replaced turn do nothing.
 *
 * @param {object} session - Session record whose `mockedTurnTimers` list is updated.
 * @param {string} turnId - Id of the turn this step belongs to.
 * @param {number} delay - Milliseconds to wait before the step.
 * @param {Function} callback - Step body, invoked with no arguments.
 */
function scheduleMockedTurnStep(session, turnId, delay, callback) {
  const handle = setTimeout(function runStep() {
    session.mockedTurnTimers = session.mockedTurnTimers.filter((pending) => pending !== handle);

    const turnStillActive = session.mockedTurnInFlight && session.activeMockedTurnId === turnId;

    if (turnStillActive) {
      callback();
    }
  }, delay);

  session.mockedTurnTimers.push(handle);
}
|
|
|
|
/**
 * Plays a scripted assistant turn for `session`, streaming messages to `socket` on a fixed timeline.
 *
 * Timeline, in ms after the call:
 * - 75: `transcript.final` with the given transcript, then state `thinking`.
 * - 150: state `speaking`, then the first `response.text.delta`.
 * - 225: the rest of MOCKED_ASSISTANT_RESPONSE as a second delta.
 * - 300: `response.completed`, turn cleared, state `idle`.
 *
 * Each step is scheduled through scheduleMockedTurnStep under a fresh turn id. If a turn is
 * already in flight, a `mocked_turn_in_flight` error is sent and nothing is scheduled.
 *
 * @param {object} socket - Destination for protocol messages.
 * @param {object} session - Session record whose mocked-turn fields are updated.
 * @param {{transcript?: string, includeListeningState?: boolean}} [options]
 *   `transcript` is echoed in `transcript.final`. `includeListeningState` (default true) sends the
 *   `listening` state before the timeline starts.
 */
function startMockedTurn(socket, session, { transcript, includeListeningState = true } = {}) {
  if (session.mockedTurnInFlight) {
    sendSocketError(socket, 'mocked_turn_in_flight', 'Only one mocked turn can run per session at a time.');
    return;
  }

  clearMockedTurn(session);
  session.mockedTurnInFlight = true;

  const turnId = crypto.randomUUID();
  session.activeMockedTurnId = turnId;

  if (includeListeningState) {
    updateSessionState(socket, session, 'listening');
  }

  const timeline = [
    {
      at: 75,
      run() {
        sendSocketMessage(socket, 'transcript.final', { text: transcript });
        updateSessionState(socket, session, 'thinking');
      }
    },
    {
      at: 150,
      run() {
        updateSessionState(socket, session, 'speaking');
        sendSocketMessage(socket, 'response.text.delta', { text: '[mocked assistant] ' });
      }
    },
    {
      at: 225,
      run() {
        const remainder = MOCKED_ASSISTANT_RESPONSE.replace('[mocked assistant] ', '');
        sendSocketMessage(socket, 'response.text.delta', { text: remainder });
      }
    },
    {
      at: 300,
      run() {
        sendSocketMessage(socket, 'response.completed', {});
        clearMockedTurn(session);
        updateSessionState(socket, session, 'idle');
      }
    }
  ];

  timeline.forEach(({ at, run }) => scheduleMockedTurnStep(session, turnId, at, run));
}
|
|
|
|
/**
 * Computes the Sec-WebSocket-Accept header value for a client's Sec-WebSocket-Key.
 *
 * @param {string} key - Value of the client's Sec-WebSocket-Key header.
 * @returns {string} Base64 SHA-1 digest of the key concatenated with WEBSOCKET_GUID.
 */
function createWebSocketAcceptValue(key) {
  const sha1 = crypto.createHash('sha1');
  sha1.update(`${key}${WEBSOCKET_GUID}`);
  return sha1.digest('base64');
}
|
|
|
|
/**
 * Encodes one unmasked, unfragmented (FIN set) server-to-client WebSocket frame.
 *
 * The payload length uses the RFC 6455 section 5.2 encoding:
 * - under 126 bytes: the 7-bit length field;
 * - up to 65535 bytes: marker 126 plus a 16-bit big-endian length;
 * - anything larger: marker 127 plus a 64-bit big-endian length.
 *
 * @param {string|Buffer|Uint8Array} payload - Frame body; strings are encoded as UTF-8.
 * @param {{opcode?: number}} [options] - Frame opcode, default 0x1 (text). 0x8 = close, 0xA = pong.
 * @returns {Buffer} Header followed by the payload, ready to write to the socket.
 */
function encodeWebSocketFrame(payload, { opcode = 0x1 } = {}) {
  const body = Buffer.from(payload);
  const { length } = body;
  // FIN bit plus opcode in the low nibble.
  const firstByte = 0x80 | opcode;

  let header;

  if (length < 126) {
    header = Buffer.from([firstByte, length]);
  } else if (length < 0x10000) {
    header = Buffer.alloc(4);
    header[0] = firstByte;
    header[1] = 126;
    header.writeUInt16BE(length, 2);
  } else {
    // Large frames used to throw. Outgoing error messages can echo client-supplied text,
    // so a large (but valid) client frame could otherwise make the reply unencodable.
    header = Buffer.alloc(10);
    header[0] = firstByte;
    header[1] = 127;
    header.writeBigUInt64BE(BigInt(length), 2);
  }

  return Buffer.concat([header, body]);
}
|
|
|
|
/**
 * Decodes every complete, masked client frame at the front of `buffer`.
 *
 * Only unfragmented frames with 7-bit or 16-bit payload lengths are accepted. A trailing partial
 * frame is not an error: decoding stops there and the unconsumed bytes are returned in
 * `remaining`, so the caller can prepend them to the next chunk.
 *
 * @param {Buffer} buffer - Bytes received from the client that have not been decoded yet.
 * @returns {{messages: Array<{opcode: number, payload: Buffer}>, remaining: Buffer}}
 *   Frames with unmasked payloads, plus the leftover bytes (a view into `buffer`).
 * @throws {Error} For a frame with FIN clear, a 64-bit length marker, or no masking key.
 */
function decodeWebSocketFrames(buffer) {
  const messages = [];
  let cursor = 0;

  while (buffer.length - cursor >= 2) {
    const available = buffer.length - cursor;
    const flagsAndOpcode = buffer[cursor];
    const maskAndLength = buffer[cursor + 1];

    // FIN clear means this frame is only a fragment of a larger message.
    if ((flagsAndOpcode & 0x80) === 0) {
      throw new Error('fragmented websocket frames are not supported');
    }

    let payloadLength = maskAndLength & 0x7f;
    let headerLength = 2;

    if (payloadLength === 127) {
      throw new Error('64-bit websocket payloads are not supported');
    }

    if (payloadLength === 126) {
      // The 16-bit extended length has not fully arrived yet; wait for more bytes.
      if (available < 4) {
        break;
      }

      payloadLength = buffer.readUInt16BE(cursor + 2);
      headerLength = 4;
    }

    if ((maskAndLength & 0x80) === 0) {
      throw new Error('client websocket frames must be masked');
    }

    // Header, then a 4-byte masking key, then the masked payload.
    const frameLength = headerLength + 4 + payloadLength;

    if (available < frameLength) {
      break;
    }

    const maskStart = cursor + headerLength;
    const dataStart = maskStart + 4;
    const payload = Buffer.alloc(payloadLength);

    for (let i = 0; i < payloadLength; i += 1) {
      payload[i] = buffer[dataStart + i] ^ buffer[maskStart + (i % 4)];
    }

    messages.push({ opcode: flagsAndOpcode & 0x0f, payload });
    cursor += frameLength;
  }

  return { messages, remaining: buffer.subarray(cursor) };
}
|
|
|
|
/**
 * Wraps `payload` in the shared protocol envelope and writes it to `socket` as one text frame.
 *
 * @param {object} socket - Upgraded client socket (anything with a `write` method).
 * @param {string} type - Server event type, e.g. `session.state`.
 * @param {object} payload - Event payload.
 */
function sendSocketMessage(socket, type, payload) {
  const envelope = createMessageEnvelope(type, payload);
  const serialized = JSON.stringify(envelope);

  socket.write(encodeWebSocketFrame(serialized));
}
|
|
|
|
/**
 * Sends an `error` event to the client.
 *
 * @param {object} socket - Upgraded client socket.
 * @param {string} code - Machine-readable error code.
 * @param {string} message - Human-readable description.
 * @param {boolean} [retryable=true] - Whether the client may retry the failed action.
 */
function sendSocketError(socket, code, message, retryable = true) {
  const errorPayload = { code, message, retryable };

  sendSocketMessage(socket, 'error', errorPayload);
}
|
|
|
|
/**
 * Moves `session` to `nextState` and announces it with a `session.state` message.
 *
 * Does nothing (and sends nothing) when the session is already in that state.
 *
 * @param {object} socket - Upgraded client socket.
 * @param {object} session - Session record whose `state` field is updated.
 * @param {string} nextState - Target state, e.g. `idle`, `listening`, `thinking`, `speaking`.
 */
function updateSessionState(socket, session, nextState) {
  if (session.state !== nextState) {
    session.state = nextState;
    sendSocketMessage(socket, 'session.state', { value: nextState });
  }
}
|
|
|
|
/**
 * Checks a parsed client message against the protocol rules the gateway relies on.
 *
 * The rules are applied in order: shared envelope shape, known client event type,
 * plain-object payload, and for `input_audio.append` a string `payload.chunk`.
 *
 * @param {*} message - Result of JSON.parse on a client text frame.
 * @returns {string|null} Description of the first failed rule, or null when the message is valid.
 */
function validateClientMessage(message) {
  if (!isMessageEnvelope(message)) {
    return 'message must match the shared envelope shape';
  }

  const { type, payload } = message;

  if (!isClientEventType(type)) {
    return `unsupported client event type: ${type}`;
  }

  const payloadIsPlainObject = Boolean(payload) && typeof payload === 'object' && !Array.isArray(payload);

  if (!payloadIsPlainObject) {
    return 'message payload must be an object';
  }

  const requiresStringChunk = type === 'input_audio.append';

  if (requiresStringChunk && typeof payload.chunk !== 'string') {
    return 'input_audio.append payload.chunk must be a string';
  }

  return null;
}
|
|
|
|
/**
 * Parses, validates, and dispatches one text message received from a session's client.
 *
 * Invalid JSON and validation failures are answered with an `error` message; the connection
 * is left open.
 *
 * @param {object} socket - Upgraded client socket the replies are written to.
 * @param {object} session - Session record created by createSessionRecord().
 * @param {string} rawMessage - UTF-8 text payload of a single WebSocket text frame.
 */
function handleClientMessage(socket, session, rawMessage) {
  let message;

  try {
    message = JSON.parse(rawMessage);
  } catch {
    sendSocketError(socket, 'invalid_json', 'Message must be valid JSON.');
    return;
  }

  const validationError = validateClientMessage(message);

  if (validationError) {
    sendSocketError(socket, 'invalid_message', validationError);
    return;
  }

  // Each case has its own block so `const` declarations stay scoped to that case
  // (ESLint no-case-declarations).
  switch (message.type) {
    case 'session.start': {
      session.started = true;
      sendSocketMessage(socket, 'session.ready', { sessionId: session.id });
      sendSocketMessage(socket, 'session.state', { value: session.state });
      break;
    }

    case 'mocked.turn.trigger': {
      sendSocketError(
        socket,
        'unsupported_mocked_turn_trigger',
        'mocked.turn.trigger is no longer supported; use input_audio.append and input_audio.commit instead.'
      );
      break;
    }

    case 'input_audio.append': {
      if (session.mockedTurnInFlight) {
        sendSocketError(socket, 'mocked_turn_in_flight', 'Wait for the mocked turn to finish before sending more input.');
        break;
      }

      session.audioChunkCount += 1;
      updateSessionState(socket, session, 'listening');
      sendSocketMessage(socket, 'transcript.partial', {
        text: createPlaceholderPartialTranscript(session.audioChunkCount)
      });
      break;
    }

    case 'input_audio.commit': {
      if (session.mockedTurnInFlight) {
        sendSocketError(socket, 'mocked_turn_in_flight', 'Wait for the mocked turn to finish before committing input.');
        break;
      }

      // Capture the transcript before resetting the counter for the next utterance.
      const finalTranscript = createPlaceholderFinalTranscript(session.audioChunkCount);
      session.audioChunkCount = 0;

      // The `listening` state is driven by input_audio.append, so the turn does not announce it again.
      startMockedTurn(socket, session, {
        transcript: finalTranscript,
        includeListeningState: false
      });
      break;
    }

    case 'response.cancel': {
      clearMockedTurn(session);
      session.audioChunkCount = 0;
      updateSessionState(socket, session, 'idle');
      break;
    }

    default: {
      sendSocketError(socket, 'unhandled_message', `No handler is implemented for ${message.type}.`);
    }
  }
}
|
|
|
|
/**
 * Registers the WebSocket session endpoint on `app`.
 *
 * A plain GET to WEBSOCKET_ROUTE answers 426 with a short description of the transport.
 * HTTP upgrade requests for that path go through a hand-rolled RFC 6455 handshake and become
 * ephemeral sessions tracked in `app.websocketSessions`. Upgrade requests are rejected by
 * destroying the socket when they target any other path, lack an `Upgrade: websocket` header,
 * or lack a Sec-WebSocket-Key.
 *
 * @param {object} app - Fastify instance decorated with a `websocketSessions` Map.
 */
function registerWebSocketSessionRoute(app) {
  app.get(WEBSOCKET_ROUTE, async (_request, reply) => {
    reply.code(426);

    return {
      service: 'vela-gateway',
      transport: 'websocket',
      route: WEBSOCKET_ROUTE,
      protocol: PROTOCOL_PACKAGE_NAME,
      messageEnvelope: 'MessageEnvelope<{type, payload}>',
      note: 'Connect with a WebSocket client to establish an ephemeral session.'
    };
  });

  app.server.on('upgrade', (request, socket, head) => {
    acceptWebSocketUpgrade(app, request, socket, head);
  });
}

// Returns the pathname of an upgrade request's target, or null when the target cannot be parsed.
function getUpgradeRequestPathname(request) {
  // Parse against a fixed base instead of the client-controlled Host header. Only the pathname
  // is needed, and a malformed Host (e.g. `Host: [`) made `new URL` throw inside the 'upgrade'
  // listener, an uncaught exception that takes down the whole process.
  try {
    return new URL(request.url ?? '/', 'http://localhost').pathname;
  } catch {
    return null;
  }
}

// Validates an upgrade request, writes the 101 handshake, and starts a session on `socket`.
function acceptWebSocketUpgrade(app, request, socket, head) {
  if (getUpgradeRequestPathname(request) !== WEBSOCKET_ROUTE) {
    socket.destroy();
    return;
  }

  if ((request.headers.upgrade ?? '').toLowerCase() !== 'websocket') {
    socket.destroy();
    return;
  }

  const websocketKey = request.headers['sec-websocket-key'];

  if (typeof websocketKey !== 'string') {
    socket.destroy();
    return;
  }

  const session = createSessionRecord();
  app.websocketSessions.set(session.id, session);

  // The trailing '\r\n' element produces the blank line that terminates the response headers.
  socket.write(
    [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${createWebSocketAcceptValue(websocketKey)}`,
      '\r\n'
    ].join('\r\n')
  );

  app.log.info({ sessionId: session.id, route: WEBSOCKET_ROUTE }, 'websocket session connected');
  sendSocketMessage(socket, 'session.ready', { sessionId: session.id });
  sendSocketMessage(socket, 'session.state', { value: session.state });

  attachSessionSocketHandlers(app, socket, session, head);
}

// Feeds incoming frames to handleClientMessage and releases the session on close, error, or protocol violation.
function attachSessionSocketHandlers(app, socket, session, head) {
  let frameBuffer = Buffer.alloc(0);
  let closed = false;

  const cleanup = () => {
    if (closed) {
      return;
    }

    closed = true;
    clearMockedTurn(session);
    app.websocketSessions.delete(session.id);
    app.log.info({ sessionId: session.id }, 'websocket session disconnected');
  };

  // Sends a close frame, ends the socket, and releases the session right away so pending
  // mocked-turn timers cannot write to a socket that has already been ended.
  const closeSession = () => {
    socket.end(encodeWebSocketFrame('', { opcode: 0x8 }));
    cleanup();
  };

  // Bytes that arrived in the same packet as the handshake are delivered in `head`, not via
  // 'data'. Put them back on the stream before listening so they are decoded like any other input.
  if (head && head.length > 0) {
    socket.unshift(head);
  }

  socket.on('data', (chunk) => {
    // Ignore anything received after this side has started closing the session.
    if (closed) {
      return;
    }

    frameBuffer = Buffer.concat([frameBuffer, chunk]);

    try {
      const decoded = decodeWebSocketFrames(frameBuffer);
      frameBuffer = decoded.remaining;

      for (const message of decoded.messages) {
        if (message.opcode === 0x8) {
          closeSession();
          return;
        }

        if (message.opcode === 0x9) {
          // Ping: answer with a pong that echoes the payload.
          socket.write(encodeWebSocketFrame(message.payload, { opcode: 0xA }));
          continue;
        }

        if (message.opcode !== 0x1) {
          sendSocketError(socket, 'unsupported_message', 'Only text messages are supported.');
          continue;
        }

        handleClientMessage(socket, session, message.payload.toString('utf8'));
      }
    } catch (error) {
      app.log.warn({ err: error, sessionId: session.id }, 'closing malformed websocket session');
      sendSocketError(socket, 'invalid_frame', error.message, false);
      closeSession();
    }
  });

  socket.on('end', cleanup);
  socket.on('close', cleanup);
  socket.on('error', (error) => {
    app.log.warn({ err: error, sessionId: session.id }, 'websocket session socket error');
    cleanup();
  });
}
|
|
|
|
/**
 * Creates the Fastify gateway app (with logging enabled) without starting it.
 *
 * Routes: `/` returns a service and protocol summary, `/health` a liveness response, and
 * registerWebSocketSessionRoute adds the WebSocket session endpoint. Live sessions are tracked
 * in the `websocketSessions` Map decorated onto the instance.
 *
 * @returns {object} The configured, not-yet-listening Fastify instance.
 */
function buildServer() {
  const app = Fastify({ logger: true });
  app.decorate('websocketSessions', new Map());

  const describeGateway = async () => ({
    service: 'vela-gateway',
    status: 'ok',
    transport: 'http',
    next: 'websocket session skeleton',
    protocol: {
      package: PROTOCOL_PACKAGE_NAME,
      clientEventCount: CLIENT_EVENT_TYPES.length,
      serverEventCount: SERVER_EVENT_TYPES.length
    }
  });
  const reportHealth = async () => ({ status: 'ok' });

  app.get('/', describeGateway);
  app.get('/health', reportHealth);
  registerWebSocketSessionRoute(app);

  return app;
}
|
|
|
|
// Returns the trimmed value of environment variable `name`, or undefined when it is unset or blank.
function readEnvSetting(name) {
  const value = process.env[name]?.trim();
  return value ? value : undefined;
}

/**
 * Builds the gateway and listens on HOST:PORT taken from the environment.
 *
 * Defaults are port 3001 and host 0.0.0.0. Blank or whitespace-only values count as unset:
 * previously `PORT=""` became `Number('') === 0`, which silently bound a random ephemeral port.
 * A non-integer or out-of-range PORT, or a failed listen, is logged and the process exits
 * with status 1.
 *
 * @returns {Promise<void>}
 */
async function start() {
  const app = buildServer();
  const rawPort = readEnvSetting('PORT');
  const port = rawPort === undefined ? 3001 : Number(rawPort);
  const host = readEnvSetting('HOST') ?? '0.0.0.0';

  if (!Number.isInteger(port) || port < 0 || port > 65535) {
    app.log.error(`Invalid PORT value: ${JSON.stringify(process.env.PORT)}`);
    process.exit(1);
    return;
  }

  try {
    await app.listen({ port, host });
  } catch (error) {
    app.log.error(error);
    process.exit(1);
  }
}
|
|
|
|
// Start listening only when this file is executed directly. Requiring it (e.g. from tests)
// just exposes buildServer without opening a port. start() handles its own listen errors.
const isEntryPoint = require.main === module;

if (isEntryPoint) {
  start();
}

module.exports = { buildServer };
|