"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.videoScheduleQueueService = void 0; const bullmq_1 = require("bullmq"); const env_1 = require("../config/env"); const database_1 = require("../config/database"); const logger_1 = require("../utils/logger"); class VideoScheduleQueueService { queue; worker = null; constructor() { this.queue = new bullmq_1.Queue('video-schedules', { connection: { url: env_1.env.REDIS_URL }, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 5000 }, removeOnComplete: { age: 7 * 24 * 60 * 60, count: 1000 }, // Keep 7 days removeOnFail: { age: 30 * 24 * 60 * 60 }, // Keep failed 30 days }, }); } /** * Start the worker to process scheduled publish/unpublish jobs */ startWorker() { this.worker = new bullmq_1.Worker('video-schedules', async (job) => { const { videoId, action, scheduledByUserId } = job.data; logger_1.logger.info(`Processing video schedule job ${job.id}`, { videoId, action }); try { // Update video publish status const updates = {}; if (action === 'publish') { updates.isPublished = true; updates.publishedAt = new Date(); updates.scheduledPublishAt = null; // Clear scheduled time } else { updates.isPublished = false; updates.publishedAt = null; updates.scheduledUnpublishAt = null; // Clear scheduled time } await database_1.prisma.video.update({ where: { id: videoId }, data: updates, }); // Record in schedule history await database_1.prisma.videoScheduleHistory.create({ data: { videoId, action, scheduledFor: job.data.scheduledFor, executedAt: new Date(), status: 'completed', scheduledByUserId, }, }); logger_1.logger.info(`Successfully ${action}ed video ${videoId}`); return { success: true, action, videoId }; } catch (error) { logger_1.logger.error(`Failed to ${action} video ${videoId}`, { error }); // Record failure in history await database_1.prisma.videoScheduleHistory.create({ data: { videoId, action, scheduledFor: job.data.scheduledFor, status: 'failed', error: error instanceof Error ? error.message : 'Unknown error', scheduledByUserId, }, }); throw error; } }, { connection: { url: env_1.env.REDIS_URL }, concurrency: 1, // Process one at a time for safety }); this.worker.on('completed', (job) => { logger_1.logger.info(`Video schedule job ${job.id} completed`); }); this.worker.on('failed', (job, err) => { logger_1.logger.error(`Video schedule job ${job?.id} failed: ${err.message}`); }); logger_1.logger.info('Video schedule queue worker started'); } /** * Schedule a video to be published at a specific time */ async schedulePublish(videoId, publishAt, scheduledByUserId) { try { // Calculate delay until publish time const now = new Date(); const delay = publishAt.getTime() - now.getTime(); if (delay < 0) { throw new Error('Publish time must be in the future'); } // Cancel any existing publish job for this video await this.cancelSchedule(videoId, 'publish'); // Create job data const jobData = { videoId, action: 'publish', scheduledFor: publishAt, scheduledByUserId, }; // Add job with delay const job = await this.queue.add(`publish-${videoId}`, jobData, { delay, jobId: `publish-${videoId}-${publishAt.getTime()}`, }); // Update video with scheduled publish time await database_1.prisma.video.update({ where: { id: videoId }, data: { scheduledPublishAt: publishAt, }, }); // Record in history as pending await database_1.prisma.videoScheduleHistory.create({ data: { videoId, action: 'publish', scheduledFor: publishAt, status: 'pending', scheduledByUserId, }, }); logger_1.logger.info(`Scheduled video ${videoId} to publish at ${publishAt.toISOString()}`, { jobId: job.id, }); return { jobId: job.id, scheduledFor: publishAt }; } catch (error) { logger_1.logger.error('Failed to schedule video publish', { error, videoId }); throw error; } } /** * Schedule a video to be unpublished at a specific time */ async scheduleUnpublish(videoId, unpublishAt, scheduledByUserId) { try { // Calculate delay until unpublish time const now = new Date(); const delay = unpublishAt.getTime() - now.getTime(); if (delay < 0) { throw new Error('Unpublish time must be in the future'); } // Cancel any existing unpublish job for this video await this.cancelSchedule(videoId, 'unpublish'); // Create job data const jobData = { videoId, action: 'unpublish', scheduledFor: unpublishAt, scheduledByUserId, }; // Add job with delay const job = await this.queue.add(`unpublish-${videoId}`, jobData, { delay, jobId: `unpublish-${videoId}-${unpublishAt.getTime()}`, }); // Update video with scheduled unpublish time await database_1.prisma.video.update({ where: { id: videoId }, data: { scheduledUnpublishAt: unpublishAt, }, }); // Record in history as pending await database_1.prisma.videoScheduleHistory.create({ data: { videoId, action: 'unpublish', scheduledFor: unpublishAt, status: 'pending', scheduledByUserId, }, }); logger_1.logger.info(`Scheduled video ${videoId} to unpublish at ${unpublishAt.toISOString()}`, { jobId: job.id, }); return { jobId: job.id, scheduledFor: unpublishAt }; } catch (error) { logger_1.logger.error('Failed to schedule video unpublish', { error, videoId }); throw error; } } /** * Cancel a scheduled publish or unpublish */ async cancelSchedule(videoId, action) { try { // Find and remove pending jobs for this video and action const jobs = await this.queue.getJobs(['delayed', 'waiting']); const relevantJobs = jobs.filter((job) => job.name === `${action}-${videoId}` && job.data.videoId === videoId); for (const job of relevantJobs) { await job.remove(); logger_1.logger.info(`Cancelled ${action} job for video ${videoId}`, { jobId: job.id }); // Update history to cancelled await database_1.prisma.videoScheduleHistory.updateMany({ where: { videoId, action, status: 'pending', }, data: { status: 'cancelled', }, }); } // Clear scheduled time from video const updates = {}; if (action === 'publish') { updates.scheduledPublishAt = null; } else { updates.scheduledUnpublishAt = null; } await database_1.prisma.video.update({ where: { id: videoId }, data: updates, }); } catch (error) { logger_1.logger.error(`Failed to cancel ${action} schedule`, { error, videoId }); throw error; } } /** * Get upcoming scheduled publish/unpublish operations */ async getUpcomingSchedules(limit = 50) { try { // Get delayed and waiting jobs const jobs = await this.queue.getJobs(['delayed', 'waiting'], 0, limit); const schedules = await Promise.all(jobs.map(async (job) => { const video = await database_1.prisma.video.findUnique({ where: { id: job.data.videoId }, select: { id: true, title: true }, }); return { jobId: job.id, videoId: job.data.videoId, videoTitle: video?.title || null, action: job.data.action, scheduledFor: new Date(job.data.scheduledFor), status: await job.getState(), }; })); return schedules.sort((a, b) => a.scheduledFor.getTime() - b.scheduledFor.getTime()); } catch (error) { logger_1.logger.error('Failed to get upcoming schedules', { error }); throw error; } } /** * Get schedule history for a video */ async getScheduleHistory(videoId, limit = 10) { return database_1.prisma.videoScheduleHistory.findMany({ where: { videoId }, orderBy: { createdAt: 'desc' }, take: limit, include: { scheduledBy: { select: { id: true, name: true, email: true, }, }, }, }); } /** * Get queue stats */ async getStats() { const [waiting, active, delayed, completed, failed] = await Promise.all([ this.queue.getWaitingCount(), this.queue.getActiveCount(), this.queue.getDelayedCount(), this.queue.getCompletedCount(), this.queue.getFailedCount(), ]); return { waiting, active, delayed, completed, failed }; } /** * Pause the queue */ async pause() { await this.queue.pause(); logger_1.logger.info('Video schedule queue paused'); } /** * Resume the queue */ async resume() { await this.queue.resume(); logger_1.logger.info('Video schedule queue resumed'); } /** * Clean up old completed and failed jobs */ async cleanup() { const cleaned = await this.queue.clean(7 * 24 * 60 * 60 * 1000, 1000); // 7 days logger_1.logger.info(`Cleaned ${cleaned.length} old jobs from video schedule queue`); return cleaned.length; } /** * Close queue and worker */ async close() { if (this.worker) { await this.worker.close(); } await this.queue.close(); logger_1.logger.info('Video schedule queue closed'); } } exports.videoScheduleQueueService = new VideoScheduleQueueService(); //# sourceMappingURL=video-schedule-queue.service.js.map