import { Queue, Worker, type Job } from 'bullmq';
import { SmsMessageStatus } from '@prisma/client';
import { env } from '../config/env';
import { prisma } from '../config/database';
import { logger } from '../utils/logger';
import { termuxClient } from './termux.client';
import { eventBus } from './event-bus.service';

/**
 * Payload carried by every job on the `sms-campaigns` queue.
 *
 * The same queue serves two kinds of jobs: campaign jobs (both ids set) and
 * one-off notification jobs (both ids are empty strings).
 */
export interface SmsJobData {
  recipientId: string; // empty string for notification jobs
  campaignId: string; // empty string for notification jobs
  phone: string;
  message: string;
  attemptNumber: number;
}

/**
 * Owns the BullMQ queue and worker used to send outbound SMS through the
 * Termux client, and keeps the campaign/recipient/conversation records in
 * the database in sync with each send attempt.
 */
class SmsQueueService {
  private readonly queue: Queue<SmsJobData>;
  private worker: Worker<SmsJobData> | null = null;

  constructor() {
    this.queue = new Queue<SmsJobData>('sms-campaigns', {
      connection: { url: env.REDIS_URL },
      defaultJobOptions: {
        attempts: env.SMS_MAX_RETRIES,
        backoff: { type: 'exponential', delay: 2000 },
        // BullMQ expresses `age` in seconds: keep completed jobs for one day
        // (capped at 1000 entries) and failed jobs for seven days.
        removeOnComplete: { age: 24 * 60 * 60, count: 1000 },
        removeOnFail: { age: 7 * 24 * 60 * 60 },
      },
    });
  }

  /**
   * Start the worker that consumes the `sms-campaigns` queue.
   *
   * Calling this more than once is a no-op: replacing `this.worker` would
   * leave the previous worker running with no reference left to close it,
   * and would defeat the serial (`concurrency: 1`) processing.
   */
  startWorker() {
    if (this.worker) {
      logger.info('SMS queue worker already started');
      return;
    }

    const worker = new Worker<SmsJobData>(
      'sms-campaigns',
      async (job: Job<SmsJobData>) => this.processJob(job),
      {
        connection: { url: env.REDIS_URL },
        concurrency: 1, // Serial — Termux/carrier rate limit
      },
    );

    worker.on('completed', (job) => {
      logger.debug(`SMS job ${job.id} completed`);
    });

    worker.on('failed', (job, err) => {
      logger.error(`SMS job ${job?.id} failed: ${err.message}`);
    });

    this.worker = worker;
    logger.info('SMS queue worker started');
  }

  /**
   * Process a single SMS job: send the message and record the outcome.
   *
   * Campaign jobs are skipped (without sending) when the campaign is missing
   * or not `RUNNING`. A failed send throws so that BullMQ can retry the job.
   *
   * @param job - The queued job; `job.data.campaignId === ''` marks a notification job.
   * @returns `{ skipped: true, reason }` when the campaign is not running,
   *   otherwise `{ success: true, phone }`.
   * @throws Error when the Termux client reports the send as unsuccessful.
   */
  private async processJob(job: Job<SmsJobData>) {
    const { recipientId, campaignId, phone, message } = job.data;
    const isNotification = !campaignId;

    logger.info(`Processing SMS job ${job.id}${isNotification ? ' (notification)' : ` for campaign ${campaignId}`}, phone ${phone}`);

    // For campaign jobs, check if campaign is still RUNNING (support pause)
    if (!isNotification) {
      const campaign = await prisma.smsCampaign.findUnique({
        where: { id: campaignId },
        select: { status: true },
      });

      if (!campaign || campaign.status !== 'RUNNING') {
        logger.info(`Campaign ${campaignId} is ${campaign?.status || 'deleted'}, skipping SMS to ${phone}`);
        return { skipped: true, reason: 'campaign_not_running' };
      }
    }

    // Send SMS via Termux
    const result = await termuxClient.sendSms(phone, message);
    const status: SmsMessageStatus = result.success ? 'SENT' : 'FAILED';

    // Update recipient status (campaign jobs only)
    if (!isNotification && recipientId) {
      await prisma.smsCampaignRecipient.update({
        where: { id: recipientId },
        data: {
          status,
          sentAt: result.success ? new Date() : undefined,
          errorMessage: result.error || undefined,
        },
      });
    }

    // Create SmsMessage record
    // NOTE(review): this runs on every attempt, so a job that is retried
    // produces one SmsMessage row per attempt — confirm that is intended.
    const smsMessage = await prisma.smsMessage.create({
      data: {
        phone,
        message,
        direction: 'OUTBOUND',
        status,
        connectionType: 'termux',
        campaignId: campaignId || null,
      },
    });

    if (isNotification) {
      // Notification job: just throw on failure for BullMQ retry
      if (!result.success) {
        throw new Error(`Failed to send notification SMS to ${phone}: ${result.error}`);
      }
      return { success: true, phone };
    }

    // Create or update conversation (campaign jobs use the compound unique key)
    const conversation = await prisma.smsConversation.upsert({
      where: { phone_campaignId: { phone, campaignId } },
      create: {
        phone,
        campaignId,
        totalMessages: 1,
        lastMessageAt: new Date(),
      },
      update: {
        totalMessages: { increment: 1 },
        lastMessageAt: new Date(),
      },
    });

    // Link message to conversation
    await prisma.smsMessage.update({
      where: { id: smsMessage.id },
      data: { conversationId: conversation.id },
    });

    // Record outbound SMS as ContactActivity if conversation has a contactId.
    // Best-effort: a failure here is logged and must not fail the job.
    if (conversation.contactId) {
      try {
        await prisma.contactActivity.create({
          data: {
            contactId: conversation.contactId,
            type: 'SMS_SENT',
            title: 'SMS sent',
            description: message.length > 200 ? message.slice(0, 200) + '...' : message,
            metadata: {
              phone,
              conversationId: conversation.id,
              campaignId,
            },
          },
        });
      } catch (err) {
        logger.debug('Failed to record outbound SMS ContactActivity:', err);
      }
    }

    // Update campaign counters
    if (result.success) {
      await prisma.smsCampaign.update({
        where: { id: campaignId },
        data: { totalSent: { increment: 1 } },
      });

      eventBus.publish('sms.message.sent', {
        messageId: smsMessage.id,
        campaignId,
        phone,
        body: message,
      });
    } else {
      // NOTE(review): totalFailed is incremented on every failed attempt, not
      // only the final one, and the completion check below is never reached
      // on failure because of this throw — confirm against the retry policy.
      await prisma.smsCampaign.update({
        where: { id: campaignId },
        data: { totalFailed: { increment: 1 } },
      });
      throw new Error(`Failed to send SMS to ${phone}: ${result.error}`);
    }

    // Check if campaign is complete (no more PENDING recipients)
    const pendingCount = await prisma.smsCampaignRecipient.count({
      where: { campaignId, status: 'PENDING' },
    });

    if (pendingCount === 0) {
      const updatedCampaign = await prisma.smsCampaign.update({
        where: { id: campaignId },
        data: { status: 'COMPLETED', completedAt: new Date() },
      });

      eventBus.publish('sms.campaign.completed', {
        campaignId,
        title: updatedCampaign.name,
        sentCount: updatedCampaign.totalSent,
        failedCount: updatedCampaign.totalFailed,
      });
    }

    return { success: true, phone };
  }

