import type { Response } from 'express'; import { logger } from '../../utils/logger'; interface SSEClient { id: string; userId: string; res: Response; connectedAt: Date; } /** In-memory SSE connection manager (single-node) */ class SSEService { private clients = new Map(); // userId → client connections private heartbeatInterval: NodeJS.Timeout | null = null; /** Start heartbeat interval (30s keep-alive) */ startHeartbeat() { if (this.heartbeatInterval) return; this.heartbeatInterval = setInterval(() => { let total = 0; for (const [, clients] of this.clients) { for (const client of clients) { try { client.res.write(': heartbeat\n\n'); total++; } catch { this.removeClient(client.id); } } } if (total > 0) { logger.debug(`SSE heartbeat sent to ${total} clients`); } }, 30_000); } /** Stop heartbeat */ stopHeartbeat() { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } } /** Add an SSE client */ addClient(userId: string, res: Response): string { const id = `${userId}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; const client: SSEClient = { id, userId, res, connectedAt: new Date() }; if (!this.clients.has(userId)) { this.clients.set(userId, []); } this.clients.get(userId)!.push(client); logger.info(`SSE client connected: ${id} (user: ${userId})`); return id; } /** Remove an SSE client by connection ID */ removeClient(connectionId: string) { for (const [userId, clients] of this.clients) { const idx = clients.findIndex((c) => c.id === connectionId); if (idx >= 0) { clients.splice(idx, 1); if (clients.length === 0) { this.clients.delete(userId); } logger.info(`SSE client disconnected: ${connectionId} (user: ${userId})`); return userId; } } return null; } /** Send an event to a specific user (all connections) */ sendToUser(userId: string, event: string, data: unknown) { const clients = this.clients.get(userId); if (!clients || clients.length === 0) return false; const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; for (const client of clients) { try { client.res.write(payload); } catch { this.removeClient(client.id); } } return true; } /** Broadcast an event to a list of user IDs */ sendToUsers(userIds: string[], event: string, data: unknown) { for (const userId of userIds) { this.sendToUser(userId, event, data); } } /** Check if a user has any active SSE connections */ isConnected(userId: string): boolean { const clients = this.clients.get(userId); return !!clients && clients.length > 0; } /** Get all currently connected user IDs */ getConnectedUserIds(): string[] { return Array.from(this.clients.keys()); } /** Get connection count */ getConnectionCount(): number { let count = 0; for (const clients of this.clients.values()) { count += clients.length; } return count; } /** Get connection count for a specific user */ getConnectionCountForUser(userId: string): number { return this.clients.get(userId)?.length ?? 0; } /** Close all connections (graceful shutdown) */ closeAll() { this.stopHeartbeat(); for (const [, clients] of this.clients) { for (const client of clients) { try { client.res.end(); } catch {} } } this.clients.clear(); logger.info('SSE: All connections closed'); } } export const sseService = new SSEService();