import { Queue, Worker, type Job } from 'bullmq'; import { env } from '../config/env'; import { prisma } from '../config/database'; import { logger } from '../utils/logger'; interface VideoScheduleJobData { videoId: number; action: 'publish' | 'unpublish'; scheduledFor: Date; scheduledByUserId: string; } class VideoScheduleQueueService { private queue: Queue; private worker: Worker | null = null; constructor() { this.queue = new Queue('video-schedules', { connection: { url: env.REDIS_URL }, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 5000 }, removeOnComplete: { age: 7 * 24 * 60 * 60, count: 1000 }, // Keep 7 days removeOnFail: { age: 30 * 24 * 60 * 60 }, // Keep failed 30 days }, }); } /** * Start the worker to process scheduled publish/unpublish jobs */ startWorker() { this.worker = new Worker( 'video-schedules', async (job: Job) => { const { videoId, action, scheduledByUserId } = job.data; logger.info(`Processing video schedule job ${job.id}`, { videoId, action }); try { // Update video publish status const updates: any = {}; if (action === 'publish') { updates.isPublished = true; updates.publishedAt = new Date(); updates.scheduledPublishAt = null; // Clear scheduled time } else { updates.isPublished = false; updates.publishedAt = null; updates.scheduledUnpublishAt = null; // Clear scheduled time } await prisma.video.update({ where: { id: videoId }, data: updates, }); // Record in schedule history await prisma.videoScheduleHistory.create({ data: { videoId, action, scheduledFor: job.data.scheduledFor, executedAt: new Date(), status: 'completed', scheduledByUserId, }, }); logger.info(`Successfully ${action}ed video ${videoId}`); return { success: true, action, videoId }; } catch (error) { logger.error(`Failed to ${action} video ${videoId}`, { error }); // Record failure in history await prisma.videoScheduleHistory.create({ data: { videoId, action, scheduledFor: job.data.scheduledFor, status: 'failed', error: error instanceof Error ? error.message : 'Unknown error', scheduledByUserId, }, }); throw error; } }, { connection: { url: env.REDIS_URL }, concurrency: 1, // Process one at a time for safety } ); this.worker.on('completed', (job) => { logger.info(`Video schedule job ${job.id} completed`); }); this.worker.on('failed', (job, err) => { logger.error(`Video schedule job ${job?.id} failed: ${err.message}`); }); logger.info('Video schedule queue worker started'); } /** * Schedule a video to be published at a specific time */ async schedulePublish( videoId: number, publishAt: Date, scheduledByUserId: string ): Promise<{ jobId: string; scheduledFor: Date }> { try { // Calculate delay until publish time const now = new Date(); const delay = publishAt.getTime() - now.getTime(); if (delay < 0) { throw new Error('Publish time must be in the future'); } // Cancel any existing publish job for this video await this.cancelSchedule(videoId, 'publish'); // Create job data const jobData: VideoScheduleJobData = { videoId, action: 'publish', scheduledFor: publishAt, scheduledByUserId, }; // Add job with delay const job = await this.queue.add(`publish-${videoId}`, jobData, { delay, jobId: `publish-${videoId}-${publishAt.getTime()}`, }); // Update video with scheduled publish time await prisma.video.update({ where: { id: videoId }, data: { scheduledPublishAt: publishAt, }, }); // Record in history as pending await prisma.videoScheduleHistory.create({ data: { videoId, action: 'publish', scheduledFor: publishAt, status: 'pending', scheduledByUserId, }, }); logger.info(`Scheduled video ${videoId} to publish at ${publishAt.toISOString()}`, { jobId: job.id, }); return { jobId: job.id!, scheduledFor: publishAt }; } catch (error) { logger.error('Failed to schedule video publish', { error, videoId }); throw error; } } /** * Schedule a video to be unpublished at a specific time */ async scheduleUnpublish( videoId: number, unpublishAt: Date, scheduledByUserId: string ): Promise<{ jobId: string; scheduledFor: Date }> { try { // Calculate delay until unpublish time const now = new Date(); const delay = unpublishAt.getTime() - now.getTime(); if (delay < 0) { throw new Error('Unpublish time must be in the future'); } // Cancel any existing unpublish job for this video await this.cancelSchedule(videoId, 'unpublish'); // Create job data const jobData: VideoScheduleJobData = { videoId, action: 'unpublish', scheduledFor: unpublishAt, scheduledByUserId, }; // Add job with delay const job = await this.queue.add(`unpublish-${videoId}`, jobData, { delay, jobId: `unpublish-${videoId}-${unpublishAt.getTime()}`, }); // Update video with scheduled unpublish time await prisma.video.update({ where: { id: videoId }, data: { scheduledUnpublishAt: unpublishAt, }, }); // Record in history as pending await prisma.videoScheduleHistory.create({ data: { videoId, action: 'unpublish', scheduledFor: unpublishAt, status: 'pending', scheduledByUserId, }, }); logger.info(`Scheduled video ${videoId} to unpublish at ${unpublishAt.toISOString()}`, { jobId: job.id, }); return { jobId: job.id!, scheduledFor: unpublishAt }; } catch (error) { logger.error('Failed to schedule video unpublish', { error, videoId }); throw error; } } /** * Cancel a scheduled publish or unpublish */ async cancelSchedule(videoId: number, action: 'publish' | 'unpublish'): Promise { try { // Find and remove pending jobs for this video and action const jobs = await this.queue.getJobs(['delayed', 'waiting']); const relevantJobs = jobs.filter( (job) => job.name === `${action}-${videoId}` && job.data.videoId === videoId ); for (const job of relevantJobs) { await job.remove(); logger.info(`Cancelled ${action} job for video ${videoId}`, { jobId: job.id }); // Update history to cancelled await prisma.videoScheduleHistory.updateMany({ where: { videoId, action, status: 'pending', }, data: { status: 'cancelled', }, }); } // Clear scheduled time from video const updates: any = {}; if (action === 'publish') { updates.scheduledPublishAt = null; } else { updates.scheduledUnpublishAt = null; } await prisma.video.update({ where: { id: videoId }, data: updates, }); } catch (error) { logger.error(`Failed to cancel ${action} schedule`, { error, videoId }); throw error; } } /** * Get upcoming scheduled publish/unpublish operations */ async getUpcomingSchedules(limit: number = 50): Promise< Array<{ jobId: string; videoId: number; videoTitle: string | null; action: 'publish' | 'unpublish'; scheduledFor: Date; status: string; }> > { try { // Get delayed and waiting jobs const jobs = await this.queue.getJobs(['delayed', 'waiting'], 0, limit); const schedules = await Promise.all( jobs.map(async (job) => { const video = await prisma.video.findUnique({ where: { id: job.data.videoId }, select: { id: true, title: true }, }); return { jobId: job.id!, videoId: job.data.videoId, videoTitle: video?.title || null, action: job.data.action, scheduledFor: new Date(job.data.scheduledFor), status: await job.getState(), }; }) ); return schedules.sort((a, b) => a.scheduledFor.getTime() - b.scheduledFor.getTime()); } catch (error) { logger.error('Failed to get upcoming schedules', { error }); throw error; } } /** * Get schedule history for a video */ async getScheduleHistory(videoId: number, limit: number = 10) { return prisma.videoScheduleHistory.findMany({ where: { videoId }, orderBy: { createdAt: 'desc' }, take: limit, include: { scheduledBy: { select: { id: true, name: true, email: true, }, }, }, }); } /** * Get queue stats */ async getStats() { const [waiting, active, delayed, completed, failed] = await Promise.all([ this.queue.getWaitingCount(), this.queue.getActiveCount(), this.queue.getDelayedCount(), this.queue.getCompletedCount(), this.queue.getFailedCount(), ]); return { waiting, active, delayed, completed, failed }; } /** * Pause the queue */ async pause() { await this.queue.pause(); logger.info('Video schedule queue paused'); } /** * Resume the queue */ async resume() { await this.queue.resume(); logger.info('Video schedule queue resumed'); } /** * Clean up old completed and failed jobs */ async cleanup() { const cleaned = await this.queue.clean(7 * 24 * 60 * 60 * 1000, 1000); // 7 days logger.info(`Cleaned ${cleaned.length} old jobs from video schedule queue`); return cleaned.length; } /** * Close queue and worker */ async close() { if (this.worker) { await this.worker.close(); } await this.queue.close(); logger.info('Video schedule queue closed'); } } export const videoScheduleQueueService = new VideoScheduleQueueService();