# Media API Module ## Overview The Media API module is a Fastify microservice (port 4100) providing comprehensive video library management with quick actions, scheduled publishing, and detailed analytics tracking. Built with Prisma ORM and PostgreSQL, it operates alongside the main Express API (port 4000) while sharing the same database. ## Architecture ### Technology Stack - **Framework:** Fastify v5 with TypeScript - **ORM:** Prisma (shared with main API, single database) - **Database:** PostgreSQL 16 (shared with main API) - **Job Queue:** BullMQ with Redis backend - **File Processing:** FFprobe (video metadata extraction) - **Charts:** Recharts (frontend) ### Port Configuration - **Media API:** Port 4100 (Docker internal), routed via nginx - **Main API:** Port 4000 (Express) - **Public Access:** `https://media.cmlite.org/api/*` ### Database Strategy **Shared PostgreSQL Database:** - Media API uses Prisma ORM (same as main API) - Schema defined in `api/prisma/schema.prisma` - Migrations managed centrally via Prisma - Models: Video, VideoView, VideoEvent, VideoScheduleHistory **Migration Strategy:** ```bash # Apply migrations (run from api directory) cd api && npx prisma migrate deploy # Generate Prisma client npx prisma generate # Seed database npx prisma db seed ``` ## Module Structure ``` api/src/modules/media/ ├── services/ │ ├── video-analytics.service.ts # Analytics aggregation & queries │ └── ffprobe.service.ts # Video metadata extraction ├── routes/ │ ├── videos.routes.ts # Video CRUD (existing) │ ├── video-actions.routes.ts # Quick actions (duplicate, preview, etc.) │ ├── video-schedule.routes.ts # Schedule management (admin) │ ├── video-analytics.routes.ts # Analytics queries (admin) │ ├── video-tracking.routes.ts # Public tracking endpoints │ └── upload.routes.ts # Video upload (existing) └── db/ └── schema.ts # Drizzle schema (deprecated, kept for reference) api/src/services/ └── video-schedule-queue.service.ts # BullMQ queue + worker api/prisma/ └── schema.prisma # Prisma schema (includes media models) ``` ## Services ### VideoAnalyticsService **Purpose:** Aggregate and query video analytics data. **Key Methods:** ```typescript class VideoAnalyticsService { // Record a video view (creates VideoView record) async recordView( videoId: number, userId?: number, ipAddress?: string, userAgent?: string, referer?: string ): Promise<{ viewId: number }>; // Record a video event (play, pause, seek, complete) async recordEvent( videoId: number, viewId: number | undefined, eventType: 'play' | 'pause' | 'seek' | 'complete', timestamp: number ): Promise; // Update watch time for a view async updateWatchTime( viewId: number, watchTimeSeconds: number ): Promise; // Aggregate analytics for a video (updates Video model fields) async aggregateVideoAnalytics(videoId: number): Promise; // Get detailed analytics for a video async getVideoAnalytics( videoId: number, startDate?: Date, endDate?: Date ): Promise; // Get top performing videos async getTopVideos( metric: 'views' | 'watchTime', limit: number ): Promise; // Get platform-wide analytics overview async getAnalyticsOverview(): Promise; // Get viewer demographics (registered users) async getViewerDemographics(videoId: number): Promise; // Reset analytics for a video async resetAnalytics(videoId: number): Promise; // Privacy-focused helpers private hashIpAddress(ipAddress: string): string; private truncateUserAgent(userAgent: string): string; } ``` **Privacy Implementation:** ```typescript // IP address hashing (SHA-256, one-way) private hashIpAddress(ipAddress: string): string { return createHash('sha256') .update(ipAddress) .digest('hex'); } // User agent truncation (remove version numbers) private truncateUserAgent(userAgent: string): string { return userAgent .replace(/\/[\d.]+/g, '') .substring(0, 200); } ``` **Aggregation Logic:** ```typescript async aggregateVideoAnalytics(videoId: number): Promise { const views = await prisma.videoView.findMany({ where: { videoId }, }); const totalViews = views.length; const uniqueViewers = new Set( views.map(v => v.userId || v.ipAddress) ).size; const totalWatchTime = views.reduce((sum, v) => sum + v.watchTimeSeconds, 0); const avgWatchTime = totalViews > 0 ? totalWatchTime / totalViews : 0; const completedViews = views.filter(v => v.completed).length; const completionRate = totalViews > 0 ? (completedViews / totalViews) * 100 : 0; await prisma.video.update({ where: { id: videoId }, data: { uniqueViewers, totalWatchTimeSeconds: totalWatchTime, averageWatchTimeSeconds: avgWatchTime, completionRate, }, }); } ``` ### VideoScheduleQueueService **Purpose:** Manage BullMQ job queue for scheduled publish/unpublish operations. **Key Methods:** ```typescript class VideoScheduleQueueService { // Schedule a publish job async schedulePublish( videoId: number, publishAt: Date, timezone: string ): Promise<{ jobId: string; scheduledFor: string }>; // Schedule an unpublish job async scheduleUnpublish( videoId: number, unpublishAt: Date, timezone: string ): Promise<{ jobId: string; scheduledFor: string }>; // Cancel a scheduled action async cancelSchedule( videoId: number, action: 'publish' | 'unpublish' ): Promise; // Get upcoming schedules async getUpcomingSchedules(limit?: number): Promise; // Get schedule history for a video async getScheduleHistory(videoId: number): Promise; // Worker process (runs in background) private async processScheduledAction(job: Job): Promise; // Graceful shutdown async close(): Promise; } ``` **Job Structure:** ```typescript interface ScheduleJobData { videoId: number; action: 'publish' | 'unpublish'; scheduledFor: string; // ISO timestamp timezone: string; scheduledByUserId: number; } ``` **Worker Implementation:** ```typescript private async processScheduledAction(job: Job): Promise { const { videoId, action, scheduledByUserId } = job.data; try { // Update video published status await prisma.video.update({ where: { id: videoId }, data: { isPublished: action === 'publish', ...(action === 'publish' && { scheduledPublishAt: null }), ...(action === 'unpublish' && { scheduledUnpublishAt: null }), }, }); // Record success in history await prisma.videoScheduleHistory.create({ data: { videoId, action, scheduledFor: new Date(job.data.scheduledFor), executedAt: new Date(), status: 'completed', scheduledByUserId, }, }); logger.info(`Scheduled ${action} executed for video ${videoId}`); } catch (error) { // Record failure await prisma.videoScheduleHistory.create({ data: { videoId, action, scheduledFor: new Date(job.data.scheduledFor), status: 'failed', error: error.message, scheduledByUserId, }, }); throw error; // BullMQ will retry } } ``` ### FFprobeService (Existing) **Purpose:** Extract video metadata using FFprobe. **Key Methods:** ```typescript class FFprobeService { // Extract metadata from video file async getVideoMetadata( filePath: string ): Promise<{ duration: number; width: number; height: number; orientation: 'H' | 'V'; quality: 'SD' | 'HD' | 'FHD' | 'UHD'; hasAudio: boolean; }>; } ``` ## Routes ### Video Actions Routes (Admin) **File:** `api/src/modules/media/routes/video-actions.routes.ts` **Middleware:** `requireAdminRole` (SUPER_ADMIN, INFLUENCE_ADMIN, MAP_ADMIN) **Endpoints:** ```typescript // POST /videos/:id/duplicate // Duplicate video with new title fastify.post( '/videos/:id/duplicate', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id } = request.params; const duplicatedVideo = await videosService.duplicateVideo(id); return { ...duplicatedVideo }; } ); // GET /videos/:id/preview-link // Generate expiring preview link (24h JWT token) fastify.get( '/videos/:id/preview-link', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id } = request.params; const token = jwt.sign( { videoId: id }, process.env.JWT_ACCESS_SECRET, { expiresIn: `${VIDEO_PREVIEW_LINK_EXPIRY_HOURS}h` } ); return { previewUrl: `${process.env.VITE_MEDIA_API_URL}/api/videos/preview/${token}`, expiryHours: VIDEO_PREVIEW_LINK_EXPIRY_HOURS, }; } ); // POST /videos/:id/reset-analytics // Reset all analytics for a video fastify.post( '/videos/:id/reset-analytics', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id } = request.params; await videoAnalyticsService.resetAnalytics(id); return { success: true }; } ); // GET /videos/analytics/top // Get top performing videos fastify.get( '/videos/analytics/top', { preHandler: [requireAdminRole] }, async (request, reply) => { const { metric = 'views', limit = 10 } = request.query; const topVideos = await videoAnalyticsService.getTopVideos(metric, limit); return topVideos; } ); // GET /videos/analytics/overview // Get platform-wide analytics fastify.get( '/videos/analytics/overview', { preHandler: [requireAdminRole] }, async (request, reply) => { const overview = await videoAnalyticsService.getAnalyticsOverview(); return overview; } ); ``` ### Video Schedule Routes (Admin) **File:** `api/src/modules/media/routes/video-schedule.routes.ts` **Middleware:** `requireAdminRole` **Endpoints:** ```typescript // POST /videos/:id/schedule-publish // Schedule video to be published at future date/time fastify.post( '/videos/:id/schedule-publish', { preHandler: [requireAdminRole], schema: { body: { type: 'object', required: ['publishAt', 'timezone'], properties: { publishAt: { type: 'string' }, // ISO timestamp timezone: { type: 'string' }, // IANA timezone }, }, }, }, async (request, reply) => { const { id } = request.params; const { publishAt, timezone } = request.body; const result = await scheduleQueueService.schedulePublish( id, new Date(publishAt), timezone ); return result; } ); // POST /videos/:id/schedule-unpublish // Schedule video to be unpublished fastify.post( '/videos/:id/schedule-unpublish', { preHandler: [requireAdminRole], schema: { body: { type: 'object', required: ['unpublishAt', 'timezone'], properties: { unpublishAt: { type: 'string' }, timezone: { type: 'string' }, }, }, }, }, async (request, reply) => { const { id } = request.params; const { unpublishAt, timezone } = request.body; const result = await scheduleQueueService.scheduleUnpublish( id, new Date(unpublishAt), timezone ); return result; } ); // DELETE /videos/:id/schedule/:action // Cancel scheduled publish or unpublish fastify.delete( '/videos/:id/schedule/:action', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id, action } = request.params; await scheduleQueueService.cancelSchedule(id, action); return { success: true, message: `${action} schedule cancelled` }; } ); // GET /videos/schedules/upcoming // Get all upcoming scheduled events fastify.get( '/videos/schedules/upcoming', { preHandler: [requireAdminRole] }, async (request, reply) => { const { limit = 100 } = request.query; const schedules = await scheduleQueueService.getUpcomingSchedules(limit); return { schedules }; } ); // GET /videos/:id/schedule-history // Get schedule history for a video fastify.get( '/videos/:id/schedule-history', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id } = request.params; const history = await scheduleQueueService.getScheduleHistory(id); return { history }; } ); ``` ### Video Analytics Routes (Admin) **File:** `api/src/modules/media/routes/video-analytics.routes.ts` **Middleware:** `requireAdminRole` **Endpoints:** ```typescript // GET /videos/:id/analytics // Get detailed analytics for a specific video fastify.get( '/videos/:id/analytics', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id } = request.params; const { startDate, endDate } = request.query; const analytics = await videoAnalyticsService.getVideoAnalytics( id, startDate ? new Date(startDate) : undefined, endDate ? new Date(endDate) : undefined ); return analytics; } ); // GET /videos/:id/analytics/viewers // Get list of registered users who watched this video fastify.get( '/videos/:id/analytics/viewers', { preHandler: [requireAdminRole] }, async (request, reply) => { const { id } = request.params; const viewers = await videoAnalyticsService.getViewerDemographics(id); return { viewers }; } ); ``` ### Video Tracking Routes (Public) **File:** `api/src/modules/media/routes/video-tracking.routes.ts` **Middleware:** `optionalAuth` (tracks user ID if logged in, but doesn't require auth) **Rate Limiting:** - `/track/view`: 100 requests/min per IP - `/track/event`: 100 requests/min per IP - `/track/heartbeat`: 200 requests/min per IP (higher for frequent updates) **Endpoints:** ```typescript // POST /track/view // Record a video view (when video loads) fastify.post( '/track/view', { preHandler: [optionalAuth, rateLimit('track-view', 100, 60000)], schema: { body: { type: 'object', required: ['videoId'], properties: { videoId: { type: 'number' }, referer: { type: 'string' }, }, }, }, }, async (request, reply) => { const { videoId, referer } = request.body; const userId = request.user?.id; const ipAddress = request.ip; const userAgent = request.headers['user-agent']; const { viewId } = await videoAnalyticsService.recordView( videoId, userId, ipAddress, userAgent, referer ); return { viewId }; } ); // POST /track/event // Record a video event (play, pause, seek, complete) fastify.post( '/track/event', { preHandler: [optionalAuth, rateLimit('track-event', 100, 60000)], schema: { body: { type: 'object', required: ['videoId', 'eventType', 'timestamp'], properties: { videoId: { type: 'number' }, viewId: { type: 'number' }, eventType: { type: 'string', enum: ['play', 'pause', 'seek', 'complete'] }, timestamp: { type: 'number' }, }, }, }, }, async (request, reply) => { const { videoId, viewId, eventType, timestamp } = request.body; await videoAnalyticsService.recordEvent( videoId, viewId, eventType, timestamp ); return { success: true }; } ); // POST /track/heartbeat // Update watch time (called every 10 seconds during playback) fastify.post( '/track/heartbeat', { preHandler: [rateLimit('track-heartbeat', 200, 60000)], // Higher limit for frequent updates schema: { body: { type: 'object', required: ['viewId', 'watchTimeSeconds'], properties: { viewId: { type: 'number' }, watchTimeSeconds: { type: 'number' }, }, }, }, }, async (request, reply) => { const { viewId, watchTimeSeconds } = request.body; await videoAnalyticsService.updateWatchTime(viewId, watchTimeSeconds); return { success: true }; } ); ``` ## Database Models ### Video Model (Enhanced) ```prisma model Video { id Int @id @default(autoincrement()) filename String @unique title String? producer String? creator String? thumbnailUrl String? duration Int // seconds fileSize BigInt width Int height Int orientation String // 'H' or 'V' quality String // 'SD', 'HD', 'FHD', 'UHD' hasAudio Boolean @default(true) isPublished Boolean @default(false) // Scheduling scheduledPublishAt DateTime? scheduledUnpublishAt DateTime? // Analytics (aggregated) uniqueViewers Int @default(0) totalWatchTimeSeconds Int @default(0) averageWatchTimeSeconds Decimal @default(0) @db.Decimal(10, 2) completionRate Decimal @default(0) @db.Decimal(5, 2) // Relations videoViews VideoView[] videoEvents VideoEvent[] scheduleHistory VideoScheduleHistory[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@map("videos") } ``` ### VideoView Model ```prisma model VideoView { id Int @id @default(autoincrement()) videoId Int userId Int? ipAddress String? @db.VarChar(64) // SHA-256 hash (64 chars) userAgent String? @db.Text referer String? @db.Text watchTimeSeconds Int @default(0) completed Boolean @default(false) createdAt DateTime @default(now()) video Video @relation(fields: [videoId], references: [id], onDelete: Cascade) user User? @relation(fields: [userId], references: [id], onDelete: SetNull) @@index([videoId]) @@index([userId]) @@index([createdAt]) @@map("video_views") } ``` ### VideoEvent Model ```prisma model VideoEvent { id Int @id @default(autoincrement()) videoId Int viewId Int? eventType String @db.VarChar(50) // 'play', 'pause', 'seek', 'complete' timestamp Decimal @db.Decimal(10, 2) // Video timestamp in seconds createdAt DateTime @default(now()) video Video @relation(fields: [videoId], references: [id], onDelete: Cascade) @@index([videoId]) @@index([viewId]) @@map("video_events") } ``` ### VideoScheduleHistory Model ```prisma model VideoScheduleHistory { id Int @id @default(autoincrement()) videoId Int action String @db.VarChar(20) // 'publish' or 'unpublish' scheduledFor DateTime executedAt DateTime? status String @db.VarChar(20) // 'pending', 'completed', 'failed', 'cancelled' error String? @db.Text scheduledByUserId Int video Video @relation(fields: [videoId], references: [id], onDelete: Cascade) scheduledBy User @relation(fields: [scheduledByUserId], references: [id]) @@index([videoId]) @@index([scheduledFor]) @@index([status]) @@map("video_schedule_history") } ``` ## Environment Variables ```env # Media API VITE_MEDIA_API_URL=http://localhost:4100 # Video Analytics VIDEO_ANALYTICS_RETENTION_DAYS=90 VIDEO_ANALYTICS_IP_HASHING_ENABLED=true # Video Scheduling VIDEO_SCHEDULE_DEFAULT_TIMEZONE=UTC VIDEO_SCHEDULE_NOTIFICATION_ENABLED=true # Preview Links VIDEO_PREVIEW_LINK_EXPIRY_HOURS=24 # BullMQ (uses shared Redis) REDIS_URL=redis://:password@redis:6379 ``` ## Security Considerations ### Authentication - **Admin routes:** `requireAdminRole` middleware - **Tracking routes:** No auth required (public), optional user tracking - **Preview links:** JWT token-based with expiration ### Rate Limiting - Tracking endpoints heavily rate-limited (100-200 req/min) - Admin endpoints: Global admin rate limits (500 req/min) ### Privacy - IP addresses hashed before storage (SHA-256, one-way) - User agents truncated (version numbers removed) - GDPR-compliant data retention (90-day default) - User consent required for registered user tracking ### Input Validation - Zod schemas for all request bodies - Fastify schema validation - Sanitized error messages (no stack traces in production) ## Deployment ### Docker Configuration **Media API Container:** ```yaml media-api: build: context: ./api dockerfile: Dockerfile.media ports: - "4100:4100" environment: - DATABASE_URL=${V2_POSTGRES_URL} - REDIS_URL=${REDIS_URL} - VIDEO_ANALYTICS_RETENTION_DAYS=90 depends_on: - v2-postgres - redis volumes: - ./media:/media:ro # Read-only video library ``` **Nginx Routing:** ```nginx # Media API location /api/ { proxy_pass http://media-api:4100; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } ``` ### Health Checks ```typescript // Health check endpoint fastify.get('/health', async (request, reply) => { return { status: 'ok', timestamp: new Date().toISOString(), uptime: process.uptime(), database: await checkDatabaseConnection(), redis: await checkRedisConnection(), queue: await scheduleQueueService.getQueueStatus(), }; }); ``` ### Monitoring **Prometheus Metrics:** ```typescript // Custom metrics const videoViewsCounter = new Counter({ name: 'cm_video_views_total', help: 'Total video views recorded', }); const videoWatchTimeHistogram = new Histogram({ name: 'cm_video_watch_time_seconds', help: 'Video watch time distribution', buckets: [10, 30, 60, 120, 300, 600, 1800], // seconds }); const scheduleQueueGauge = new Gauge({ name: 'cm_schedule_queue_size', help: 'Number of pending scheduled actions', }); ``` ## Testing ### Unit Tests (Jest) ```typescript // video-analytics.service.spec.ts describe('VideoAnalyticsService', () => { it('should hash IP addresses', () => { const service = new VideoAnalyticsService(); const hash = service['hashIpAddress']('192.168.1.100'); expect(hash).toHaveLength(64); // SHA-256 = 64 hex chars expect(hash).not.toContain('192.168'); }); it('should aggregate analytics correctly', async () => { // Create test views await prisma.videoView.createMany({ data: [ { videoId: 1, watchTimeSeconds: 100, completed: true }, { videoId: 1, watchTimeSeconds: 50, completed: false }, ], }); await service.aggregateVideoAnalytics(1); const video = await prisma.video.findUnique({ where: { id: 1 } }); expect(video.uniqueViewers).toBe(2); expect(video.totalWatchTimeSeconds).toBe(150); expect(video.averageWatchTimeSeconds).toBeCloseTo(75); expect(video.completionRate).toBeCloseTo(50); }); }); ``` ### Integration Tests ```bash # Test tracking endpoints ./test-media-api.sh ``` ## Troubleshooting ### Common Issues **Queue not processing jobs:** ```bash # Check Redis connection docker compose logs redis # Check queue status docker compose exec media-api npm run queue:status # Restart queue worker docker compose restart media-api ``` **Analytics not updating:** ```bash # Manually trigger aggregation curl -X POST http://localhost:4100/api/videos/1/analytics/aggregate \ -H "Authorization: Bearer $ADMIN_TOKEN" ``` **High memory usage:** - Check for memory leaks in video processing - Reduce `VIDEO_ANALYTICS_RETENTION_DAYS` - Clear old analytics data manually ## Future Roadmap **Q2 2026:** - Batch event submission endpoint - Real-time analytics dashboard - Automated thumbnail generation **Q3 2026:** - Video transcoding pipeline - CDN integration - Live streaming support **Q4 2026:** - AI-powered content recommendations - Automated video tagging - Advanced segmentation --- **Last Updated:** February 2026 **Version:** 1.0 **Maintainer:** Changemaker Lite Development Team