  /**
   * Enqueue a single SMS job with configurable delay
   *
   * @param data - Job payload.
   * @param delayMs - Delay before the job becomes processable, in milliseconds.
   * @returns The id BullMQ assigned to the job.
   * @throws Error if the queue returns a job without an id.
   */
  async addSmsJob(data: SmsJobData, delayMs = 0): Promise<string> {
    const job = await this.queue.add('sms', data, {
      delay: delayMs,
    });

    if (job.id === undefined) {
      throw new Error('SMS job was enqueued without an id');
    }
    return job.id;
  }

  /**
   * Enqueue all pending recipients for a campaign
   *
   * Each recipient gets the campaign template with `{name}` and `{phone}`
   * substituted, and the i-th job is delayed by `i * delayBetweenMs` so the
   * sends are spaced out.
   *
   * @param campaignId - Campaign whose `PENDING` recipients are enqueued.
   * @param delayBetweenMs - Spacing between consecutive jobs, in milliseconds.
   * @returns The number of jobs enqueued.
   * @throws Error if the campaign does not exist.
   */
  async enqueueCampaignRecipients(campaignId: string, delayBetweenMs: number): Promise<number> {
    // Get campaign template for message substitution; checked first so a
    // missing campaign fails before the recipient list is loaded.
    const campaign = await prisma.smsCampaign.findUnique({
      where: { id: campaignId },
      select: { messageTemplate: true },
    });

    if (!campaign) throw new Error('Campaign not found');

    const recipients = await prisma.smsCampaignRecipient.findMany({
      where: { campaignId, status: 'PENDING' },
      orderBy: { createdAt: 'asc' },
    });

    let enqueued = 0;
    for (const recipient of recipients) {
      // Substitute template variables
      const message = substituteTemplate(campaign.messageTemplate, {
        name: recipient.name || '',
        phone: recipient.phone,
      });

      await this.addSmsJob(
        {
          recipientId: recipient.id,
          campaignId,
          phone: recipient.phone,
          message,
          attemptNumber: 1,
        },
        enqueued * delayBetweenMs,
      );
      enqueued++;
    }

    return enqueued;
  }

  /** Return the queue's waiting/active/completed/failed job counts and its paused flag. */
  async getStats() {
    const [waiting, active, completed, failed, paused] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.isPaused(),
    ]);
    return { waiting, active, completed, failed, paused };
  }

  /** Pause the queue. */
  async pause() {
    await this.queue.pause();
    logger.info('SMS queue paused');
  }

  /** Resume the queue. */
  async resume() {
    await this.queue.resume();
    logger.info('SMS queue resumed');
  }

  /** Close the worker (if one was started) and then the queue. */
  async close() {
    if (this.worker) {
      await this.worker.close();
      this.worker = null;
    }
    await this.queue.close();
    logger.info('SMS queue closed');
  }
}

/**
 * Substitute template variables like {name}, {phone} in a message.
 *
 * Placeholders with no matching entry in `vars` are left untouched. Only own
 * properties of `vars` are considered, so placeholders such as
 * `{constructor}` or `{toString}` are not resolved through the prototype chain.
 *
 * @param template - Message text containing `{key}` placeholders.
 * @param vars - Replacement values keyed by placeholder name.
 * @returns The template with every known placeholder replaced.
 */
function substituteTemplate(
  template: string,
  vars: Record<string, string>,
): string {
  return template.replace(/\{(\w+)\}/g, (match: string, key: string) => {
    const value = Object.prototype.hasOwnProperty.call(vars, key) ? vars[key] : undefined;
    return value !== undefined ? value : match;
  });
}

export const smsQueueService = new SmsQueueService();