diff --git a/apps/vela-gateway/README.md b/apps/vela-gateway/README.md index 8941b2a..c1818f0 100644 --- a/apps/vela-gateway/README.md +++ b/apps/vela-gateway/README.md @@ -1,9 +1,10 @@ # vela-gateway -This workspace contains the Vela gateway service as a minimal Fastify app. +This workspace contains the Vela gateway service as a minimal Fastify app with an initial WebSocket session skeleton. Current status: - Fastify server boots in the Yarn workspace - `/` and `/health` endpoints provide a runnable service baseline -- WebSocket session orchestration remains a later increment +- WebSocket session skeleton wiring now exists +- Full WebSocket session orchestration and behavior remain future work diff --git a/apps/vela-gateway/package.json b/apps/vela-gateway/package.json index 1cd96b2..8873714 100644 --- a/apps/vela-gateway/package.json +++ b/apps/vela-gateway/package.json @@ -6,7 +6,8 @@ "scripts": { "dev": "node --watch src/index.js", "start": "node src/index.js", - "build": "node -e \"console.log('vela-gateway: no build step required')\"" + "build": "node -e \"console.log('vela-gateway: no build step required')\"", + "test": "node --test" }, "dependencies": { "@vela/protocol": "0.0.0", diff --git a/apps/vela-gateway/src/index.js b/apps/vela-gateway/src/index.js index c5b8d2d..5fa012a 100644 --- a/apps/vela-gateway/src/index.js +++ b/apps/vela-gateway/src/index.js @@ -1,12 +1,294 @@ const Fastify = require('fastify'); +const crypto = require('node:crypto'); const { CLIENT_EVENT_TYPES, PROTOCOL_PACKAGE_NAME, - SERVER_EVENT_TYPES + SERVER_EVENT_TYPES, + createMessageEnvelope, + isClientEventType, + isMessageEnvelope } = require('@vela/protocol'); +const WEBSOCKET_ROUTE = '/ws'; +const WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; + +function createSessionRecord() { + return { + id: crypto.randomUUID(), + connectedAt: new Date().toISOString(), + state: 'idle', + audioChunkCount: 0, + started: false + }; +} + +function createWebSocketAcceptValue(key) { + return crypto.createHash('sha1').update(`${key}${WEBSOCKET_GUID}`).digest('base64'); +} + +function encodeWebSocketFrame(payload, { opcode = 0x1 } = {}) { + const body = Buffer.from(payload); + const length = body.length; + + if (length >= 65536) { + throw new Error('payload too large for minimal websocket implementation'); + } + + if (length < 126) { + return Buffer.concat([Buffer.from([0x80 | opcode, length]), body]); + } + + const header = Buffer.alloc(4); + header[0] = 0x80 | opcode; + header[1] = 126; + header.writeUInt16BE(length, 2); + + return Buffer.concat([header, body]); +} + +function decodeWebSocketFrames(buffer) { + const messages = []; + let offset = 0; + + while (offset + 2 <= buffer.length) { + const firstByte = buffer[offset]; + const secondByte = buffer[offset + 1]; + const fin = (firstByte & 0x80) !== 0; + const opcode = firstByte & 0x0f; + const masked = (secondByte & 0x80) !== 0; + let payloadLength = secondByte & 0x7f; + let headerLength = 2; + + if (!fin) { + throw new Error('fragmented websocket frames are not supported'); + } + + if (payloadLength === 126) { + if (offset + 4 > buffer.length) { + break; + } + + payloadLength = buffer.readUInt16BE(offset + 2); + headerLength = 4; + } else if (payloadLength === 127) { + throw new Error('64-bit websocket payloads are not supported'); + } + + if (!masked) { + throw new Error('client websocket frames must be masked'); + } + + const maskOffset = offset + headerLength; + const payloadOffset = maskOffset + 4; + const frameLength = headerLength + 4 + payloadLength; + + if (offset + frameLength > buffer.length) { + break; + } + + const mask = buffer.subarray(maskOffset, payloadOffset); + const payload = Buffer.from(buffer.subarray(payloadOffset, payloadOffset + payloadLength)); + + for (let index = 0; index < payload.length; index += 1) { + payload[index] ^= mask[index % 4]; + } + + messages.push({ opcode, payload }); + offset += frameLength; + } + + return { + messages, + remaining: buffer.subarray(offset) + }; +} + +function sendSocketMessage(socket, type, payload) { + const frame = encodeWebSocketFrame(JSON.stringify(createMessageEnvelope(type, payload))); + socket.write(frame); +} + +function sendSocketError(socket, code, message, retryable = true) { + sendSocketMessage(socket, 'error', { code, message, retryable }); +} + +function updateSessionState(socket, session, nextState) { + if (session.state === nextState) { + return; + } + + session.state = nextState; + sendSocketMessage(socket, 'session.state', { value: session.state }); +} + +function validateClientMessage(message) { + if (!isMessageEnvelope(message)) { + return 'message must match the shared envelope shape'; + } + + if (!isClientEventType(message.type)) { + return `unsupported client event type: ${message.type}`; + } + + if (!message.payload || typeof message.payload !== 'object' || Array.isArray(message.payload)) { + return 'message payload must be an object'; + } + + if (message.type === 'input_audio.append' && typeof message.payload.chunk !== 'string') { + return 'input_audio.append payload.chunk must be a string'; + } + + return null; +} + +function handleClientMessage(socket, session, rawMessage) { + let message; + + try { + message = JSON.parse(rawMessage); + } catch { + sendSocketError(socket, 'invalid_json', 'Message must be valid JSON.'); + return; + } + + const validationError = validateClientMessage(message); + + if (validationError) { + sendSocketError(socket, 'invalid_message', validationError); + return; + } + + switch (message.type) { + case 'session.start': + session.started = true; + sendSocketMessage(socket, 'session.ready', { sessionId: session.id }); + sendSocketMessage(socket, 'session.state', { value: session.state }); + break; + case 'input_audio.append': + session.audioChunkCount += 1; + updateSessionState(socket, session, 'listening'); + break; + case 'input_audio.commit': + session.audioChunkCount = 0; + updateSessionState(socket, session, 'idle'); + break; + case 'response.cancel': + session.audioChunkCount = 0; + updateSessionState(socket, session, 'idle'); + break; + default: + sendSocketError(socket, 'unhandled_message', `No handler is implemented for ${message.type}.`); + } +} + +function registerWebSocketSessionRoute(app) { + app.get(WEBSOCKET_ROUTE, async (_request, reply) => { + reply.code(426); + return { + service: 'vela-gateway', + transport: 'websocket', + route: WEBSOCKET_ROUTE, + protocol: PROTOCOL_PACKAGE_NAME, + messageEnvelope: 'MessageEnvelope<{type, payload}>', + note: 'Connect with a WebSocket client to establish an ephemeral session.' + }; + }); + + app.server.on('upgrade', (request, socket) => { + const origin = `http://${request.headers.host ?? 'localhost'}`; + const { pathname } = new URL(request.url ?? '/', origin); + + if (pathname !== WEBSOCKET_ROUTE) { + socket.destroy(); + return; + } + + if ((request.headers.upgrade ?? '').toLowerCase() !== 'websocket') { + socket.destroy(); + return; + } + + const websocketKey = request.headers['sec-websocket-key']; + + if (typeof websocketKey !== 'string') { + socket.destroy(); + return; + } + + const session = createSessionRecord(); + app.websocketSessions.set(session.id, session); + + socket.write( + [ + 'HTTP/1.1 101 Switching Protocols', + 'Upgrade: websocket', + 'Connection: Upgrade', + `Sec-WebSocket-Accept: ${createWebSocketAcceptValue(websocketKey)}`, + '\r\n' + ].join('\r\n') + ); + + app.log.info({ sessionId: session.id, route: WEBSOCKET_ROUTE }, 'websocket session connected'); + sendSocketMessage(socket, 'session.ready', { sessionId: session.id }); + sendSocketMessage(socket, 'session.state', { value: session.state }); + + let frameBuffer = Buffer.alloc(0); + let closed = false; + + const cleanup = () => { + if (closed) { + return; + } + + closed = true; + app.websocketSessions.delete(session.id); + app.log.info({ sessionId: session.id }, 'websocket session disconnected'); + }; + + socket.on('data', (chunk) => { + frameBuffer = Buffer.concat([frameBuffer, chunk]); + + try { + const decoded = decodeWebSocketFrames(frameBuffer); + frameBuffer = decoded.remaining; + + for (const message of decoded.messages) { + if (message.opcode === 0x8) { + socket.end(encodeWebSocketFrame('', { opcode: 0x8 })); + return; + } + + if (message.opcode === 0x9) { + socket.write(encodeWebSocketFrame(message.payload, { opcode: 0xA })); + continue; + } + + if (message.opcode !== 0x1) { + sendSocketError(socket, 'unsupported_message', 'Only text messages are supported.'); + continue; + } + + handleClientMessage(socket, session, message.payload.toString('utf8')); + } + } catch (error) { + app.log.warn({ err: error, sessionId: session.id }, 'closing malformed websocket session'); + sendSocketError(socket, 'invalid_frame', error.message, false); + socket.end(encodeWebSocketFrame('', { opcode: 0x8 })); + } + }); + + socket.on('end', cleanup); + socket.on('close', cleanup); + socket.on('error', (error) => { + app.log.warn({ err: error, sessionId: session.id }, 'websocket session socket error'); + cleanup(); + }); + }); +} + function buildServer() { const app = Fastify({ logger: true }); + app.decorate('websocketSessions', new Map()); app.get('/', async () => ({ service: 'vela-gateway', @@ -21,6 +303,7 @@ function buildServer() { })); app.get('/health', async () => ({ status: 'ok' })); + registerWebSocketSessionRoute(app); return app; } diff --git a/apps/vela-gateway/test/websocket-session.test.js b/apps/vela-gateway/test/websocket-session.test.js new file mode 100644 index 0000000..8a3a577 --- /dev/null +++ b/apps/vela-gateway/test/websocket-session.test.js @@ -0,0 +1,328 @@ +const assert = require('node:assert/strict'); +const crypto = require('node:crypto'); +const net = require('node:net'); +const test = require('node:test'); + +const { + CLIENT_EVENT_TYPES, + PROTOCOL_PACKAGE_NAME, + SERVER_EVENT_TYPES +} = require('@vela/protocol'); + +const { buildServer } = require('../src/index.js'); + +function encodeMaskedTextFrame(payload) { + const body = Buffer.from(payload); + const mask = crypto.randomBytes(4); + const length = body.length; + let header; + + if (length < 126) { + header = Buffer.from([0x81, 0x80 | length]); + } else { + header = Buffer.alloc(4); + header[0] = 0x81; + header[1] = 0x80 | 126; + header.writeUInt16BE(length, 2); + } + + const maskedBody = Buffer.from(body); + + for (let index = 0; index < maskedBody.length; index += 1) { + maskedBody[index] ^= mask[index % 4]; + } + + return Buffer.concat([header, mask, maskedBody]); +} + +function encodeMaskedCloseFrame() { + return Buffer.from([0x88, 0x80, 0x00, 0x00, 0x00, 0x00]); +} + +function decodeServerFrames(buffer) { + const messages = []; + let offset = 0; + + while (offset + 2 <= buffer.length) { + const firstByte = buffer[offset]; + const secondByte = buffer[offset + 1]; + const opcode = firstByte & 0x0f; + let payloadLength = secondByte & 0x7f; + let headerLength = 2; + + if ((secondByte & 0x80) !== 0) { + throw new Error('server frames must not be masked'); + } + + if (payloadLength === 126) { + if (offset + 4 > buffer.length) { + break; + } + + payloadLength = buffer.readUInt16BE(offset + 2); + headerLength = 4; + } else if (payloadLength === 127) { + throw new Error('64-bit payloads are not supported in tests'); + } + + const frameLength = headerLength + payloadLength; + + if (offset + frameLength > buffer.length) { + break; + } + + const payload = buffer.subarray(offset + headerLength, offset + frameLength); + messages.push({ opcode, payload: payload.toString('utf8') }); + offset += frameLength; + } + + return { + messages, + remaining: buffer.subarray(offset) + }; +} + +async function startServer() { + const app = buildServer(); + await app.listen({ port: 0, host: '127.0.0.1' }); + const address = app.server.address(); + + return { + app, + port: address.port, + async close() { + await app.close(); + } + }; +} + +function waitForCondition(check, timeout = 1000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + + const poll = () => { + if (check()) { + resolve(); + return; + } + + if (Date.now() - start >= timeout) { + reject(new Error('timed out waiting for condition')); + return; + } + + setTimeout(poll, 10); + }; + + poll(); + }); +} + +async function connectWebSocket(port) { + const socket = net.createConnection({ host: '127.0.0.1', port }); + let handshakeComplete = false; + let handshakeBuffer = Buffer.alloc(0); + let frameBuffer = Buffer.alloc(0); + const queue = []; + const waiters = []; + + const pushMessage = (message) => { + if (waiters.length > 0) { + waiters.shift()(message); + return; + } + + queue.push(message); + }; + + socket.on('data', (chunk) => { + let data = chunk; + + if (!handshakeComplete) { + handshakeBuffer = Buffer.concat([handshakeBuffer, chunk]); + const separatorIndex = handshakeBuffer.indexOf('\r\n\r\n'); + + if (separatorIndex === -1) { + return; + } + + handshakeComplete = true; + const response = handshakeBuffer.subarray(0, separatorIndex).toString('utf8'); + assert.match(response, /101 Switching Protocols/); + data = handshakeBuffer.subarray(separatorIndex + 4); + handshakeBuffer = Buffer.alloc(0); + } + + if (data.length === 0) { + return; + } + + frameBuffer = Buffer.concat([frameBuffer, data]); + + const decoded = decodeServerFrames(frameBuffer); + frameBuffer = decoded.remaining; + + for (const message of decoded.messages) { + if (message.opcode === 0x1) { + pushMessage(JSON.parse(message.payload)); + } else { + pushMessage({ opcode: message.opcode, payload: message.payload }); + } + } + }); + + await new Promise((resolve, reject) => { + socket.once('connect', resolve); + socket.once('error', reject); + socket.write( + [ + 'GET /ws HTTP/1.1', + 'Host: 127.0.0.1', + 'Upgrade: websocket', + 'Connection: Upgrade', + 'Sec-WebSocket-Version: 13', + `Sec-WebSocket-Key: ${crypto.randomBytes(16).toString('base64')}`, + '\r\n' + ].join('\r\n') + ); + }); + + return { + socket, + nextMessage(timeout = 1000) { + if (queue.length > 0) { + return Promise.resolve(queue.shift()); + } + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + const waiterIndex = waiters.indexOf(onMessage); + if (waiterIndex >= 0) { + waiters.splice(waiterIndex, 1); + } + reject(new Error('timed out waiting for websocket message')); + }, timeout); + + const onMessage = (message) => { + clearTimeout(timer); + resolve(message); + }; + + waiters.push(onMessage); + }); + }, + sendJson(message) { + socket.write(encodeMaskedTextFrame(JSON.stringify(message))); + }, + sendText(payload) { + socket.write(encodeMaskedTextFrame(payload)); + }, + async close() { + await new Promise((resolve) => { + socket.once('close', resolve); + socket.write(encodeMaskedCloseFrame()); + }); + } + }; +} + +test('http routes stay available and preserve the root response contract', async () => { + const server = await startServer(); + + try { + const rootResponse = await server.app.inject({ method: 'GET', url: '/' }); + assert.equal(rootResponse.statusCode, 200); + assert.deepEqual(rootResponse.json(), { + service: 'vela-gateway', + status: 'ok', + transport: 'http', + next: 'websocket session skeleton', + protocol: { + package: PROTOCOL_PACKAGE_NAME, + clientEventCount: CLIENT_EVENT_TYPES.length, + serverEventCount: SERVER_EVENT_TYPES.length + } + }); + + const healthResponse = await server.app.inject({ method: 'GET', url: '/health' }); + assert.equal(healthResponse.statusCode, 200); + assert.deepEqual(healthResponse.json(), { status: 'ok' }); + + const wsDocResponse = await server.app.inject({ method: 'GET', url: '/ws' }); + assert.equal(wsDocResponse.statusCode, 426); + assert.equal(wsDocResponse.json().route, '/ws'); + } finally { + await server.close(); + } +}); + +test('websocket connect creates and cleans up an ephemeral session', async () => { + const server = await startServer(); + + try { + const client = await connectWebSocket(server.port); + const readyMessage = await client.nextMessage(); + assert.equal(readyMessage.type, 'session.ready'); + assert.equal(typeof readyMessage.payload.sessionId, 'string'); + assert.equal(server.app.websocketSessions.size, 1); + + const stateMessage = await client.nextMessage(); + assert.deepEqual(stateMessage, { + type: 'session.state', + payload: { value: 'idle' } + }); + + await client.close(); + await waitForCondition(() => server.app.websocketSessions.size === 0); + assert.equal(server.app.websocketSessions.size, 0); + } finally { + await server.close(); + } +}); + +test('websocket handles valid and invalid client messages safely', async () => { + const server = await startServer(); + + try { + const client = await connectWebSocket(server.port); + await client.nextMessage(); + await client.nextMessage(); + + client.sendJson({ type: 'input_audio.append', payload: { chunk: 'Zm9v' } }); + assert.deepEqual(await client.nextMessage(), { + type: 'session.state', + payload: { value: 'listening' } + }); + + client.sendJson({ type: 'input_audio.commit', payload: {} }); + assert.deepEqual(await client.nextMessage(), { + type: 'session.state', + payload: { value: 'idle' } + }); + + client.sendText('{bad json'); + assert.deepEqual(await client.nextMessage(), { + type: 'error', + payload: { + code: 'invalid_json', + message: 'Message must be valid JSON.', + retryable: true + } + }); + + client.sendJson({ type: 'unknown.event', payload: {} }); + assert.deepEqual(await client.nextMessage(), { + type: 'error', + payload: { + code: 'invalid_message', + message: 'unsupported client event type: unknown.event', + retryable: true + } + }); + + assert.equal(server.app.websocketSessions.size, 1); + await client.close(); + } finally { + await server.close(); + } +}); diff --git a/docs/architecture.md b/docs/architecture.md index ba355e3..f612609 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -65,7 +65,7 @@ The current implementation is a minimal SvelteKit app with a single starter page - Fastify (Node) - WebSocket-based session layer -The current implementation is a minimal Fastify service with `/` and `/health` HTTP endpoints. The WebSocket session layer is a later increment. +The current implementation is a minimal Fastify service with `/`, `/health`, and a documented `/ws` WebSocket session endpoint. The gateway keeps one ephemeral in-memory session record per live socket connection and removes it on disconnect. #### Responsibilities @@ -77,6 +77,14 @@ The current implementation is a minimal Fastify service with `/` and `/health` H - TTS orchestration - event streaming +#### Current WebSocket skeleton + +- `GET /ws` documents the route for plain HTTP clients and returns `426 Upgrade Required` +- WebSocket upgrades on `/ws` create an ephemeral session immediately +- the gateway sends `session.ready` followed by `session.state` (`idle`) when the socket is established +- valid minimal client events can move the session between `idle` and `listening` +- invalid JSON, invalid envelopes, and malformed frames are handled defensively so the process stays up + ## Voice Pipeline ```text diff --git a/docs/backlog.md b/docs/backlog.md index 2881d77..f2282c3 100644 --- a/docs/backlog.md +++ b/docs/backlog.md @@ -34,7 +34,7 @@ Prove the end-to-end interaction model with mocked or stubbed providers. - [x] bootstrap `vela-ui` as a runnable SvelteKit app in the Yarn workspace - [x] bootstrap `vela-gateway` as a runnable Fastify app in the Yarn workspace - create a minimal UI with mic control, state indicator, transcript, and response text -- create a gateway WebSocket session skeleton +- [x] create a gateway WebSocket session skeleton - implement mocked STT flow for partial and final transcript events - implement mocked LLM response streaming - implement stubbed audio playback or placeholder TTS output @@ -180,5 +180,6 @@ Polish the system after the core voice loop is reliable. - `apps/vela-ui` now boots as a minimal SvelteKit app with a starter page - `apps/vela-gateway` now boots as a minimal Fastify app with `/` and `/health` endpoints +- `apps/vela-gateway` now exposes a minimal `/ws` WebSocket session skeleton with ephemeral in-memory sessions and defensive message handling - `apps/vela-protocol` now provides the shared WebSocket event contract for the UI and gateway - backend framework choice is now concrete: Fastify diff --git a/docs/integrations.md b/docs/integrations.md index fd37191..c092447 100644 --- a/docs/integrations.md +++ b/docs/integrations.md @@ -4,7 +4,16 @@ - `vela-ui` is implemented as a SvelteKit application - `vela-gateway` is implemented as a Fastify service -- current integration work beyond the gateway HTTP baseline remains future implementation +- `vela-gateway` now exposes `/ws` as the minimal WebSocket session entrypoint using the shared `@vela/protocol` contract +- current integration work beyond the gateway WebSocket/session baseline remains future implementation + +## Gateway Session Contract + +- transport: WebSocket on `/ws` +- session storage: in-memory only, one ephemeral record per live connection +- message format: `@vela/protocol` `MessageEnvelope<{ type, payload }>` +- current server behavior: acknowledge connect with `session.ready` and `session.state` +- safety baseline: invalid JSON, invalid envelopes, and malformed frames return protocol errors or close that socket without taking down the service ## STT (Speech-to-Text) diff --git a/docs/protocol.md b/docs/protocol.md index d6fce51..fa2f346 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -5,6 +5,12 @@ The shared code-level contract lives in the Yarn workspace package `@vela/protocol` so both the gateway and UI import the same event names and envelope shape. +Current gateway baseline: + +- WebSocket endpoint: `/ws` +- the gateway sends `session.ready` and `session.state` immediately after a successful socket upgrade +- the gateway accepts JSON text messages only in the shared envelope shape + ## WebSocket Message Envelope Every WebSocket message uses one envelope format: @@ -40,6 +46,17 @@ type ClientEvent = - `input_audio.commit` marks the current buffered user turn as ready for downstream processing - `response.cancel` interrupts the active listen/think/speak flow +### Current skeleton behavior + +- on connect, the gateway creates an ephemeral in-memory session and emits `session.ready` plus `session.state` +- `session.start` is accepted as an idempotent session acknowledgment and re-sends readiness/state +- `input_audio.append` updates the ephemeral session record and moves the session to `listening` +- `input_audio.commit` resets the minimal buffered state and returns the session to `idle` +- `response.cancel` resets the minimal session state back to `idle` +- malformed JSON produces `error` with code `invalid_json` +- invalid envelopes or unsupported client event names produce `error` with code `invalid_message` +- malformed WebSocket frames are rejected without crashing the gateway process + ### Server → Client ```ts