import { IncomingMessage } from 'http';
import crypto from 'crypto';
import type WebSocket from 'ws';
import jwt from 'jsonwebtoken';
import { UserRole } from '@prisma/client';
import * as Y from 'yjs';
import { Hocuspocus } from '@hocuspocus/server';
import type { Extension } from '@hocuspocus/server';
import { env } from '../../config/env';
import { prisma } from '../../config/database';
import { redis } from '../../config/redis';
import { logger } from '../../utils/logger';
import { CONTENT_ROLES } from '../../utils/roles';
import { docsFilesService } from './docs-files.service';

// --- Metrics ---
import { Gauge } from 'prom-client';

/** Gauge mirroring `hocuspocus.getConnectionsCount()`; refreshed on connect/disconnect. */
const collabConnections = new Gauge({
  name: 'cm_docs_collab_connections',
  help: 'Number of active docs collaboration WebSocket connections',
});

/** Gauge mirroring `hocuspocus.getDocumentsCount()`; refreshed on connect/disconnect. */
const collabDocuments = new Gauge({
  name: 'cm_docs_collab_documents',
  help: 'Number of active collaboratively-edited documents',
});

// --- Connection tracking ---

/** User ID -> number of connections that passed `onAuthenticate` and have not yet disconnected. */
const connectionsPerUser = new Map<string, number>();
const MAX_CONNECTIONS_PER_USER = 5;
const MAX_CONCURRENT_DOCUMENTS = 50;
const MAX_DOC_SIZE_BYTES = 5 * 1024 * 1024; // 5MB

// --- JWT token payload ---

/** Claims read from the verified access token. */
interface TokenPayload {
  id: string;
  email: string;
  role: UserRole;
  /** When present, takes precedence over the single `role` claim. */
  roles?: UserRole[];
}

// --- Deterministic color from user ID ---
const COLLAB_COLORS = [
  '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7',
  '#DDA0DD', '#98D8C8', '#F7DC6F', '#BB8FCE', '#85C1E9',
  '#F0B27A', '#82E0AA', '#F1948A', '#AED6F1', '#D7BDE2',
];

/**
 * Map a user ID to one of the `COLLAB_COLORS` entries.
 *
 * The same ID always yields the same color: the ID's UTF-16 code units are
 * folded into a 32-bit integer hash, which then indexes the palette.
 *
 * @param userId - User identifier to hash.
 * @returns A hex color string from `COLLAB_COLORS`.
 */
function getUserColor(userId: string): string {
  let hash = 0;
  for (let i = 0; i < userId.length; i++) {
    // `| 0` truncates to a signed 32-bit integer on every step.
    hash = ((hash << 5) - hash + userId.charCodeAt(i)) | 0;
  }
  // The hash may be negative; take the absolute value before indexing.
  return COLLAB_COLORS[Math.abs(hash) % COLLAB_COLORS.length];
}

// --- Redis cache key helper ---

/**
 * Build the Redis key under which a docs file is cached.
 *
 * @param relativePath - Document path used as the cache identity.
 * @returns `DOCS_CACHE:file:` followed by the first 16 hex chars of the path's SHA-256.
 */
function fileCacheKey(relativePath: string): string {
  const hash = crypto.createHash('sha256').update(relativePath).digest('hex').substring(0, 16);
  return `DOCS_CACHE:file:${hash}`;
}

