Media API Module

Overview

The Media API module is a Fastify microservice (port 4100) for video library management: quick actions, scheduled publishing, and analytics tracking. It is built with Prisma ORM and PostgreSQL and runs alongside the main Express API (port 4000), sharing the same database.

Architecture

Technology Stack

  • Framework: Fastify v5 with TypeScript
  • ORM: Prisma (shared with main API, single database)
  • Database: PostgreSQL 16 (shared with main API)
  • Job Queue: BullMQ with Redis backend
  • File Processing: FFprobe (video metadata extraction)
  • Charts: Recharts (frontend)

Port Configuration

  • Media API: Port 4100 (Docker internal), routed via nginx
  • Main API: Port 4000 (Express)
  • Public Access: https://media.cmlite.org/api/*

Database Strategy

Shared PostgreSQL Database:

  • Media API uses Prisma ORM (same as main API)
  • Schema defined in api/prisma/schema.prisma
  • Migrations managed centrally via Prisma
  • Models: Video, VideoView, VideoEvent, VideoScheduleHistory

Migration Strategy:

# Apply migrations (run from api directory)
cd api && npx prisma migrate deploy

# Generate Prisma client
npx prisma generate

# Seed database
npx prisma db seed

Module Structure

api/src/modules/media/
├── services/
│   ├── video-analytics.service.ts     # Analytics aggregation & queries
│   └── ffprobe.service.ts             # Video metadata extraction
├── routes/
│   ├── videos.routes.ts               # Video CRUD (existing)
│   ├── video-actions.routes.ts        # Quick actions (duplicate, preview, etc.)
│   ├── video-schedule.routes.ts       # Schedule management (admin)
│   ├── video-analytics.routes.ts      # Analytics queries (admin)
│   ├── video-tracking.routes.ts       # Public tracking endpoints
│   └── upload.routes.ts               # Video upload (existing)
└── db/
    └── schema.ts                       # Drizzle schema (deprecated, kept for reference)

api/src/services/
└── video-schedule-queue.service.ts    # BullMQ queue + worker

api/prisma/
└── schema.prisma                       # Prisma schema (includes media models)

Services

VideoAnalyticsService

Purpose: Aggregate and query video analytics data.

Key Methods:

class VideoAnalyticsService {
  // Record a video view (creates VideoView record)
  async recordView(
    videoId: number,
    userId?: number,
    ipAddress?: string,
    userAgent?: string,
    referer?: string
  ): Promise<{ viewId: number }>;

  // Record a video event (play, pause, seek, complete)
  async recordEvent(
    videoId: number,
    viewId: number | undefined,
    eventType: 'play' | 'pause' | 'seek' | 'complete',
    timestamp: number
  ): Promise<void>;

  // Update watch time for a view
  async updateWatchTime(
    viewId: number,
    watchTimeSeconds: number
  ): Promise<void>;

  // Aggregate analytics for a video (updates Video model fields)
  async aggregateVideoAnalytics(videoId: number): Promise<void>;

  // Get detailed analytics for a video
  async getVideoAnalytics(
    videoId: number,
    startDate?: Date,
    endDate?: Date
  ): Promise<VideoAnalytics>;

  // Get top performing videos
  async getTopVideos(
    metric: 'views' | 'watchTime',
    limit: number
  ): Promise<TopVideosResponse>;

  // Get platform-wide analytics overview
  async getAnalyticsOverview(): Promise<AnalyticsOverviewResponse>;

  // Get viewer demographics (registered users)
  async getViewerDemographics(videoId: number): Promise<RegisteredViewer[]>;

  // Reset analytics for a video
  async resetAnalytics(videoId: number): Promise<void>;

  // Privacy-focused helpers
  private hashIpAddress(ipAddress: string): string;
  private truncateUserAgent(userAgent: string): string;
}

Privacy Implementation:

import { createHash } from 'node:crypto';

// IP address hashing (SHA-256, one-way)
private hashIpAddress(ipAddress: string): string {
  return createHash('sha256')
    .update(ipAddress)
    .digest('hex');
}

// User agent truncation (remove version numbers)
private truncateUserAgent(userAgent: string): string {
  return userAgent
    .replace(/\/[\d.]+/g, '')
    .substring(0, 200);
}
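
As a rough illustration of what these two helpers produce, the sketch below lifts them into standalone functions. `hashIpAddressKeyed` and its `secret` parameter are hypothetical additions, not part of the service. They show one possible hardening step: a bare SHA-256 over the small IPv4 address space can be reversed by exhaustive search, whereas a keyed hash cannot be recomputed without the secret.

```typescript
import { createHash, createHmac } from 'node:crypto';

// Same logic as the private helpers above, as standalone functions.
function hashIpAddress(ipAddress: string): string {
  return createHash('sha256').update(ipAddress).digest('hex');
}

function truncateUserAgent(userAgent: string): string {
  // Drops every "/<digits and dots>" run, then caps the length at 200 chars.
  return userAgent.replace(/\/[\d.]+/g, '').substring(0, 200);
}

// Hypothetical variant (assumption, not in the service): HMAC-SHA-256 with a
// server-side secret, so hashes cannot be rebuilt by enumerating addresses.
function hashIpAddressKeyed(ipAddress: string, secret: string): string {
  return createHmac('sha256', secret).update(ipAddress).digest('hex');
}

const ua = 'Mozilla/5.0 (X11; Linux x86_64) Chrome/120.0.0.0';
console.log(truncateUserAgent(ua));                 // "Mozilla (X11; Linux x86_64) Chrome"
console.log(hashIpAddress('192.168.1.100').length); // 64
```

Both hash variants return 64 hex characters, so either fits the VarChar(64) ipAddress column.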

Aggregation Logic:

async aggregateVideoAnalytics(videoId: number): Promise<void> {
  const views = await prisma.videoView.findMany({
    where: { videoId },
  });

  const totalViews = views.length;
  const uniqueViewers = new Set(
    views.map(v => v.userId || v.ipAddress)
  ).size;

  const totalWatchTime = views.reduce((sum, v) => sum + v.watchTimeSeconds, 0);
  const avgWatchTime = totalViews > 0 ? totalWatchTime / totalViews : 0;

  const completedViews = views.filter(v => v.completed).length;
  const completionRate = totalViews > 0 ? (completedViews / totalViews) * 100 : 0;

  await prisma.video.update({
    where: { id: videoId },
    data: {
      uniqueViewers,
      totalWatchTimeSeconds: totalWatchTime,
      averageWatchTimeSeconds: avgWatchTime,
      completionRate,
    },
  });
}
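
The same arithmetic can be tried without a database. The sketch below applies the aggregation formulas to an in-memory array; `ViewRow`, `VideoStats`, and `aggregateViews` are illustrative names, not part of the service.

```typescript
// Shape of the fields the aggregation reads from each VideoView row.
interface ViewRow {
  userId: number | null;
  ipAddress: string | null; // stored as a SHA-256 hash
  watchTimeSeconds: number;
  completed: boolean;
}

interface VideoStats {
  totalViews: number;
  uniqueViewers: number;
  totalWatchTimeSeconds: number;
  averageWatchTimeSeconds: number;
  completionRate: number; // percentage, 0 to 100
}

function aggregateViews(views: ViewRow[]): VideoStats {
  const totalViews = views.length;
  // A viewer is identified by user ID when present, otherwise by hashed IP.
  const uniqueViewers = new Set(views.map(v => v.userId || v.ipAddress)).size;
  const totalWatchTimeSeconds = views.reduce((sum, v) => sum + v.watchTimeSeconds, 0);
  const completedViews = views.filter(v => v.completed).length;
  return {
    totalViews,
    uniqueViewers,
    totalWatchTimeSeconds,
    averageWatchTimeSeconds: totalViews > 0 ? totalWatchTimeSeconds / totalViews : 0,
    completionRate: totalViews > 0 ? (completedViews / totalViews) * 100 : 0,
  };
}

const stats = aggregateViews([
  { userId: 7, ipAddress: null, watchTimeSeconds: 100, completed: true },
  { userId: null, ipAddress: 'a1b2', watchTimeSeconds: 50, completed: false },
  { userId: 7, ipAddress: null, watchTimeSeconds: 30, completed: false },
]);
console.log(stats.uniqueViewers);           // 2 (user 7 is counted once)
console.log(stats.averageWatchTimeSeconds); // 60
```

One consequence of the `userId || ipAddress` key: views that carry neither value all map to the same key and are counted as a single unique viewer.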

VideoScheduleQueueService

Purpose: Manage BullMQ job queue for scheduled publish/unpublish operations.

Key Methods:

class VideoScheduleQueueService {
  // Schedule a publish job
  async schedulePublish(
    videoId: number,
    publishAt: Date,
    timezone: string
  ): Promise<{ jobId: string; scheduledFor: string }>;

  // Schedule an unpublish job
  async scheduleUnpublish(
    videoId: number,
    unpublishAt: Date,
    timezone: string
  ): Promise<{ jobId: string; scheduledFor: string }>;

  // Cancel a scheduled action
  async cancelSchedule(
    videoId: number,
    action: 'publish' | 'unpublish'
  ): Promise<void>;

  // Get upcoming schedules
  async getUpcomingSchedules(limit?: number): Promise<ScheduleEvent[]>;

  // Get schedule history for a video
  async getScheduleHistory(videoId: number): Promise<VideoScheduleHistory[]>;

  // Worker process (runs in background)
  private async processScheduledAction(job: Job): Promise<void>;

  // Graceful shutdown
  async close(): Promise<void>;
}

Job Structure:

interface ScheduleJobData {
  videoId: number;
  action: 'publish' | 'unpublish';
  scheduledFor: string;  // ISO timestamp
  timezone: string;
  scheduledByUserId: number;
}
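
BullMQ runs a job at a future time through the `delay` job option (milliseconds from now), so a scheduled action has to be turned into a delay plus a payload. The sketch below shows one way that conversion might look. `planScheduleJob`, `PlannedJob`, and the `video-<id>-<action>` job ID scheme are assumptions for illustration; the service's actual implementation is not shown in this document.

```typescript
type ScheduleAction = 'publish' | 'unpublish';

interface ScheduleJobData {
  videoId: number;
  action: ScheduleAction;
  scheduledFor: string; // ISO timestamp
  timezone: string;
  scheduledByUserId: number;
}

interface PlannedJob {
  jobId: string;   // hypothetical naming scheme: one job per video and action
  delayMs: number; // value for BullMQ's `delay` option
  data: ScheduleJobData;
}

function planScheduleJob(
  videoId: number,
  action: ScheduleAction,
  runAt: Date,
  timezone: string,
  scheduledByUserId: number,
  now: Date = new Date()
): PlannedJob {
  const delayMs = runAt.getTime() - now.getTime();
  if (Number.isNaN(delayMs)) throw new Error('runAt is not a valid date');
  if (delayMs <= 0) throw new Error('runAt must be in the future');
  return {
    jobId: `video-${videoId}-${action}`,
    delayMs,
    data: { videoId, action, scheduledFor: runAt.toISOString(), timezone, scheduledByUserId },
  };
}

const now = new Date('2026-03-01T12:00:00.000Z');
const job = planScheduleJob(42, 'publish', new Date('2026-03-01T13:00:00.000Z'), 'UTC', 1, now);
console.log(job.delayMs); // 3600000
```

A deterministic job ID of this kind would let a cancel operation look the job up by video and action alone, which is one plausible reason to derive it rather than accept a generated ID.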

Worker Implementation:

private async processScheduledAction(job: Job<ScheduleJobData>): Promise<void> {
  const { videoId, action, scheduledByUserId } = job.data;

  try {
    // Update video published status
    await prisma.video.update({
      where: { id: videoId },
      data: {
        isPublished: action === 'publish',
        ...(action === 'publish' && { scheduledPublishAt: null }),
        ...(action === 'unpublish' && { scheduledUnpublishAt: null }),
      },
    });

    // Record success in history
    await prisma.videoScheduleHistory.create({
      data: {
        videoId,
        action,
        scheduledFor: new Date(job.data.scheduledFor),
        executedAt: new Date(),
        status: 'completed',
        scheduledByUserId,
      },
    });

    logger.info(`Scheduled ${action} executed for video ${videoId}`);
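  } catch (error) {
    // Record failure; `error` is `unknown` under strict TypeScript, so narrow it first
    await prisma.videoScheduleHistory.create({
      data: {
        videoId,
        action,
        scheduledFor: new Date(job.data.scheduledFor),
        status: 'failed',
        error: error instanceof Error ? error.message : String(error),
        scheduledByUserId,
      },
    });

    throw error; // Rethrow so BullMQ marks the job failed; it retries only if the job's `attempts` option is > 1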
  }
}

FFprobeService (Existing)

Purpose: Extract video metadata using FFprobe.

Key Methods:

class FFprobeService {
  // Extract metadata from video file
  async getVideoMetadata(
    filePath: string
  ): Promise<{
    duration: number;
    width: number;
    height: number;
    orientation: 'H' | 'V';
    quality: 'SD' | 'HD' | 'FHD' | 'UHD';
    hasAudio: boolean;
  }>;
}
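
The `orientation` and `quality` fields are derived from the probed dimensions. The document does not state the cut-offs, so the sketch below uses assumed thresholds on the shorter side (720, 1080, 2160); treat the numbers and both function names as illustrative only.

```typescript
type Orientation = 'H' | 'V';
type Quality = 'SD' | 'HD' | 'FHD' | 'UHD';

// Square video is treated as horizontal here (assumption).
function classifyOrientation(width: number, height: number): Orientation {
  return width >= height ? 'H' : 'V';
}

// Assumed thresholds: using the shorter side means a vertical 1080x1920
// video is rated the same as a horizontal 1920x1080 one.
function classifyQuality(width: number, height: number): Quality {
  const shortSide = Math.min(width, height);
  if (shortSide >= 2160) return 'UHD';
  if (shortSide >= 1080) return 'FHD';
  if (shortSide >= 720) return 'HD';
  return 'SD';
}

console.log(classifyOrientation(1080, 1920), classifyQuality(1080, 1920)); // V FHD
console.log(classifyOrientation(640, 360), classifyQuality(640, 360));     // H SD
```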

Routes

Video Actions Routes (Admin)

File: api/src/modules/media/routes/video-actions.routes.ts

Middleware: requireAdminRole (SUPER_ADMIN, INFLUENCE_ADMIN, MAP_ADMIN)

Endpoints:

// POST /videos/:id/duplicate
// Duplicate video with new title
fastify.post(
  '/videos/:id/duplicate',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id } = request.params;
    const duplicatedVideo = await videosService.duplicateVideo(id);
    return { ...duplicatedVideo };
  }
);

// GET /videos/:id/preview-link
// Generate expiring preview link (24h JWT token)
fastify.get(
  '/videos/:id/preview-link',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id } = request.params;
    const token = jwt.sign(
      { videoId: id },
      process.env.JWT_ACCESS_SECRET,
      { expiresIn: `${VIDEO_PREVIEW_LINK_EXPIRY_HOURS}h` }
    );
    return {
      previewUrl: `${process.env.VITE_MEDIA_API_URL}/api/videos/preview/${token}`,
      expiryHours: VIDEO_PREVIEW_LINK_EXPIRY_HOURS,
    };
  }
);

// POST /videos/:id/reset-analytics
// Reset all analytics for a video
fastify.post(
  '/videos/:id/reset-analytics',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id } = request.params;
    await videoAnalyticsService.resetAnalytics(id);
    return { success: true };
  }
);

// GET /videos/analytics/top
// Get top performing videos
fastify.get(
  '/videos/analytics/top',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { metric = 'views', limit = 10 } = request.query;
    const topVideos = await videoAnalyticsService.getTopVideos(metric, limit);
    return topVideos;
  }
);

// GET /videos/analytics/overview
// Get platform-wide analytics
fastify.get(
  '/videos/analytics/overview',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const overview = await videoAnalyticsService.getAnalyticsOverview();
    return overview;
  }
);
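
The handlers above pass `request.params.id` straight to services whose signatures take a numeric ID. Fastify delivers URL parameters as strings unless a `params` schema with an integer type is attached, in which case its default validator coerces them. The excerpts do not show such a schema (it may exist elsewhere), so the sketch below is a hypothetical `parseVideoId` helper that a handler could call instead.

```typescript
// Hypothetical helper: converts a raw URL parameter into a positive integer ID
// or throws, so malformed IDs never reach the database layer.
function parseVideoId(raw: string): number {
  if (!/^\d+$/.test(raw)) {
    throw new Error(`Invalid video id: ${raw}`);
  }
  const id = Number(raw);
  if (!Number.isSafeInteger(id) || id < 1) {
    throw new Error(`Invalid video id: ${raw}`);
  }
  return id;
}

console.log(parseVideoId('42')); // 42
```

In a route this would read `const id = parseVideoId(request.params.id)`, with the thrown error mapped to a 400 response by whatever error handler the application uses.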

Video Schedule Routes (Admin)

File: api/src/modules/media/routes/video-schedule.routes.ts

Middleware: requireAdminRole

Endpoints:

// POST /videos/:id/schedule-publish
// Schedule video to be published at future date/time
fastify.post(
  '/videos/:id/schedule-publish',
  {
    preHandler: [requireAdminRole],
    schema: {
      body: {
        type: 'object',
        required: ['publishAt', 'timezone'],
        properties: {
          publishAt: { type: 'string' },  // ISO timestamp
          timezone: { type: 'string' },    // IANA timezone
        },
      },
    },
  },
  async (request, reply) => {
    const { id } = request.params;
    const { publishAt, timezone } = request.body;
    const result = await scheduleQueueService.schedulePublish(
      id,
      new Date(publishAt),
      timezone
    );
    return result;
  }
);

// POST /videos/:id/schedule-unpublish
// Schedule video to be unpublished
fastify.post(
  '/videos/:id/schedule-unpublish',
  {
    preHandler: [requireAdminRole],
    schema: {
      body: {
        type: 'object',
        required: ['unpublishAt', 'timezone'],
        properties: {
          unpublishAt: { type: 'string' },
          timezone: { type: 'string' },
        },
      },
    },
  },
  async (request, reply) => {
    const { id } = request.params;
    const { unpublishAt, timezone } = request.body;
    const result = await scheduleQueueService.scheduleUnpublish(
      id,
      new Date(unpublishAt),
      timezone
    );
    return result;
  }
);

// DELETE /videos/:id/schedule/:action
// Cancel scheduled publish or unpublish
fastify.delete(
  '/videos/:id/schedule/:action',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id, action } = request.params;
    await scheduleQueueService.cancelSchedule(id, action);
    return { success: true, message: `${action} schedule cancelled` };
  }
);

// GET /videos/schedules/upcoming
// Get all upcoming scheduled events
fastify.get(
  '/videos/schedules/upcoming',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { limit = 100 } = request.query;
    const schedules = await scheduleQueueService.getUpcomingSchedules(limit);
    return { schedules };
  }
);

// GET /videos/:id/schedule-history
// Get schedule history for a video
fastify.get(
  '/videos/:id/schedule-history',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id } = request.params;
    const history = await scheduleQueueService.getScheduleHistory(id);
    return { history };
  }
);
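
The body schemas for schedule-publish and schedule-unpublish only require `publishAt`, `unpublishAt`, and `timezone` to be strings, so `new Date('garbage')` or an unknown zone name would pass validation. A minimal sketch of a stricter check follows; `isValidTimeZone` and `parseScheduleInput` are hypothetical names, and the timezone test relies on `Intl.DateTimeFormat` throwing a `RangeError` for zones it does not recognize.

```typescript
// Returns true when the runtime recognizes the IANA timezone name.
function isValidTimeZone(timezone: string): boolean {
  try {
    new Intl.DateTimeFormat('en-US', { timeZone: timezone });
    return true;
  } catch {
    return false;
  }
}

// Parses the timestamp string and rejects unparseable dates and unknown zones.
function parseScheduleInput(at: string, timezone: string): Date {
  const when = new Date(at);
  if (Number.isNaN(when.getTime())) {
    throw new Error(`Not a valid timestamp: ${at}`);
  }
  if (!isValidTimeZone(timezone)) {
    throw new Error(`Unknown IANA timezone: ${timezone}`);
  }
  return when;
}

console.log(isValidTimeZone('America/Toronto')); // true
console.log(isValidTimeZone('Mars/Olympus'));    // false
```

Adding `format: 'date-time'` to the schema properties is another option, provided the Fastify instance is configured with format validation.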

Video Analytics Routes (Admin)

File: api/src/modules/media/routes/video-analytics.routes.ts

Middleware: requireAdminRole

Endpoints:

// GET /videos/:id/analytics
// Get detailed analytics for a specific video
fastify.get(
  '/videos/:id/analytics',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id } = request.params;
    const { startDate, endDate } = request.query;

    const analytics = await videoAnalyticsService.getVideoAnalytics(
      id,
      startDate ? new Date(startDate) : undefined,
      endDate ? new Date(endDate) : undefined
    );

    return analytics;
  }
);

// GET /videos/:id/analytics/viewers
// Get list of registered users who watched this video
fastify.get(
  '/videos/:id/analytics/viewers',
  { preHandler: [requireAdminRole] },
  async (request, reply) => {
    const { id } = request.params;
    const viewers = await videoAnalyticsService.getViewerDemographics(id);
    return { viewers };
  }
);

Video Tracking Routes (Public)

File: api/src/modules/media/routes/video-tracking.routes.ts

Middleware: optionalAuth on /track/view and /track/event (records the user ID when the caller is logged in, but does not require auth); /track/heartbeat is rate-limited only

Rate Limiting:

  • /track/view: 100 requests/min per IP
  • /track/event: 100 requests/min per IP
  • /track/heartbeat: 200 requests/min per IP (higher for frequent updates)
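
The project's `rateLimit(name, max, windowMs)` helper is not shown in this document. As a sketch of how a per-IP limit like the ones above could be enforced, here is a simple in-memory fixed-window counter. `FixedWindowLimiter` is an illustrative name, and the code assumes `max` is at least 1.

```typescript
// Fixed-window limiter: each key (e.g. an IP) gets `max` requests per window.
class FixedWindowLimiter {
  private readonly hits = new Map<string, { count: number; windowStart: number }>();
  private readonly max: number;
  private readonly windowMs: number;

  constructor(max: number, windowMs: number) {
    this.max = max;
    this.windowMs = windowMs;
  }

  // `nowMs` is injectable so the behavior can be checked without waiting.
  allow(key: string, nowMs: number = Date.now()): boolean {
    const entry = this.hits.get(key);
    if (!entry || nowMs - entry.windowStart >= this.windowMs) {
      // First request from this key, or the previous window has expired.
      this.hits.set(key, { count: 1, windowStart: nowMs });
      return true;
    }
    if (entry.count >= this.max) return false;
    entry.count += 1;
    return true;
  }
}

const limiter = new FixedWindowLimiter(3, 60_000);
const results = [0, 1, 2, 3].map(t => limiter.allow('203.0.113.5', t));
console.log(results);                               // [ true, true, true, false ]
console.log(limiter.allow('203.0.113.5', 60_000));  // true (new window)
```

An in-memory map only works for a single process; with several Media API instances the counters would need to live in shared storage such as the Redis instance already used by BullMQ.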

Endpoints:

// POST /track/view
// Record a video view (when video loads)
fastify.post(
  '/track/view',
  {
    preHandler: [optionalAuth, rateLimit('track-view', 100, 60000)],
    schema: {
      body: {
        type: 'object',
        required: ['videoId'],
        properties: {
          videoId: { type: 'number' },
          referer: { type: 'string' },
        },
      },
    },
  },
  async (request, reply) => {
    const { videoId, referer } = request.body;
    const userId = request.user?.id;
    const ipAddress = request.ip;
    const userAgent = request.headers['user-agent'];

    const { viewId } = await videoAnalyticsService.recordView(
      videoId,
      userId,
      ipAddress,
      userAgent,
      referer
    );

    return { viewId };
  }
);

// POST /track/event
// Record a video event (play, pause, seek, complete)
fastify.post(
  '/track/event',
  {
    preHandler: [optionalAuth, rateLimit('track-event', 100, 60000)],
    schema: {
      body: {
        type: 'object',
        required: ['videoId', 'eventType', 'timestamp'],
        properties: {
          videoId: { type: 'number' },
          viewId: { type: 'number' },
          eventType: { type: 'string', enum: ['play', 'pause', 'seek', 'complete'] },
          timestamp: { type: 'number' },
        },
      },
    },
  },
  async (request, reply) => {
    const { videoId, viewId, eventType, timestamp } = request.body;

    await videoAnalyticsService.recordEvent(
      videoId,
      viewId,
      eventType,
      timestamp
    );

    return { success: true };
  }
);

// POST /track/heartbeat
// Update watch time (called every 10 seconds during playback)
fastify.post(
  '/track/heartbeat',
  {
    preHandler: [rateLimit('track-heartbeat', 200, 60000)],  // Higher limit for frequent updates
    schema: {
      body: {
        type: 'object',
        required: ['viewId', 'watchTimeSeconds'],
        properties: {
          viewId: { type: 'number' },
          watchTimeSeconds: { type: 'number' },
        },
      },
    },
  },
  async (request, reply) => {
    const { viewId, watchTimeSeconds } = request.body;

    await videoAnalyticsService.updateWatchTime(viewId, watchTimeSeconds);

    return { success: true };
  }
);

Database Models

Video Model (Enhanced)

model Video {
  id                      Int       @id @default(autoincrement())
  filename                String    @unique
  title                   String?
  producer                String?
  creator                 String?
  thumbnailUrl            String?
  duration                Int       // seconds
  fileSize                BigInt
  width                   Int
  height                  Int
  orientation             String    // 'H' or 'V'
  quality                 String    // 'SD', 'HD', 'FHD', 'UHD'
  hasAudio                Boolean   @default(true)
  isPublished             Boolean   @default(false)

  // Scheduling
  scheduledPublishAt      DateTime?
  scheduledUnpublishAt    DateTime?

  // Analytics (aggregated)
  uniqueViewers           Int       @default(0)
  totalWatchTimeSeconds   Int       @default(0)
  averageWatchTimeSeconds Decimal   @default(0) @db.Decimal(10, 2)
  completionRate          Decimal   @default(0) @db.Decimal(5, 2)

  // Relations
  videoViews              VideoView[]
  videoEvents             VideoEvent[]
  scheduleHistory         VideoScheduleHistory[]

  createdAt               DateTime  @default(now())
  updatedAt               DateTime  @updatedAt

  @@map("videos")
}

VideoView Model

model VideoView {
  id               Int      @id @default(autoincrement())
  videoId          Int
  userId           Int?
  ipAddress        String?  @db.VarChar(64)  // SHA-256 hash (64 chars)
  userAgent        String?  @db.Text
  referer          String?  @db.Text
  watchTimeSeconds Int      @default(0)
  completed        Boolean  @default(false)
  createdAt        DateTime @default(now())

  video            Video    @relation(fields: [videoId], references: [id], onDelete: Cascade)
  user             User?    @relation(fields: [userId], references: [id], onDelete: SetNull)

  @@index([videoId])
  @@index([userId])
  @@index([createdAt])
  @@map("video_views")
}

VideoEvent Model

model VideoEvent {
  id         Int      @id @default(autoincrement())
  videoId    Int
  viewId     Int?
  eventType  String   @db.VarChar(50)  // 'play', 'pause', 'seek', 'complete'
  timestamp  Decimal  @db.Decimal(10, 2)  // Video timestamp in seconds
  createdAt  DateTime @default(now())

  video      Video    @relation(fields: [videoId], references: [id], onDelete: Cascade)

  @@index([videoId])
  @@index([viewId])
  @@map("video_events")
}

VideoScheduleHistory Model

model VideoScheduleHistory {
  id                Int       @id @default(autoincrement())
  videoId           Int
  action            String    @db.VarChar(20)  // 'publish' or 'unpublish'
  scheduledFor      DateTime
  executedAt        DateTime?
  status            String    @db.VarChar(20)  // 'pending', 'completed', 'failed', 'cancelled'
  error             String?   @db.Text
  scheduledByUserId Int

  video             Video     @relation(fields: [videoId], references: [id], onDelete: Cascade)
  scheduledBy       User      @relation(fields: [scheduledByUserId], references: [id])

  @@index([videoId])
  @@index([scheduledFor])
  @@index([status])
  @@map("video_schedule_history")
}

Environment Variables

# Media API
VITE_MEDIA_API_URL=http://localhost:4100

# Video Analytics
VIDEO_ANALYTICS_RETENTION_DAYS=90
VIDEO_ANALYTICS_IP_HASHING_ENABLED=true

# Video Scheduling
VIDEO_SCHEDULE_DEFAULT_TIMEZONE=UTC
VIDEO_SCHEDULE_NOTIFICATION_ENABLED=true

# Preview Links
VIDEO_PREVIEW_LINK_EXPIRY_HOURS=24

# BullMQ (uses shared Redis)
REDIS_URL=redis://:password@redis:6379
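
These variables arrive as strings, so numeric and boolean settings need parsing. The sketch below shows one way to read them with fallbacks and to turn the retention period into a cutoff date. `loadMediaConfig`, `retentionCutoff`, and the `MediaConfig` shape are hypothetical, and the fallback values simply reuse the sample values listed above.

```typescript
interface MediaConfig {
  mediaApiUrl: string;
  analyticsRetentionDays: number;
  ipHashingEnabled: boolean;
  defaultTimezone: string;
  scheduleNotificationsEnabled: boolean;
  previewLinkExpiryHours: number;
}

type Env = Record<string, string | undefined>;

// Falls back when the value is missing, empty, non-numeric, or not positive.
function positiveInt(value: string | undefined, fallback: number): number {
  const parsed = Number(value);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

function flag(value: string | undefined, fallback: boolean): boolean {
  return value === undefined ? fallback : value.toLowerCase() === 'true';
}

function loadMediaConfig(env: Env): MediaConfig {
  return {
    mediaApiUrl: env.VITE_MEDIA_API_URL ?? 'http://localhost:4100',
    analyticsRetentionDays: positiveInt(env.VIDEO_ANALYTICS_RETENTION_DAYS, 90),
    ipHashingEnabled: flag(env.VIDEO_ANALYTICS_IP_HASHING_ENABLED, true),
    defaultTimezone: env.VIDEO_SCHEDULE_DEFAULT_TIMEZONE ?? 'UTC',
    scheduleNotificationsEnabled: flag(env.VIDEO_SCHEDULE_NOTIFICATION_ENABLED, true),
    previewLinkExpiryHours: positiveInt(env.VIDEO_PREVIEW_LINK_EXPIRY_HOURS, 24),
  };
}

// Views created before this instant fall outside the retention window.
function retentionCutoff(retentionDays: number, now: Date = new Date()): Date {
  return new Date(now.getTime() - retentionDays * 24 * 60 * 60 * 1000);
}

// In the service this would be loadMediaConfig(process.env).
const config = loadMediaConfig({ VIDEO_ANALYTICS_RETENTION_DAYS: '30' });
console.log(config.analyticsRetentionDays); // 30
console.log(retentionCutoff(90, new Date('2026-04-01T00:00:00.000Z')).toISOString());
// 2026-01-01T00:00:00.000Z
```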

Security Considerations

Authentication

  • Admin routes: requireAdminRole middleware
  • Tracking routes: No auth required (public), optional user tracking
  • Preview links: JWT token-based with expiration

Rate Limiting

  • Tracking endpoints heavily rate-limited (100-200 req/min)
  • Admin endpoints: Global admin rate limits (500 req/min)

Privacy

  • IP addresses hashed before storage (SHA-256, one-way)
  • User agents truncated (version numbers removed)
  • GDPR-compliant data retention (90-day default)
  • User consent required for registered user tracking

Input Validation

  • Zod schemas for all request bodies
  • Fastify schema validation
  • Sanitized error messages (no stack traces in production)

Deployment

Docker Configuration

Media API Container:

media-api:
  build:
    context: ./api
    dockerfile: Dockerfile.media
  ports:
    - "4100:4100"
  environment:
    - DATABASE_URL=${V2_POSTGRES_URL}
    - REDIS_URL=${REDIS_URL}
    - VIDEO_ANALYTICS_RETENTION_DAYS=90
  depends_on:
    - v2-postgres
    - redis
  volumes:
    - ./media:/media:ro  # Read-only video library

Nginx Routing:

# Media API
location /api/ {
    proxy_pass http://media-api:4100;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

Health Checks

// Health check endpoint
fastify.get('/health', async (request, reply) => {
  return {
    status: 'ok',
    timestamp: new Date().toISOString(),
    uptime: process.uptime(),
    database: await checkDatabaseConnection(),
    redis: await checkRedisConnection(),
    queue: await scheduleQueueService.getQueueStatus(),
  };
});
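
As written, the endpoint reports `status: 'ok'` regardless of what the component checks return. If `checkDatabaseConnection()` and `checkRedisConnection()` resolve to booleans (an assumption; their return types are not shown here), the overall status could be derived from them. `summarizeHealth` below is a hypothetical helper sketching that idea.

```typescript
type HealthStatus = 'ok' | 'degraded';

interface ComponentChecks {
  database: boolean; // assumed result of checkDatabaseConnection()
  redis: boolean;    // assumed result of checkRedisConnection()
}

// Reports 'ok' only when every dependency is reachable; the HTTP code lets
// container orchestrators and load balancers detect an unhealthy instance.
function summarizeHealth(checks: ComponentChecks): { status: HealthStatus; httpStatus: number } {
  const healthy = checks.database && checks.redis;
  return { status: healthy ? 'ok' : 'degraded', httpStatus: healthy ? 200 : 503 };
}

console.log(summarizeHealth({ database: true, redis: false }));
// { status: 'degraded', httpStatus: 503 }
```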

Monitoring

Prometheus Metrics:

import { Counter, Gauge, Histogram } from 'prom-client';

// Custom metrics
const videoViewsCounter = new Counter({
  name: 'cm_video_views_total',
  help: 'Total video views recorded',
});

const videoWatchTimeHistogram = new Histogram({
  name: 'cm_video_watch_time_seconds',
  help: 'Video watch time distribution',
  buckets: [10, 30, 60, 120, 300, 600, 1800],  // seconds
});

const scheduleQueueGauge = new Gauge({
  name: 'cm_schedule_queue_size',
  help: 'Number of pending scheduled actions',
});

Testing

Unit Tests (Jest)

// video-analytics.service.spec.ts
describe('VideoAnalyticsService', () => {
  // Shared by both tests
  const service = new VideoAnalyticsService();

  it('should hash IP addresses', () => {
    // Bracket access reaches the private helper from the test
    const hash = service['hashIpAddress']('192.168.1.100');
    expect(hash).toHaveLength(64);  // SHA-256 = 64 hex chars
    expect(hash).not.toContain('192.168');
  });

  it('should aggregate analytics correctly', async () => {
    // Create test views (assumes a Video with id 1 already exists).
    // Distinct ipAddress values keep the two anonymous views from
    // collapsing into one unique viewer.
    await prisma.videoView.createMany({
      data: [
        { videoId: 1, ipAddress: 'a'.repeat(64), watchTimeSeconds: 100, completed: true },
        { videoId: 1, ipAddress: 'b'.repeat(64), watchTimeSeconds: 50, completed: false },
      ],
    });

    await service.aggregateVideoAnalytics(1);

    const video = await prisma.video.findUniqueOrThrow({ where: { id: 1 } });
    expect(video.uniqueViewers).toBe(2);
    expect(video.totalWatchTimeSeconds).toBe(150);
    // Decimal columns are returned as Decimal objects; convert before comparing
    expect(Number(video.averageWatchTimeSeconds)).toBeCloseTo(75);
    expect(Number(video.completionRate)).toBeCloseTo(50);
  });
});

Integration Tests

# Test tracking endpoints
./test-media-api.sh

Troubleshooting

Common Issues

Queue not processing jobs:

# Check Redis connection
docker compose logs redis

# Check queue status
docker compose exec media-api npm run queue:status

# Restart queue worker
docker compose restart media-api

Analytics not updating:

# Manually trigger aggregation
curl -X POST http://localhost:4100/api/videos/1/analytics/aggregate \
  -H "Authorization: Bearer $ADMIN_TOKEN"

High memory usage:

  • Check for memory leaks in video processing
  • Reduce VIDEO_ANALYTICS_RETENTION_DAYS
  • Clear old analytics data manually

Future Roadmap

Q2 2026:

  • Batch event submission endpoint
  • Real-time analytics dashboard
  • Automated thumbnail generation

Q3 2026:

  • Video transcoding pipeline
  • CDN integration
  • Live streaming support

Q4 2026:

  • AI-powered content recommendations
  • Automated video tagging
  • Advanced segmentation

Last Updated: February 2026
Version: 1.0
Maintainer: Changemaker Lite Development Team