changemaker.lite/api/dist/services/video-schedule-queue.service.js

326 lines
12 KiB
JavaScript

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.videoScheduleQueueService = void 0;
const bullmq_1 = require("bullmq");
const env_1 = require("../config/env");
const database_1 = require("../config/database");
const logger_1 = require("../utils/logger");
/**
 * Queue-backed scheduler for timed video publish / unpublish operations.
 *
 * Scheduling adds a delayed BullMQ job to the 'video-schedules' queue and
 * mirrors the schedule on the video row and in the videoScheduleHistory
 * table. A worker (see startWorker) later executes the job by flipping the
 * video's publish state.
 */
class VideoScheduleQueueService {
    /** BullMQ queue holding the publish/unpublish jobs. */
    queue;
    /** BullMQ worker for the queue; null until startWorker() has run. */
    worker = null;

    constructor() {
        this.queue = new bullmq_1.Queue('video-schedules', {
            connection: { url: env_1.env.REDIS_URL },
            defaultJobOptions: {
                attempts: 3,
                backoff: { type: 'exponential', delay: 5000 },
                removeOnComplete: { age: 7 * 24 * 60 * 60, count: 1000 }, // Keep 7 days
                removeOnFail: { age: 30 * 24 * 60 * 60 }, // Keep failed 30 days
            },
        });
    }