// --- Hocuspocus extension with hooks ---
const docsExtension: Extension = {
  priority: 1,

  /**
   * Authenticate a connection and enforce access and rate limits.
   *
   * Verifies the JWT (HS256, `env.JWT_ACCESS_SECRET`), marks the connection
   * read-only when the user has no role in `CONTENT_ROLES`, rejects TEMP users
   * without a content role, validates the document path, applies the per-user
   * connection cap and the global document cap, then stores the user identity
   * on `data.context.user` for later hooks.
   *
   * @throws Error when the token is missing/invalid, the user is TEMP-only,
   *   the path is rejected by `safeResolve`, or a limit is exceeded.
   */
  async onAuthenticate(data) {
    const { token, documentName } = data;

    if (!token) {
      throw new Error('Authentication required');
    }

    // Verify JWT
    let payload: TokenPayload;
    try {
      payload = jwt.verify(token, env.JWT_ACCESS_SECRET, { algorithms: ['HS256'] }) as TokenPayload;
    } catch {
      throw new Error('Invalid or expired token');
    }

    const roles = payload.roles || [payload.role];

    // Check CONTENT_ROLES for write access
    const hasWriteAccess = roles.some(r => (CONTENT_ROLES as string[]).includes(r));
    if (!hasWriteAccess) {
      // Allow read-only for any authenticated non-TEMP user
      if (roles.includes(UserRole.TEMP)) {
        throw new Error('TEMP users cannot access collaboration');
      }
      data.connectionConfig.readOnly = true;
    }

    // Validate document path (prevent path traversal)
    try {
      docsFilesService.safeResolve(documentName);
    } catch {
      throw new Error('Invalid document path');
    }

    // Rate limit: max connections per user
    const currentCount = connectionsPerUser.get(payload.id) || 0;
    if (currentCount >= MAX_CONNECTIONS_PER_USER) {
      throw new Error('Too many concurrent connections');
    }

    // Rate limit: max concurrent documents
    if (hocuspocus.getDocumentsCount() >= MAX_CONCURRENT_DOCUMENTS) {
      // Only block if this is a NEW document (not joining existing)
      if (!hocuspocus.documents.has(documentName)) {
        throw new Error('Too many concurrent documents');
      }
    }

    // Track connection. Nothing below this point throws, so the count is only
    // incremented for connections that finish authenticating.
    connectionsPerUser.set(payload.id, currentCount + 1);

    // Look up user name from DB
    let userName = payload.email.split('@')[0];
    try {
      const user = await prisma.user.findUnique({
        where: { id: payload.id },
        select: { name: true },
      });
      if (user?.name) userName = user.name;
    } catch {
      // Fall back to email prefix
    }

    // Set context for use in other hooks
    data.context.user = {
      id: payload.id,
      email: payload.email,
      name: userName,
      color: getUserColor(payload.id),
      roles,
    };

    logger.info(`Docs collab: ${userName} connected to ${documentName}`);
  },

  /**
   * Populate a freshly loaded Y.Doc.
   *
   * Prefers the persisted binary state in `docCollabState`; if none is found
   * (or the lookup fails) the `content` Y.Text is seeded from the file on disk.
   * Failures are logged and leave the document as-is.
   */
  async onLoadDocument(data) {
    const { document, documentName } = data;

    // Try loading persisted Y.Doc state from DB
    try {
      const stored = await prisma.docCollabState.findUnique({
        where: { documentId: documentName },
      });

      if (stored) {
        // The stored blob is a full document update (written by onStoreDocument
        // via Y.encodeStateAsUpdate), not a state vector.
        const persistedUpdate = new Uint8Array(stored.state);
        Y.applyUpdate(document, persistedUpdate);
        logger.debug(`Docs collab: loaded persisted state for ${documentName} (${persistedUpdate.length} bytes)`);
        return;
      }
    } catch (err) {
      logger.warn(`Docs collab: failed to load persisted state for ${documentName}`, err);
    }

    // No persisted state — seed from disk
    try {
      const content = await docsFilesService.readFileContent(documentName);
      const yText = document.getText('content');
      // Only seed if the Y.Text is empty (first load)
      if (yText.length === 0) {
        yText.insert(0, content);
      }
      logger.debug(`Docs collab: seeded from disk for ${documentName} (${content.length} chars)`);
    } catch (err) {
      logger.warn(`Docs collab: failed to read file from disk for ${documentName}`, err);
      // File might not exist yet — that's OK, start with empty doc
    }
  },

  /**
   * Mirror the document's plaintext to disk and drop its Redis cache entry.
   *
   * Skips the write (with a warning) when the text length exceeds
   * `MAX_DOC_SIZE_BYTES`. Write failures are logged, not rethrown.
   */
  async onChange(data) {
    const { document, documentName } = data;

    // Size guard
    // NOTE(review): `yText.length` is a character count (the log below says
    // "chars"), while the limit is named in bytes — multi-byte text could
    // exceed 5MB on disk before this trips. Confirm the intended unit.
    const yText = document.getText('content');
    const textLength = yText.length;
    if (textLength > MAX_DOC_SIZE_BYTES) {
      logger.warn(`Docs collab: document ${documentName} exceeds size limit (${textLength} chars)`);
      return;
    }

    // Write plaintext to disk.
    // NOTE(review): the instance's `debounce`/`maxDebounce` options are
    // documented for onStoreDocument; confirm whether onChange is debounced
    // as well, otherwise this writes the file on every change.
    const content = yText.toString();
    try {
      await docsFilesService.writeFileContent(documentName, content);

      // Invalidate Redis file cache (best-effort)
      try {
        await redis.del(fileCacheKey(documentName));
      } catch {
        /* ignore */
      }

      logger.debug(`Docs collab: wrote ${documentName} to disk (${content.length} chars)`);
    } catch (err) {
      logger.error(`Docs collab: failed to write ${documentName} to disk`, err);
    }
  },

  /**
   * Persist the Y.Doc's full binary state to the `docCollabState` table,
   * creating or replacing the row keyed by the document name.
   * Failures are logged, not rethrown.
   */
  async onStoreDocument(data) {
    const { document, documentName } = data;

    // Persist Y.Doc binary state to PostgreSQL
    try {
      const state = Y.encodeStateAsUpdate(document);
      await prisma.docCollabState.upsert({
        where: { documentId: documentName },
        update: {
          state: Buffer.from(state),
        },
        create: {
          documentId: documentName,
          state: Buffer.from(state),
        },
      });
      logger.debug(`Docs collab: persisted state for ${documentName}`);
    } catch (err) {
      logger.error(`Docs collab: failed to persist state for ${documentName}`, err);
    }
  },

  /**
   * Release the user's slot in `connectionsPerUser` and refresh the gauges.
   * Connections without a user in their context only trigger the log/metrics.
   */
  async onDisconnect(data) {
    const { documentName, context } = data;
    const userId = context?.user?.id;

    if (userId) {
      const count = connectionsPerUser.get(userId) || 0;
      if (count <= 1) {
        // Last connection for this user: drop the entry instead of storing 0.
        connectionsPerUser.delete(userId);
      } else {
        connectionsPerUser.set(userId, count - 1);
      }
    }

    logger.debug(`Docs collab: ${context?.user?.name || 'unknown'} disconnected from ${documentName}`);

    // Update metrics
    collabConnections.set(hocuspocus.getConnectionsCount());
    collabDocuments.set(hocuspocus.getDocumentsCount());
  },

  /** Log the established connection and refresh the gauges. */
  async connected(data) {
    const { documentName, context } = data;

    // Awareness is set client-side; we just log here
    logger.debug(`Docs collab: ${context?.user?.name || 'unknown'} fully synced on ${documentName}`);

    // Update metrics
    collabConnections.set(hocuspocus.getConnectionsCount());
    collabDocuments.set(hocuspocus.getDocumentsCount());
  },
};

