102 lines
4.0 KiB
JavaScript
102 lines
4.0 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.emailQueueService = void 0;
|
|
const bullmq_1 = require("bullmq");
|
|
const client_1 = require("@prisma/client");
|
|
const env_1 = require("../config/env");
|
|
const database_1 = require("../config/database");
|
|
const logger_1 = require("../utils/logger");
|
|
const email_service_1 = require("./email.service");
|
|
const metrics_1 = require("../utils/metrics");
|
|
class EmailQueueService {
|
|
queue;
|
|
worker = null;
|
|
constructor() {
|
|
this.queue = new bullmq_1.Queue('campaign-emails', {
|
|
connection: { url: env_1.env.REDIS_URL },
|
|
defaultJobOptions: {
|
|
attempts: 3,
|
|
backoff: { type: 'exponential', delay: 2000 },
|
|
removeOnComplete: { age: 24 * 60 * 60, count: 1000 },
|
|
removeOnFail: { age: 7 * 24 * 60 * 60 },
|
|
},
|
|
});
|
|
}
|
|
startWorker() {
|
|
this.worker = new bullmq_1.Worker('campaign-emails', async (job) => {
|
|
const { campaignEmailId, ...emailData } = job.data;
|
|
const endTimer = metrics_1.emailSendDuration.startTimer();
|
|
logger_1.logger.info(`Processing email job ${job.id} for campaign-email ${campaignEmailId}`);
|
|
// Look up campaign ID for metrics
|
|
const emailRecord = await database_1.prisma.campaignEmail.findUnique({
|
|
where: { id: campaignEmailId },
|
|
select: { campaignId: true },
|
|
});
|
|
const campaignId = emailRecord?.campaignId || 'unknown';
|
|
const result = await email_service_1.emailService.sendCampaignEmail(emailData);
|
|
endTimer();
|
|
const status = result.success
|
|
? client_1.CampaignEmailStatus.SENT
|
|
: client_1.CampaignEmailStatus.FAILED;
|
|
await database_1.prisma.campaignEmail.update({
|
|
where: { id: campaignEmailId },
|
|
data: { status },
|
|
});
|
|
if (result.success) {
|
|
(0, metrics_1.recordEmailSent)(campaignId);
|
|
}
|
|
else {
|
|
(0, metrics_1.recordEmailFailed)(campaignId, 'send_failure');
|
|
throw new Error(`Failed to send email to ${emailData.recipientEmail}`);
|
|
}
|
|
return { messageId: result.messageId, testMode: result.testMode };
|
|
}, {
|
|
connection: { url: env_1.env.REDIS_URL },
|
|
concurrency: 3,
|
|
});
|
|
this.worker.on('completed', (job) => {
|
|
logger_1.logger.info(`Email job ${job.id} completed`);
|
|
});
|
|
this.worker.on('failed', (job, err) => {
|
|
logger_1.logger.error(`Email job ${job?.id} failed: ${err.message}`);
|
|
});
|
|
logger_1.logger.info('Email queue worker started');
|
|
}
|
|
async addCampaignEmail(data) {
|
|
const job = await this.queue.add('campaign', data);
|
|
return job.id;
|
|
}
|
|
async getStats() {
|
|
const [waiting, active, completed, failed, paused] = await Promise.all([
|
|
this.queue.getWaitingCount(),
|
|
this.queue.getActiveCount(),
|
|
this.queue.getCompletedCount(),
|
|
this.queue.getFailedCount(),
|
|
this.queue.isPaused(),
|
|
]);
|
|
(0, metrics_1.setEmailQueueSize)(waiting + active);
|
|
return { waiting, active, completed, failed, paused };
|
|
}
|
|
async pause() {
|
|
await this.queue.pause();
|
|
logger_1.logger.info('Email queue paused');
|
|
}
|
|
async resume() {
|
|
await this.queue.resume();
|
|
logger_1.logger.info('Email queue resumed');
|
|
}
|
|
async clean(grace = 24 * 60 * 60 * 1000) {
|
|
const cleaned = await this.queue.clean(grace, 1000, 'completed');
|
|
logger_1.logger.info(`Cleaned ${cleaned.length} completed jobs`);
|
|
return cleaned.length;
|
|
}
|
|
async close() {
|
|
if (this.worker) {
|
|
await this.worker.close();
|
|
}
|
|
await this.queue.close();
|
|
logger_1.logger.info('Email queue closed');
|
|
}
|
|
}
|
|
exports.emailQueueService = new EmailQueueService();
|
|
//# sourceMappingURL=email-queue.service.js.map
|