// Source listing metadata: 326 lines, 12 KiB, JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.videoScheduleQueueService = void 0;
|
|
const bullmq_1 = require("bullmq");
|
|
const env_1 = require("../config/env");
|
|
const database_1 = require("../config/database");
|
|
const logger_1 = require("../utils/logger");
|
|
/**
 * Schedules delayed publish/unpublish operations for videos.
 *
 * Jobs live in the BullMQ queue `video-schedules`; the video row and the
 * `videoScheduleHistory` table are updated through Prisma.
 */
class VideoScheduleQueueService {
    /** BullMQ queue holding the delayed publish/unpublish jobs. */
    queue;
    /** Worker executing the jobs; stays `null` until `startWorker()` runs. */
    worker = null;

    /**
     * Creates the queue. Jobs are retried up to 3 times with exponential
     * backoff; finished jobs are retained for a limited time (ages in seconds).
     */
    constructor() {
        this.queue = new bullmq_1.Queue('video-schedules', {
            connection: { url: env_1.env.REDIS_URL },
            defaultJobOptions: {
                attempts: 3,
                backoff: { type: 'exponential', delay: 5000 },
                removeOnComplete: { age: 7 * 24 * 60 * 60, count: 1000 }, // Keep 7 days
                removeOnFail: { age: 30 * 24 * 60 * 60 }, // Keep failed 30 days
            },
        });
    }