// --- Create Hocuspocus instance ---
// Declared after `docsExtension`; the hooks above only dereference it when
// they run, which is after module initialization.
const hocuspocus = new Hocuspocus({
  name: 'changemaker-docs',
  quiet: true,
  debounce: 1000, // Debounce disk writes by 1s
  maxDebounce: 5000, // Force write every 5s at most
  timeout: 30000, // 30s ping timeout
  extensions: [docsExtension],
});

/**
 * Handle an incoming WebSocket connection for docs collaboration.
 * Called from server.ts after the HTTP → WS upgrade.
 *
 * Rewrites `request.url` so that its query string carries `context.token`,
 * then hands the socket to Hocuspocus with `documentName` as initial context.
 *
 * @param ws - The upgraded WebSocket.
 * @param request - The original HTTP upgrade request; its `url` is mutated.
 * @param context - Document name and access token resolved by the caller.
 */
function handleConnection(
  ws: WebSocket,
  request: IncomingMessage,
  context: { documentName: string; token: string },
): void {
  // Expose the token as a `token` query parameter on the request URL.
  // NOTE(review): confirm that onAuthenticate's `token` is actually sourced
  // from this query parameter rather than from the client's auth message.
  const url = new URL(request.url || '', `http://${request.headers.host}`);
  url.searchParams.set('token', context.token);
  request.url = url.pathname + url.search;

  hocuspocus.handleConnection(ws, request, {
    // Pass initial context
    documentName: context.documentName,
  });
}