    /**
     * Start the worker to process scheduled publish/unpublish jobs.
     * Calling it again while a worker exists is a no-op, so a second worker
     * is never created (and the first one never leaked).
     */
    startWorker() {
        if (this.worker) {
            logger_1.logger.info('Video schedule queue worker already started');
            return;
        }
        this.worker = new bullmq_1.Worker('video-schedules', (job) => this.#processJob(job), {
            connection: { url: env_1.env.REDIS_URL },
            concurrency: 1, // Process one at a time for safety
        });
        this.worker.on('completed', (job) => {
            logger_1.logger.info(`Video schedule job ${job.id} completed`);
        });
        this.worker.on('failed', (job, err) => {
            logger_1.logger.error(`Video schedule job ${job?.id} failed: ${err.message}`);
        });
        // Without an 'error' listener a connection-level error emitted by the
        // worker would surface as an unhandled 'error' event.
        this.worker.on('error', (err) => {
            logger_1.logger.error(`Video schedule worker error: ${err?.message ?? err}`);
        });
        logger_1.logger.info('Video schedule queue worker started');
    }

    /**
     * Execute one scheduled job: apply the publish state to the video and
     * append a 'completed' (or, on error, 'failed') history row.
     *
     * NOTE(review): the 'pending' history row written at scheduling time is
     * not touched here; a separate row is added instead. Confirm that is the
     * intended history model.
     *
     * @param job BullMQ job whose data carries videoId, action, scheduledFor
     *            and scheduledByUserId.
     * @returns {Promise<{success: boolean, action: string, videoId: *}>}
     * @throws the original processing error, so BullMQ can retry the job.
     */
    async #processJob(job) {
        const { videoId, action, scheduledFor, scheduledByUserId } = job.data;
        logger_1.logger.info(`Processing video schedule job ${job.id}`, { videoId, action });
        try {
            await database_1.prisma.video.update({
                where: { id: videoId },
                data: this.#publishStateFor(action),
            });
            // Record in schedule history
            await database_1.prisma.videoScheduleHistory.create({
                data: {
                    videoId,
                    action,
                    scheduledFor,
                    executedAt: new Date(),
                    status: 'completed',
                    scheduledByUserId,
                },
            });
            logger_1.logger.info(`Successfully ${action}ed video ${videoId}`);
            return { success: true, action, videoId };
        }
        catch (error) {
            logger_1.logger.error(`Failed to ${action} video ${videoId}`, { error });
            // Recording the failure is best-effort: if this write throws too,
            // it must not replace the error that actually failed the job.
            try {
                await database_1.prisma.videoScheduleHistory.create({
                    data: {
                        videoId,
                        action,
                        scheduledFor,
                        status: 'failed',
                        error: error instanceof Error ? error.message : 'Unknown error',
                        scheduledByUserId,
                    },
                });
            }
            catch (historyError) {
                logger_1.logger.error(`Failed to record ${action} failure for video ${videoId}`, { error: historyError });
            }
            throw error;
        }
    }

    /**
     * Build the video column updates that carry out a schedule action.
     *
     * @param {string} action 'publish' or 'unpublish'.
     * @throws {Error} for any other action, instead of silently unpublishing.
     */
    #publishStateFor(action) {
        if (action === 'publish') {
            return {
                isPublished: true,
                publishedAt: new Date(),
                scheduledPublishAt: null, // Clear scheduled time
            };
        }
        if (action === 'unpublish') {
            return {
                isPublished: false,
                publishedAt: null,
                scheduledUnpublishAt: null, // Clear scheduled time
            };
        }
        throw new Error(`Unknown video schedule action: ${action}`);
    }

    /**
     * Schedule a video to be published at a specific time.
     *
     * @param videoId Id of the video row.
     * @param {Date} publishAt Future instant at which to publish.
     * @param scheduledByUserId Id of the user creating the schedule.
     * @returns {Promise<{jobId: string, scheduledFor: Date}>}
     * @throws {Error} if publishAt is not a valid Date or lies in the past.
     */
    async schedulePublish(videoId, publishAt, scheduledByUserId) {
        return this.#schedule('publish', videoId, publishAt, scheduledByUserId);
    }

    /**
     * Schedule a video to be unpublished at a specific time.
     *
     * @param videoId Id of the video row.
     * @param {Date} unpublishAt Future instant at which to unpublish.
     * @param scheduledByUserId Id of the user creating the schedule.
     * @returns {Promise<{jobId: string, scheduledFor: Date}>}
     * @throws {Error} if unpublishAt is not a valid Date or lies in the past.
     */
    async scheduleUnpublish(videoId, unpublishAt, scheduledByUserId) {
        return this.#schedule('unpublish', videoId, unpublishAt, scheduledByUserId);
    }

    /**
     * Shared implementation of schedulePublish / scheduleUnpublish: validate
     * the time, replace any existing schedule of the same kind, enqueue the
     * delayed job, then mirror the schedule on the video and in history.
     */
    async #schedule(action, videoId, runAt, scheduledByUserId) {
        const label = action === 'publish' ? 'Publish' : 'Unpublish';
        const scheduleColumn = action === 'publish' ? 'scheduledPublishAt' : 'scheduledUnpublishAt';
        try {
            // An invalid Date yields a NaN delay, which would slip past a
            // plain `delay < 0` comparison, so reject it explicitly.
            if (!(runAt instanceof Date) || Number.isNaN(runAt.getTime())) {
                throw new Error(`${label} time must be a valid Date`);
            }
            const delay = runAt.getTime() - Date.now();
            if (delay < 0) {
                throw new Error(`${label} time must be in the future`);
            }
            // Cancel any existing job of the same kind for this video
            await this.cancelSchedule(videoId, action);
            const job = await this.queue.add(`${action}-${videoId}`, {
                videoId,
                action,
                scheduledFor: runAt,
                scheduledByUserId,
            }, {
                delay,
                jobId: `${action}-${videoId}-${runAt.getTime()}`,
            });
            try {
                await database_1.prisma.video.update({
                    where: { id: videoId },
                    data: { [scheduleColumn]: runAt },
                });
            }
            catch (dbError) {
                // Do not leave a queued job behind that the video row knows
                // nothing about; removal is best-effort.
                try {
                    await job.remove();
                }
                catch (removeError) {
                    logger_1.logger.error(`Failed to roll back ${action} job for video ${videoId}`, {
                        error: removeError,
                        jobId: job.id,
                    });
                }
                throw dbError;
            }
            // Record in history as pending
            await database_1.prisma.videoScheduleHistory.create({
                data: {
                    videoId,
                    action,
                    scheduledFor: runAt,
                    status: 'pending',
                    scheduledByUserId,
                },
            });
            logger_1.logger.info(`Scheduled video ${videoId} to ${action} at ${runAt.toISOString()}`, {
                jobId: job.id,
            });
            return { jobId: job.id, scheduledFor: runAt };
        }
        catch (error) {
            logger_1.logger.error(`Failed to schedule video ${action}`, { error, videoId });
            throw error;
        }
    }

    /**
     * Cancel a scheduled publish or unpublish.
     *
     * Removes every delayed/waiting job for the video and action, marks the
     * matching 'pending' history rows as 'cancelled' when at least one job
     * was removed, and always clears the schedule column on the video.
     *
     * @param videoId Id of the video row.
     * @param {string} action 'publish' clears scheduledPublishAt; any other
     *                        value clears scheduledUnpublishAt.
     */
    async cancelSchedule(videoId, action) {
        try {
            const jobName = `${action}-${videoId}`;
            const jobs = await this.queue.getJobs(['delayed', 'waiting']);
            // Entries can be missing when a job disappears between listing
            // and loading, so skip holes before reading job fields.
            const relevantJobs = jobs.filter((job) => job && job.name === jobName && job.data.videoId === videoId);
            for (const job of relevantJobs) {
                await job.remove();
                logger_1.logger.info(`Cancelled ${action} job for video ${videoId}`, { jobId: job.id });
            }
            // The history update does not depend on the individual job, so
            // it runs once rather than once per removed job.
            if (relevantJobs.length > 0) {
                await database_1.prisma.videoScheduleHistory.updateMany({
                    where: {
                        videoId,
                        action,
                        status: 'pending',
                    },
                    data: {
                        status: 'cancelled',
                    },
                });
            }
            // Clear scheduled time from video
            await database_1.prisma.video.update({
                where: { id: videoId },
                data: action === 'publish'
                    ? { scheduledPublishAt: null }
                    : { scheduledUnpublishAt: null },
            });
        }
        catch (error) {
            logger_1.logger.error(`Failed to cancel ${action} schedule`, { error, videoId });
            throw error;
        }
    }

    /**
     * Get upcoming scheduled publish/unpublish operations, soonest first.
     *
     * @param {number} [limit=50] Maximum number of schedules to return.
     * @returns {Promise<Array<{jobId, videoId, videoTitle, action, scheduledFor: Date, status}>>}
     */
    async getUpcomingSchedules(limit = 50) {
        try {
            if (!(limit > 0)) {
                return [];
            }
            // The end index is inclusive and the range is applied per job
            // type, so fetch limit - 1 and trim after sorting. Ascending
            // order asks for the earliest entries of each type first.
            const fetched = await this.queue.getJobs(['delayed', 'waiting'], 0, limit - 1, true);
            const jobs = fetched.filter(Boolean);
            // One batched title lookup instead of one query per job.
            const videoIds = [...new Set(jobs.map((job) => job.data.videoId))];
            const videos = videoIds.length > 0
                ? await database_1.prisma.video.findMany({
                    where: { id: { in: videoIds } },
                    select: { id: true, title: true },
                })
                : [];
            const titleById = new Map(videos.map((video) => [video.id, video.title]));
            const schedules = await Promise.all(jobs.map(async (job) => ({
                jobId: job.id,
                videoId: job.data.videoId,
                videoTitle: titleById.get(job.data.videoId) || null,
                action: job.data.action,
                scheduledFor: new Date(job.data.scheduledFor),
                status: await job.getState(),
            })));
            return schedules
                .sort((a, b) => a.scheduledFor.getTime() - b.scheduledFor.getTime())
                .slice(0, limit);
        }
        catch (error) {
            logger_1.logger.error('Failed to get upcoming schedules', { error });
            throw error;
        }
    }

    /**
     * Get schedule history for a video, newest first, including the id, name
     * and email of the scheduling user.
     *
     * @param videoId Id of the video row.
     * @param {number} [limit=10] Maximum number of rows to return.
     */
    async getScheduleHistory(videoId, limit = 10) {
        return database_1.prisma.videoScheduleHistory.findMany({
            where: { videoId },
            orderBy: { createdAt: 'desc' },
            take: limit,
            include: {
                scheduledBy: {
                    select: {
                        id: true,
                        name: true,
                        email: true,
                    },
                },
            },
        });
    }

    /**
     * Get queue stats.
     *
     * @returns {Promise<{waiting: number, active: number, delayed: number, completed: number, failed: number}>}
     */
    async getStats() {
        const [waiting, active, delayed, completed, failed] = await Promise.all([
            this.queue.getWaitingCount(),
            this.queue.getActiveCount(),
            this.queue.getDelayedCount(),
            this.queue.getCompletedCount(),
            this.queue.getFailedCount(),
        ]);
        return { waiting, active, delayed, completed, failed };
    }

    /**
     * Pause the queue
     */
    async pause() {
        await this.queue.pause();
        logger_1.logger.info('Video schedule queue paused');
    }

    /**
     * Resume the queue
     */
    async resume() {
        await this.queue.resume();
        logger_1.logger.info('Video schedule queue resumed');
    }

    /**
     * Clean up completed jobs older than 7 days (at most 1000 per call).
     * Failed jobs are not cleaned here; they are only subject to the queue's
     * removeOnFail retention configured in the constructor.
     *
     * @returns {Promise<number>} Number of jobs removed.
     */
    async cleanup() {
        const cleaned = await this.queue.clean(7 * 24 * 60 * 60 * 1000, 1000, 'completed'); // 7 days
        logger_1.logger.info(`Cleaned ${cleaned.length} old jobs from video schedule queue`);
        return cleaned.length;
    }

    /**
     * Close queue and worker
     */
    async close() {
        if (this.worker) {
            await this.worker.close();
            // Drop the reference so startWorker() can create a fresh worker.
            this.worker = null;
        }
        await this.queue.close();
        logger_1.logger.info('Video schedule queue closed');
    }
}
// Shared singleton instance. It is constructed at module load, so the
// 'video-schedules' BullMQ Queue object is created as soon as this module is
// first required; the worker is only created when startWorker() is called.
exports.videoScheduleQueueService = new VideoScheduleQueueService();
//# sourceMappingURL=video-schedule-queue.service.js.map