    /**
     * Start the worker to process scheduled publish/unpublish jobs.
     *
     * Calling this again while a worker exists is a no-op, so a second call
     * cannot leak the first worker and its Redis connection.
     */
    startWorker() {
        if (this.worker) {
            logger_1.logger.info('Video schedule queue worker already started');
            return;
        }
        this.worker = new bullmq_1.Worker('video-schedules', (job) => this.#processJob(job), {
            connection: { url: env_1.env.REDIS_URL },
            concurrency: 1, // Process one at a time for safety
        });
        this.worker.on('completed', (job) => {
            logger_1.logger.info(`Video schedule job ${job.id} completed`);
        });
        this.worker.on('failed', (job, err) => {
            logger_1.logger.error(`Video schedule job ${job?.id} failed: ${err.message}`);
        });
        // Without an 'error' listener an emitted worker error would surface as
        // an unhandled 'error' event.
        this.worker.on('error', (err) => {
            logger_1.logger.error(`Video schedule worker error: ${err.message}`);
        });
        logger_1.logger.info('Video schedule queue worker started');
    }

    /**
     * Executes one scheduled job: flips the video's publish state and records
     * the outcome in the schedule history.
     *
     * @param {object} job - BullMQ job whose `data` carries `videoId`, `action`,
     *   `scheduledFor` and `scheduledByUserId`.
     * @returns {Promise<{success: boolean, action: string, videoId: *}>}
     * @throws Rethrows the original failure so BullMQ can retry the job.
     */
    async #processJob(job) {
        const { videoId, action, scheduledByUserId } = job.data;
        logger_1.logger.info(`Processing video schedule job ${job.id}`, { videoId, action });
        try {
            // Refuse unknown actions instead of silently unpublishing the video.
            if (action !== 'publish' && action !== 'unpublish') {
                throw new Error(`Unknown video schedule action: ${action}`);
            }
            // Update video publish status and clear the matching scheduled time.
            const updates = action === 'publish'
                ? { isPublished: true, publishedAt: new Date(), scheduledPublishAt: null }
                : { isPublished: false, publishedAt: null, scheduledUnpublishAt: null };
            await database_1.prisma.video.update({
                where: { id: videoId },
                data: updates,
            });
            // Record in schedule history
            await database_1.prisma.videoScheduleHistory.create({
                data: {
                    videoId,
                    action,
                    scheduledFor: job.data.scheduledFor,
                    executedAt: new Date(),
                    status: 'completed',
                    scheduledByUserId,
                },
            });
            logger_1.logger.info(`Successfully ${action}ed video ${videoId}`);
            return { success: true, action, videoId };
        }
        catch (error) {
            logger_1.logger.error(`Failed to ${action} video ${videoId}`, { error });
            // Recording the failure is best-effort: if this write fails too, it
            // must not replace the original error that BullMQ needs to see.
            try {
                await database_1.prisma.videoScheduleHistory.create({
                    data: {
                        videoId,
                        action,
                        scheduledFor: job.data.scheduledFor,
                        status: 'failed',
                        error: error instanceof Error ? error.message : 'Unknown error',
                        scheduledByUserId,
                    },
                });
            }
            catch (historyError) {
                logger_1.logger.error(`Failed to record ${action} failure for video ${videoId}`, { error: historyError });
            }
            throw error;
        }
    }

    /**
     * Shared implementation of schedulePublish / scheduleUnpublish.
     *
     * @param {'publish'|'unpublish'} action - Operation to schedule.
     * @param {*} videoId - Id of the video row.
     * @param {Date} runAt - When the operation should run.
     * @param {*} scheduledByUserId - User recorded in the history row.
     * @returns {Promise<{jobId: *, scheduledFor: Date}>}
     * @throws {Error} If `runAt` is not a valid date or lies in the past;
     *   queue/database errors are logged and rethrown.
     */
    async #schedule(action, videoId, runAt, scheduledByUserId) {
        const label = action === 'publish' ? 'Publish' : 'Unpublish';
        const scheduledField = action === 'publish' ? 'scheduledPublishAt' : 'scheduledUnpublishAt';
        try {
            // An Invalid Date yields NaN, and `NaN < 0` is false, so it has to be
            // rejected explicitly before anything is enqueued.
            if (typeof runAt?.getTime !== 'function' || Number.isNaN(runAt.getTime())) {
                throw new Error(`${label} time must be a valid Date`);
            }
            // Calculate delay until the target time
            const delay = runAt.getTime() - Date.now();
            if (delay < 0) {
                throw new Error(`${label} time must be in the future`);
            }
            // Cancel any existing job of this kind for this video
            await this.cancelSchedule(videoId, action);
            const jobData = {
                videoId,
                action,
                scheduledFor: runAt,
                scheduledByUserId,
            };
            // Add job with delay
            const job = await this.queue.add(`${action}-${videoId}`, jobData, {
                delay,
                jobId: `${action}-${videoId}-${runAt.getTime()}`,
            });
            // Update video with the scheduled time
            await database_1.prisma.video.update({
                where: { id: videoId },
                data: { [scheduledField]: runAt },
            });
            // Record in history as pending
            await database_1.prisma.videoScheduleHistory.create({
                data: {
                    videoId,
                    action,
                    scheduledFor: runAt,
                    status: 'pending',
                    scheduledByUserId,
                },
            });
            logger_1.logger.info(`Scheduled video ${videoId} to ${action} at ${runAt.toISOString()}`, {
                jobId: job.id,
            });
            return { jobId: job.id, scheduledFor: runAt };
        }
        catch (error) {
            logger_1.logger.error(`Failed to schedule video ${action}`, { error, videoId });
            throw error;
        }
    }

    /**
     * Schedule a video to be published at a specific time.
     *
     * @param {*} videoId - Id of the video row.
     * @param {Date} publishAt - Future point in time at which to publish.
     * @param {*} scheduledByUserId - User recorded in the history row.
     * @returns {Promise<{jobId: *, scheduledFor: Date}>}
     * @throws {Error} If `publishAt` is invalid or in the past.
     */
    async schedulePublish(videoId, publishAt, scheduledByUserId) {
        return this.#schedule('publish', videoId, publishAt, scheduledByUserId);
    }

    /**
     * Schedule a video to be unpublished at a specific time.
     *
     * @param {*} videoId - Id of the video row.
     * @param {Date} unpublishAt - Future point in time at which to unpublish.
     * @param {*} scheduledByUserId - User recorded in the history row.
     * @returns {Promise<{jobId: *, scheduledFor: Date}>}
     * @throws {Error} If `unpublishAt` is invalid or in the past.
     */
    async scheduleUnpublish(videoId, unpublishAt, scheduledByUserId) {
        return this.#schedule('unpublish', videoId, unpublishAt, scheduledByUserId);
    }

    /**
     * Cancel a scheduled publish or unpublish.
     *
     * Removes matching delayed/waiting jobs, marks pending history rows as
     * cancelled when at least one job was removed, and clears the scheduled
     * time on the video.
     *
     * @param {*} videoId - Id of the video row.
     * @param {string} action - `'publish'`; any other value is handled as unpublish.
     * @returns {Promise<void>}
     * @throws Queue/database errors are logged and rethrown.
     */
    async cancelSchedule(videoId, action) {
        try {
            const jobName = `${action}-${videoId}`;
            // Find and remove pending jobs for this video and action
            const jobs = await this.queue.getJobs(['delayed', 'waiting']);
            // Skip empty slots: a job removed while the list was being read can
            // come back as undefined.
            const relevantJobs = jobs.filter((job) => job?.name === jobName && job.data.videoId === videoId);
            for (const job of relevantJobs) {
                await job.remove();
                logger_1.logger.info(`Cancelled ${action} job for video ${videoId}`, { jobId: job.id });
            }
            // The history update does not depend on the individual job, so it
            // runs once instead of once per removed job.
            if (relevantJobs.length > 0) {
                await database_1.prisma.videoScheduleHistory.updateMany({
                    where: {
                        videoId,
                        action,
                        status: 'pending',
                    },
                    data: {
                        status: 'cancelled',
                    },
                });
            }
            // Clear scheduled time from video
            const scheduledField = action === 'publish' ? 'scheduledPublishAt' : 'scheduledUnpublishAt';
            await database_1.prisma.video.update({
                where: { id: videoId },
                data: { [scheduledField]: null },
            });
        }
        catch (error) {
            logger_1.logger.error(`Failed to cancel ${action} schedule`, { error, videoId });
            throw error;
        }
    }

    /**
     * Get upcoming scheduled publish/unpublish operations, soonest first.
     *
     * @param {number} [limit=50] - Maximum number of entries to return.
     * @returns {Promise<Array<{jobId: *, videoId: *, videoTitle: (string|null),
     *   action: string, scheduledFor: Date, status: string}>>}
     * @throws Queue/database errors are logged and rethrown.
     */
    async getUpcomingSchedules(limit = 50) {
        try {
            const max = Math.floor(Number(limit));
            // A non-positive limit would become end index -1, i.e. "all jobs".
            if (!(max > 0)) {
                return [];
            }
            // The end index is inclusive, hence `max - 1`. Ascending order makes
            // a truncated result contain the soonest jobs rather than the latest.
            const fetched = await this.queue.getJobs(['delayed', 'waiting'], 0, max - 1, true);
            const jobs = fetched.filter(Boolean);
            // Load all titles in one query instead of one query per job.
            const videoIds = [...new Set(jobs.map((job) => job.data.videoId).filter((id) => id != null))];
            const videos = videoIds.length > 0
                ? await database_1.prisma.video.findMany({
                    where: { id: { in: videoIds } },
                    select: { id: true, title: true },
                })
                : [];
            const titleById = new Map(videos.map((video) => [video.id, video.title]));
            const schedules = await Promise.all(jobs.map(async (job) => ({
                jobId: job.id,
                videoId: job.data.videoId,
                videoTitle: titleById.get(job.data.videoId) || null,
                action: job.data.action,
                scheduledFor: new Date(job.data.scheduledFor),
                status: await job.getState(),
            })));
            // The range is applied to each job type separately, so the combined
            // list can exceed the limit; trim after sorting.
            return schedules
                .sort((a, b) => a.scheduledFor.getTime() - b.scheduledFor.getTime())
                .slice(0, max);
        }
        catch (error) {
            logger_1.logger.error('Failed to get upcoming schedules', { error });
            throw error;
        }
    }

    /**
     * Get schedule history for a video, newest first.
     *
     * @param {*} videoId - Id of the video row.
     * @param {number} [limit=10] - Maximum number of history rows.
     * @returns {Promise<Array<object>>} History rows including the scheduling
     *   user's id, name and email.
     */
    async getScheduleHistory(videoId, limit = 10) {
        return database_1.prisma.videoScheduleHistory.findMany({
            where: { videoId },
            orderBy: { createdAt: 'desc' },
            take: limit,
            include: {
                scheduledBy: {
                    select: {
                        id: true,
                        name: true,
                        email: true,
                    },
                },
            },
        });
    }

    /**
     * Get queue stats.
     *
     * @returns {Promise<{waiting: number, active: number, delayed: number,
     *   completed: number, failed: number}>} Job counts per state.
     */
    async getStats() {
        const [waiting, active, delayed, completed, failed] = await Promise.all([
            this.queue.getWaitingCount(),
            this.queue.getActiveCount(),
            this.queue.getDelayedCount(),
            this.queue.getCompletedCount(),
            this.queue.getFailedCount(),
        ]);
        return { waiting, active, delayed, completed, failed };
    }

    /**
     * Pause the queue.
     *
     * @returns {Promise<void>}
     */
    async pause() {
        await this.queue.pause();
        logger_1.logger.info('Video schedule queue paused');
    }

    /**
     * Resume the queue.
     *
     * @returns {Promise<void>}
     */
    async resume() {
        await this.queue.resume();
        logger_1.logger.info('Video schedule queue resumed');
    }

    /**
     * Clean up old completed and failed jobs.
     *
     * Completed jobs older than 7 days and failed jobs older than 30 days are
     * removed, matching the retention configured in the constructor. At most
     * 1000 jobs are removed per state and call.
     *
     * @returns {Promise<number>} Total number of removed jobs.
     */
    async cleanup() {
        const dayMs = 24 * 60 * 60 * 1000;
        const completed = await this.queue.clean(7 * dayMs, 1000, 'completed');
        const failed = await this.queue.clean(30 * dayMs, 1000, 'failed');
        const total = completed.length + failed.length;
        logger_1.logger.info(`Cleaned ${total} old jobs from video schedule queue`);
        return total;
    }

    /**
     * Close queue and worker.
     *
     * @returns {Promise<void>}
     */
    async close() {
        if (this.worker) {
            await this.worker.close();
            // Drop the reference so startWorker() can create a fresh worker.
            this.worker = null;
        }
        await this.queue.close();
        logger_1.logger.info('Video schedule queue closed');
    }
}
|
|
// Module-level instance exported for importers; constructing it here runs the
// class constructor (which creates the 'video-schedules' queue) at load time.
exports.videoScheduleQueueService = new VideoScheduleQueueService();
|
|
//# sourceMappingURL=video-schedule-queue.service.js.map
|