/**
 * Invalidate a document's collaboration state.
 * Called when files are modified externally (rename, delete, PUT endpoint).
 *
 * Deletes the persisted state rows for the document and closes its active
 * connections. Errors are logged and never propagate to the caller.
 *
 * @param documentName - Document whose collaboration state should be dropped.
 */
async function invalidateDocument(documentName: string): Promise<void> {
  try {
    // Delete persisted state
    await prisma.docCollabState.deleteMany({
      where: { documentId: documentName },
    });

    // Close active connections for this document so clients reload
    hocuspocus.closeConnections(documentName);

    logger.debug(`Docs collab: invalidated document ${documentName}`);
  } catch (err) {
    logger.warn(`Docs collab: failed to invalidate document ${documentName}`, err);
  }
}

/**
 * Clean up stale DocCollabState records (older than 7 days with no corresponding file).
 *
 * A record counts as orphaned when `safeResolve` or `readFileContent` throws
 * for its document ID. Each deletion is attempted independently, so one
 * failing delete does not abort the rest of the sweep. Never throws.
 *
 * NOTE(review): any read failure (not just "file not found") is treated as a
 * missing file — confirm that is acceptable for transient I/O errors.
 */
async function cleanupStaleStates(): Promise<void> {
  const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);

  try {
    const staleRecords = await prisma.docCollabState.findMany({
      where: { updatedAt: { lt: sevenDaysAgo } },
      select: { id: true, documentId: true },
    });

    let cleaned = 0;
    for (const record of staleRecords) {
      // Check if file still exists on disk
      let fileExists = true;
      try {
        docsFilesService.safeResolve(record.documentId);
        await docsFilesService.readFileContent(record.documentId);
      } catch {
        fileExists = false;
      }
      if (fileExists) continue;

      // File doesn't exist — delete the state. Isolate failures per record so
      // the remaining stale rows are still processed.
      try {
        await prisma.docCollabState.delete({ where: { id: record.id } });
        cleaned++;
      } catch (err) {
        logger.warn(`Docs collab: failed to delete stale state for ${record.documentId}`, err);
      }
    }

    if (cleaned > 0) {
      logger.info(`Docs collab: cleaned up ${cleaned} stale collaboration states`);
    }
  } catch (err) {
    logger.warn('Docs collab: cleanup failed', err);
  }
}

/**
 * Close all collaboration connections and reset both gauges to zero.
 */
async function shutdown(): Promise<void> {
  hocuspocus.closeConnections();
  collabConnections.set(0);
  collabDocuments.set(0);
}

/** Public surface of the docs collaboration module. */
export const docsCollabService = {
  handleConnection,
  invalidateDocument,
  cleanupStaleStates,
  shutdown,
  getConnectionsCount: () => hocuspocus.getConnectionsCount(),
  getDocumentsCount: () => hocuspocus.getDocumentsCount(),
};