From 0c2ffe754e7b1e9c45696d33eea7487c77989e81 Mon Sep 17 00:00:00 2001 From: bunker-admin Date: Tue, 31 Mar 2026 08:34:23 -0600 Subject: [PATCH] Harden Stripe payment integration: 15 security fixes from audit Addresses 11 original findings (1 critical, 3 high, 4 medium, 3 low) plus 4 additional findings from security review: - Mask secrets in PUT /settings response (was leaking decrypted keys) - Add paymentCheckoutRateLimit (10/hr/IP) to all 5 checkout endpoints - Implement durable audit logging to payment_audit_log table - Pin Stripe API version to 2026-01-28.clover (SDK v20.3.1) - Add charge.dispute.created/closed webhook handlers with DISPUTED status - Restore tickets on dispute won, handle charge_refunded closure - Guard against sentinel passthrough corrupting stored Stripe keys - Wrap refund DB updates in try/catch with webhook reconciliation fallback - Add $transaction for product maxPurchases race condition - Remove dead Payment model lookup from handleChargeRefunded - Cap donation amount at $100k in both schemas - Add requirePaymentsEnabled middleware on all checkout routes - Remove Stripe internal IDs from CSV exports - Add Cache-Control: no-store on admin settings responses Bunker Admin --- SERVICE_INTEGRATIONS.md | 147 +++++ .../migration.sql | 18 + api/prisma/schema.prisma | 152 +++++- api/src/middleware/rate-limit.ts | 34 ++ .../campaigns/campaigns-moderation.routes.ts | 16 + .../influence/campaigns/campaigns.routes.ts | 19 + .../influence/responses/responses.service.ts | 11 +- .../listmonk/listmonk-webhook.routes.ts | 58 ++ .../modules/map/canvass/canvass.service.ts | 35 +- api/src/modules/map/shifts/shifts.service.ts | 157 +++--- .../media/routes/video-tracking.routes.ts | 8 + api/src/modules/media/routes/videos.routes.ts | 15 + .../observability/observability.routes.ts | 9 + .../payments/donation-pages-public.routes.ts | 4 + .../payments/donation-pages.schemas.ts | 2 +- api/src/modules/payments/donations.service.ts | 18 +- .../payments/payment-settings.service.ts | 27 +- .../modules/payments/payments-admin.routes.ts | 10 +- .../payments/payments-public.routes.ts | 9 +- api/src/modules/payments/payments.schemas.ts | 4 +- api/src/modules/payments/products.service.ts | 29 +- .../modules/payments/subscriptions.service.ts | 2 - api/src/modules/payments/webhook.service.ts | 129 ++++- api/src/modules/people/people.service.ts | 40 +- .../sms/campaigns/sms-campaigns.routes.ts | 11 + .../sms/messages/sms-messages.routes.ts | 25 +- .../modules/social/impact-stories.routes.ts | 9 + .../ticketed-events-public.routes.ts | 4 +- .../ticketed-events.service.ts | 16 + api/src/modules/users/users.routes.ts | 11 +- api/src/modules/users/users.service.ts | 24 + api/src/server.ts | 18 + api/src/services/email-queue.service.ts | 8 +- api/src/services/event-bus.service.ts | 183 +++++++ .../event-listeners/calendar-sync.listener.ts | 268 ++++++++++ .../event-listeners/crm-activity.listener.ts | 213 ++++++++ .../event-listeners/gancio.listener.ts | 93 ++++ api/src/services/event-listeners/index.ts | 37 ++ .../event-listeners/listmonk.listener.ts | 105 ++++ .../event-listeners/n8n-webhook.listener.ts | 55 ++ .../event-listeners/rocketchat.listener.ts | 55 ++ api/src/services/reengagement.service.ts | 8 +- api/src/services/sms-queue.service.ts | 25 + api/src/services/sms-response-sync.service.ts | 9 + api/src/services/stripe.client.ts | 2 +- api/src/types/events.ts | 504 ++++++++++++++++++ docker-compose.yml | 8 + 47 files changed, 2457 insertions(+), 187 deletions(-) create mode 100644 SERVICE_INTEGRATIONS.md create mode 100644 api/prisma/migrations/20260330120000_payment_audit_and_disputed_status/migration.sql create mode 100644 api/src/services/event-bus.service.ts create mode 100644 api/src/services/event-listeners/calendar-sync.listener.ts create mode 100644 api/src/services/event-listeners/crm-activity.listener.ts create mode 100644 api/src/services/event-listeners/gancio.listener.ts create mode 100644 api/src/services/event-listeners/index.ts create mode 100644 api/src/services/event-listeners/listmonk.listener.ts create mode 100644 api/src/services/event-listeners/n8n-webhook.listener.ts create mode 100644 api/src/services/event-listeners/rocketchat.listener.ts create mode 100644 api/src/types/events.ts diff --git a/SERVICE_INTEGRATIONS.md b/SERVICE_INTEGRATIONS.md new file mode 100644 index 00000000..36003df5 --- /dev/null +++ b/SERVICE_INTEGRATIONS.md @@ -0,0 +1,147 @@ +# Service Integrations — EventBus Architecture + +Tracking document for the platform-wide EventBus and service integration work. + +**Started:** 2026-03-30 +**Branch:** v2 + +--- + +## Architecture Overview + +Changemaker Lite has 30+ services but most operate as isolated tools. The EventBus provides a centralized, typed, in-process pub/sub system that decouples event producers from consumers. + +``` +Service Handler (shift created, donation completed, etc.) + | + v + eventBus.publish('shift.created', payload) + | + +-- ListmonkListener (newsletter sync) + +-- RocketChatListener (team notifications) + +-- CrmActivityListener (contact timeline) + +-- CalendarSyncListener (unified calendar) + +-- N8nWebhookListener (external automation) + +-- GancioSyncListener (public event calendar) +``` + +### Why In-Process EventEmitter (not Redis PubSub) + +- Single Express process — no distributed coordination needed +- Zero serialization overhead (pass JS objects directly) +- Data already persisted in DB — events are ephemeral notifications +- Matches the existing fire-and-forget pattern used by Listmonk/RC services +- Can be swapped to Redis PubSub later if we go multi-process + +### Key Files + +| File | Purpose | +|------|---------| +| `api/src/types/events.ts` | Typed event catalog (all event names + payloads) | +| `api/src/services/event-bus.service.ts` | Core EventBus (publish/subscribe/stats) | +| `api/src/services/event-listeners/listmonk.listener.ts` | Listmonk newsletter sync | +| `api/src/services/event-listeners/rocketchat.listener.ts` | Rocket.Chat notifications | +| `api/src/services/event-listeners/crm-activity.listener.ts` | CRM ContactActivity writer | +| `api/src/services/event-listeners/calendar-sync.listener.ts` | Calendar unification | +| `api/src/services/event-listeners/n8n-webhook.listener.ts` | n8n automation bridge | +| `api/src/services/event-listeners/gancio.listener.ts` | Gancio event sync | + +--- + +## Progress Tracker + +### Phase 1: Core Infrastructure +- [x] Explore existing event patterns (Listmonk, RC, Gancio, provisioning) +- [x] Design EventBus architecture +- [x] Implement EventBus service (`api/src/services/event-bus.service.ts`) +- [x] Define typed event catalog (`api/src/types/events.ts` — 46 events across 14 modules) +- [x] Register EventBus in server.ts startup +- [x] Add EventBus stats endpoint (`GET /api/observability/event-bus`) + +### Phase 2: Migrate Existing Integrations +- [x] Listmonk event sync → EventBus listener (9 event subscriptions) +- [x] Rocket.Chat webhook service → EventBus listener (4 event subscriptions) +- [x] Gancio shift/event sync → EventBus listener (3 event subscriptions) + +### Phase 3: New Listeners +- [x] CRM Activity auto-generation listener (11 event subscriptions) +- [x] Calendar sync listener (8 event subscriptions) +- [x] n8n webhook emitter listener (wildcard subscription, forwards all events) +- [x] Listmonk webhook receiver (inbound: open, click, bounce, unsubscribe → EventBus) + +### Phase 4: Wire Up Publishers (migrated from inline calls) +- [x] Shift CRUD + signup (shift.created/updated/deleted, shift.signup.created/cancelled) +- [x] Canvass session complete + visits (canvass.session.completed, contact.address.updated) +- [x] Response submit (response.submitted) +- [x] Campaign email sent (campaign.email.sent) +- [x] Payment/donation/subscription events (3 event types) +- [x] Contact tag changes (contact.tags.changed — 3 call sites) +- [x] Reengagement sent (reengagement.sent) +- [x] Campaign CRUD + publish + moderation (campaign.created/updated/deleted/published/status.changed) +- [x] User create/update/delete/approve (user.created/updated/deleted/approved) +- [x] SMS campaign start/complete + message send/receive (4 event types) +- [x] Media video publish/unpublish/view (3 event types) +- [x] Ticketed event publish/cancel (EventBus publishes alongside existing Gancio calls) +- [x] Impact story publish (social.impact-story.published) +- [ ] Meeting create/update/delete (not yet migrated — meetings module needs review) + +### Phase 5: Future +- [ ] Add SHIFT, MEETING, TICKETED_EVENT to CalendarItemSource enum (Prisma migration) +- [ ] Migrate remaining Gancio calls (ticketed-events, meeting-planner) to EventBus +- [ ] Add engagement scoring listener +- [ ] Add Homepage dashboard data listener + +--- + +## Event Catalog + +### Currently Wired (11 event points, 3 consumers) + +| Event | Listmonk | Rocket.Chat | Gancio | +|-------|----------|-------------|--------| +| shift.signup | yes | yes | - | +| shift.signup.cancelled | - | yes | - | +| shift.created | - | - | yes | +| shift.updated | - | - | yes | +| shift.deleted | - | - | yes | +| canvass.session.completed | yes | yes | - | +| canvass.address.updated | yes | - | - | +| campaign.email.sent | yes | - | - | +| response.submitted | - | yes | - | +| subscription.activated | yes | - | - | +| donation.completed | yes | - | - | +| product.purchased | yes | - | - | +| contact.tags.changed | yes | - | - | +| reengagement.sent | yes | - | - | + +### New Events (49+ handlers need publishers) + +| Event | CRM Activity | Calendar | RC | n8n | +|-------|-------------|----------|-----|-----| +| campaign.created | - | - | - | yes | +| campaign.published | - | - | yes | yes | +| campaign.status.changed | - | - | yes | yes | +| user.approved | - | - | yes | yes | +| user.created | - | - | - | yes | +| video.published | - | - | yes | yes | +| video.viewed | yes | - | - | - | +| sms.message.received | yes | - | yes* | yes | +| sms.campaign.completed | - | - | yes | yes | +| ticketed-event.published | - | yes | - | yes | +| meeting.created | - | yes | - | - | +| impact-story.published | - | - | yes | yes | +| shift.created | - | yes | - | yes | +| donation.completed | yes | - | yes | yes | +| subscription.activated | yes | - | - | yes | + +*SMS escalations (QUESTION/NEGATIVE sentiment) to relevant RC channel + +--- + +## Design Decisions + +1. **Listeners self-guard**: Each listener checks its own feature flag (ENABLE_CHAT, LISTMONK_SYNC_ENABLED, etc.) — the EventBus doesn't filter +2. **Error isolation**: Each listener wraps its handler in try-catch; one listener failing doesn't affect others +3. **No persistence**: Events are ephemeral — if the server restarts mid-event, it's lost (data is already in DB) +4. **Stats tracking**: EventBus tracks per-event emission counts + per-listener execution counts for observability +5. **Wildcard subscriptions**: Listeners can subscribe to `shift.*` to catch all shift events diff --git a/api/prisma/migrations/20260330120000_payment_audit_and_disputed_status/migration.sql b/api/prisma/migrations/20260330120000_payment_audit_and_disputed_status/migration.sql new file mode 100644 index 00000000..90c7f40f --- /dev/null +++ b/api/prisma/migrations/20260330120000_payment_audit_and_disputed_status/migration.sql @@ -0,0 +1,18 @@ +-- AlterEnum: Add DISPUTED status for chargeback tracking +ALTER TYPE "OrderStatus" ADD VALUE 'DISPUTED'; + +-- DropForeignKey: Make paymentId optional on audit log +ALTER TABLE "payment_audit_log" DROP CONSTRAINT "payment_audit_log_payment_id_fkey"; + +-- AlterTable: Add orderId column, make paymentId nullable +ALTER TABLE "payment_audit_log" ADD COLUMN "order_id" TEXT, +ALTER COLUMN "payment_id" DROP NOT NULL; + +-- CreateIndex +CREATE INDEX "idx_payment_audit_log_order" ON "payment_audit_log"("order_id"); + +-- AddForeignKey (nullable) +ALTER TABLE "payment_audit_log" ADD CONSTRAINT "payment_audit_log_payment_id_fkey" FOREIGN KEY ("payment_id") REFERENCES "payments"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "payment_audit_log" ADD CONSTRAINT "payment_audit_log_order_id_fkey" FOREIGN KEY ("order_id") REFERENCES "orders"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/api/prisma/schema.prisma b/api/prisma/schema.prisma index bce2fc24..d003bd01 100644 --- a/api/prisma/schema.prisma +++ b/api/prisma/schema.prisma @@ -21,6 +21,7 @@ enum UserRole { PAYMENTS_ADMIN EVENTS_ADMIN SOCIAL_ADMIN + POLLS_ADMIN USER TEMP } @@ -167,6 +168,13 @@ model User { schedulingPollVotes SchedulingPollVote[] @relation("PollVoter") schedulingPollComments SchedulingPollComment[] @relation("PollCommenter") + // Straw polls + strawPollsCreated StrawPoll[] @relation("StrawPollCreator") + strawPollVotes StrawPollVote[] @relation("StrawPollVoter") + strawPollComments StrawPollComment[] @relation("StrawPollCommenter") + strawPollChallengesSent StrawPollChallenge[] @relation("StrawPollChallenger") + strawPollChallengesReceived StrawPollChallenge[] @relation("StrawPollChallenged") + // Participant needs participantNeeds ParticipantNeeds? @relation("UserParticipantNeeds") @@ -962,6 +970,7 @@ model SiteSettings { enableMeetingPlanner Boolean @default(false) @map("enable_meeting_planner") enableTicketedEvents Boolean @default(false) @map("enable_ticketed_events") enableSocialCalendar Boolean @default(false) @map("enable_social_calendar") + enablePolls Boolean @default(false) @map("enable_polls") enableDocsCollaboration Boolean @default(false) @map("enable_docs_collaboration") requireEventApproval Boolean @default(true) @map("require_event_approval") autoSyncPeopleToMap Boolean @default(false) @map("auto_sync_people_to_map") @@ -1528,6 +1537,7 @@ enum OrderStatus { COMPLETED FAILED REFUNDED + DISPUTED } enum NotificationType { @@ -1552,6 +1562,10 @@ enum NotificationType { shift_cancelled canvass_session_summary reengagement + // Straw poll notification types + poll_closed + poll_results_available + poll_challenge } // ============================================================================ @@ -3427,7 +3441,8 @@ model Payment { model PaymentAuditLog { id Int @id @default(autoincrement()) - paymentId Int @map("payment_id") + paymentId Int? @map("payment_id") + orderId String? @map("order_id") action String oldStatus String? @map("old_status") newStatus String? @map("new_status") @@ -3436,10 +3451,12 @@ model PaymentAuditLog { createdAt DateTime @default(now()) @map("created_at") // Relations - payment Payment @relation(fields: [paymentId], references: [id]) + payment Payment? @relation(fields: [paymentId], references: [id]) + order Order? @relation(fields: [orderId], references: [id]) user User? @relation("PaymentAuditUser", fields: [userId], references: [id]) @@index([paymentId], map: "idx_payment_audit_log_payment") + @@index([orderId], map: "idx_payment_audit_log_order") @@index([action], map: "idx_payment_audit_log_action") @@index([createdAt], map: "idx_payment_audit_log_created") @@map("payment_audit_log") @@ -3505,6 +3522,7 @@ model Order { influenceCampaignId String? @map("influence_campaign_id") influenceCampaign Campaign? @relation("CampaignDonations", fields: [influenceCampaignId], references: [id], onDelete: SetNull) tickets Ticket[] @relation("TicketOrder") + auditLogs PaymentAuditLog[] @@index([userId], map: "idx_orders_user") @@index([productId], map: "idx_orders_product") @@ -4274,6 +4292,7 @@ model Contact { activities ContactActivity[] smsConversations SmsConversation[] @relation("ContactSmsConversations") pollVotes SchedulingPollVote[] @relation("PollVoteContact") + strawPollVotes StrawPollVote[] @relation("StrawPollVoteContact") participantNeeds ParticipantNeeds? @relation("ContactParticipantNeeds") @@index([email]) @@ -5344,3 +5363,132 @@ model ActionItem { @@index([dueDate]) @@map("action_items") } + +// ============================================================================ +// STRAW POLLS +// ============================================================================ + +enum StrawPollType { + SINGLE_CHOICE + YES_NO_ABSTAIN +} + +enum StrawPollStatus { + DRAFT + ACTIVE + CLOSED + ARCHIVED +} + +enum StrawPollIdentityMode { + ANONYMOUS + TOKEN_GATED + AUTHENTICATED + MIXED +} + +enum StrawPollResultVisibility { + LIVE + AFTER_VOTE + AFTER_CLOSE + CREATOR_ONLY + PUBLIC_ALWAYS +} + +model StrawPoll { + id String @id @default(cuid()) + slug String @unique + title String @db.VarChar(200) + description String? @db.Text + type StrawPollType + status StrawPollStatus @default(DRAFT) + identityMode StrawPollIdentityMode @default(ANONYMOUS) @map("identity_mode") + resultVisibility StrawPollResultVisibility @default(LIVE) @map("result_visibility") + allowComments Boolean @default(true) @map("allow_comments") + closesAt DateTime? @map("closes_at") + closeThreshold Int? @map("close_threshold") + autoCloseJobId String? @map("auto_close_job_id") + isPrivate Boolean @default(false) @map("is_private") + + createdByUserId String @map("created_by_user_id") + createdBy User @relation("StrawPollCreator", fields: [createdByUserId], references: [id]) + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + options StrawPollOption[] + votes StrawPollVote[] + comments StrawPollComment[] + challenges StrawPollChallenge[] + + @@index([createdByUserId]) + @@index([status]) + @@map("straw_polls") +} + +model StrawPollOption { + id String @id @default(cuid()) + pollId String @map("poll_id") + poll StrawPoll @relation(fields: [pollId], references: [id], onDelete: Cascade) + label String @db.VarChar(500) + sortOrder Int @default(0) @map("sort_order") + + votes StrawPollVote[] + + @@index([pollId]) + @@map("straw_poll_options") +} + +model StrawPollVote { + id String @id @default(cuid()) + pollId String @map("poll_id") + poll StrawPoll @relation(fields: [pollId], references: [id], onDelete: Cascade) + optionId String @map("option_id") + option StrawPollOption @relation(fields: [optionId], references: [id], onDelete: Cascade) + + userId String? @map("user_id") + user User? @relation("StrawPollVoter", fields: [userId], references: [id], onDelete: SetNull) + voterName String? @db.VarChar(100) @map("voter_name") + voterToken String? @map("voter_token") + voterIp String? @map("voter_ip") + contactId String? @map("contact_id") + contact Contact? @relation("StrawPollVoteContact", fields: [contactId], references: [id], onDelete: SetNull) + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@unique([pollId, userId]) + @@unique([pollId, voterToken]) + @@unique([pollId, voterIp]) + @@index([pollId]) + @@index([optionId]) + @@map("straw_poll_votes") +} + +model StrawPollComment { + id String @id @default(cuid()) + pollId String @map("poll_id") + poll StrawPoll @relation(fields: [pollId], references: [id], onDelete: Cascade) + userId String? @map("user_id") + user User? @relation("StrawPollCommenter", fields: [userId], references: [id], onDelete: SetNull) + authorName String @db.VarChar(100) @map("author_name") + content String @db.Text + createdAt DateTime @default(now()) @map("created_at") + + @@index([pollId]) + @@map("straw_poll_comments") +} + +model StrawPollChallenge { + id String @id @default(cuid()) + pollId String @map("poll_id") + poll StrawPoll @relation(fields: [pollId], references: [id], onDelete: Cascade) + challengerUserId String @map("challenger_user_id") + challenger User @relation("StrawPollChallenger", fields: [challengerUserId], references: [id]) + challengedUserId String @map("challenged_user_id") + challenged User @relation("StrawPollChallenged", fields: [challengedUserId], references: [id]) + completedAt DateTime? @map("completed_at") + createdAt DateTime @default(now()) @map("created_at") + + @@unique([pollId, challengerUserId, challengedUserId]) + @@map("straw_poll_challenges") +} diff --git a/api/src/middleware/rate-limit.ts b/api/src/middleware/rate-limit.ts index 9ca8aa2c..1aea9dfb 100644 --- a/api/src/middleware/rate-limit.ts +++ b/api/src/middleware/rate-limit.ts @@ -156,6 +156,23 @@ export const adTrackingRateLimit = rateLimit({ }, }); +export const smsSendRateLimit = rateLimit({ + windowMs: 60 * 1000, // 1 minute + max: 10, + standardHeaders: true, + legacyHeaders: false, + store: new RedisStore({ + sendCommand: (command: string, ...args: string[]) => redis.call(command, ...args) as Promise, + prefix: 'rl:sms-send:', + }), + message: { + error: { + message: 'Too many SMS send requests, please try again later', + code: 'SMS_SEND_RATE_LIMIT_EXCEEDED', + }, + }, +}); + export const quickJoinRateLimit = rateLimit({ windowMs: 60 * 60 * 1000, // 1 hour max: 10, @@ -173,6 +190,23 @@ export const quickJoinRateLimit = rateLimit({ }, }); +export const paymentCheckoutRateLimit = rateLimit({ + windowMs: 60 * 60 * 1000, // 1 hour + max: 10, // 10 checkout sessions per hour per IP + standardHeaders: true, + legacyHeaders: false, + store: new RedisStore({ + sendCommand: (command: string, ...args: string[]) => redis.call(command, ...args) as Promise, + prefix: 'rl:payment-checkout:', + }), + message: { + error: { + message: 'Too many payment requests, please try again later', + code: 'PAYMENT_CHECKOUT_RATE_LIMIT_EXCEEDED', + }, + }, +}); + export const authRateLimit = rateLimit({ windowMs: 15 * 60 * 1000, max: 10, // Reduced from 20 to prevent brute force attacks diff --git a/api/src/modules/influence/campaigns/campaigns-moderation.routes.ts b/api/src/modules/influence/campaigns/campaigns-moderation.routes.ts index 268a3673..34f6831f 100644 --- a/api/src/modules/influence/campaigns/campaigns-moderation.routes.ts +++ b/api/src/modules/influence/campaigns/campaigns-moderation.routes.ts @@ -5,6 +5,7 @@ import { validate } from '../../../middleware/validate'; import { authenticate } from '../../../middleware/auth.middleware'; import { requireRole } from '../../../middleware/rbac.middleware'; import { INFLUENCE_ROLES } from '../../../utils/roles'; +import { eventBus } from '../../../services/event-bus.service'; const router = Router(); @@ -45,7 +46,22 @@ router.patch( async (req: Request, res: Response, next: NextFunction) => { try { const id = req.params.id as string; + const before = await campaignsService.findById(id); const campaign = await campaignsService.moderateCampaign(id, req.body, req.user!); + eventBus.publish('campaign.status.changed', { + campaignId: campaign.id, + title: campaign.title, + slug: campaign.slug, + oldStatus: before.moderationStatus ?? 'PENDING', + newStatus: campaign.moderationStatus ?? 'PENDING', + }); + if (campaign.status === 'ACTIVE' && before.status !== 'ACTIVE') { + eventBus.publish('campaign.published', { + campaignId: campaign.id, + title: campaign.title, + slug: campaign.slug, + }); + } res.json(campaign); } catch (err) { next(err); diff --git a/api/src/modules/influence/campaigns/campaigns.routes.ts b/api/src/modules/influence/campaigns/campaigns.routes.ts index a9c458f7..88d7ed5c 100644 --- a/api/src/modules/influence/campaigns/campaigns.routes.ts +++ b/api/src/modules/influence/campaigns/campaigns.routes.ts @@ -5,6 +5,7 @@ import { validate } from '../../../middleware/validate'; import { authenticate } from '../../../middleware/auth.middleware'; import { requireRole } from '../../../middleware/rbac.middleware'; import { INFLUENCE_ROLES } from '../../../utils/roles'; +import { eventBus } from '../../../services/event-bus.service'; const router = Router(); @@ -47,6 +48,12 @@ router.post( async (req: Request, res: Response, next: NextFunction) => { try { const campaign = await campaignsService.create(req.body, req.user!); + eventBus.publish('campaign.created', { + campaignId: campaign.id, + title: campaign.title, + slug: campaign.slug, + createdByUserId: campaign.createdByUserId!, + }); res.status(201).json(campaign); } catch (err) { next(err); @@ -62,6 +69,12 @@ router.put( try { const id = req.params.id as string; const campaign = await campaignsService.update(id, req.body); + eventBus.publish('campaign.updated', { + campaignId: campaign.id, + title: campaign.title, + slug: campaign.slug, + changes: Object.keys(req.body), + }); res.json(campaign); } catch (err) { next(err); @@ -75,7 +88,13 @@ router.delete( async (req: Request, res: Response, next: NextFunction) => { try { const id = req.params.id as string; + const campaign = await campaignsService.findById(id); await campaignsService.delete(id); + eventBus.publish('campaign.deleted', { + campaignId: campaign.id, + title: campaign.title, + slug: campaign.slug, + }); res.status(204).send(); } catch (err) { next(err); diff --git a/api/src/modules/influence/responses/responses.service.ts b/api/src/modules/influence/responses/responses.service.ts index 9fcfbff0..b12f4b63 100644 --- a/api/src/modules/influence/responses/responses.service.ts +++ b/api/src/modules/influence/responses/responses.service.ts @@ -8,7 +8,7 @@ import { getAdminEmailsByRole, isNotificationEnabled } from '../../../services/n import { env } from '../../../config/env'; import { logger } from '../../../utils/logger'; import { recordResponseSubmission } from '../../../utils/metrics'; -import { rocketchatWebhookService } from '../../../services/rocketchat-webhook.service'; +import { eventBus } from '../../../services/event-bus.service'; import type { SubmitResponseInput, ListPublicResponsesInput, @@ -102,11 +102,14 @@ export const responsesService = { logger.error('Failed to enqueue response submitted notification:', err); } - // Notify Rocket.Chat - rocketchatWebhookService.onCampaignResponseSubmitted({ + // Publish response submitted event + eventBus.publish('response.submitted', { + responseId: response.id, + campaignId: campaign.id, campaignTitle: campaign.title, representativeName: data.representativeName, - }).catch(() => {}); + userEmail: data.submittedByEmail, + }); return { id: response.id, diff --git a/api/src/modules/listmonk/listmonk-webhook.routes.ts b/api/src/modules/listmonk/listmonk-webhook.routes.ts index c900ea25..eeb80000 100644 --- a/api/src/modules/listmonk/listmonk-webhook.routes.ts +++ b/api/src/modules/listmonk/listmonk-webhook.routes.ts @@ -2,6 +2,7 @@ import { Router, Request, Response, NextFunction } from 'express'; import { prisma } from '../../config/database'; import { env } from '../../config/env'; import { logger } from '../../utils/logger'; +import { eventBus } from '../../services/event-bus.service'; const router = Router(); @@ -32,6 +33,13 @@ router.post( return; } + // Publish unsubscribe event to EventBus + eventBus.publish('listmonk.unsubscribed', { + subscriberEmail: email, + listId: event?.data?.list?.id ?? 0, + listName: event?.data?.list?.name ?? '', + }); + // Store opt-out flag in user's permissions JSON field const user = await prisma.user.findUnique({ where: { email }, @@ -58,6 +66,56 @@ router.post( return; } + // Email open event + if (eventType === 'campaign.view') { + const email = event?.data?.subscriber?.email; + const campaignId = event?.data?.campaign?.id; + const campaignName = event?.data?.campaign?.name; + if (email && campaignId) { + eventBus.publish('listmonk.email.opened', { + subscriberEmail: email, + campaignId, + campaignName: campaignName ?? '', + }); + } + res.json({ ok: true, action: 'published', eventType }); + return; + } + + // Link click event + if (eventType === 'campaign.link_click') { + const email = event?.data?.subscriber?.email; + const campaignId = event?.data?.campaign?.id; + const campaignName = event?.data?.campaign?.name; + const url = event?.data?.url; + if (email && campaignId) { + eventBus.publish('listmonk.email.clicked', { + subscriberEmail: email, + campaignId, + campaignName: campaignName ?? '', + url: url ?? '', + }); + } + res.json({ ok: true, action: 'published', eventType }); + return; + } + + // Bounce event + if (eventType === 'subscriber.bounced') { + const email = event?.data?.subscriber?.email; + const campaignId = event?.data?.campaign?.id; + const bounceType = event?.data?.bounce_type ?? 'unknown'; + if (email) { + eventBus.publish('listmonk.email.bounced', { + subscriberEmail: email, + campaignId: campaignId ?? 0, + bounceType, + }); + } + res.json({ ok: true, action: 'published', eventType }); + return; + } + // Unknown event type — acknowledge but don't process logger.debug(`Listmonk webhook: unhandled event type "${eventType}"`); res.json({ ok: true, action: 'ignored', eventType }); diff --git a/api/src/modules/map/canvass/canvass.service.ts b/api/src/modules/map/canvass/canvass.service.ts index 851f0d89..5f37ba17 100644 --- a/api/src/modules/map/canvass/canvass.service.ts +++ b/api/src/modules/map/canvass/canvass.service.ts @@ -11,8 +11,7 @@ import { recordCanvassVisit, setActiveCanvassSessions } from '../../../utils/met import { notificationQueueService } from '../../../services/notification-queue.service'; import { getAdminEmailsByRole, isNotificationEnabled } from '../../../services/notification.helper'; import { env } from '../../../config/env'; -import { rocketchatWebhookService } from '../../../services/rocketchat-webhook.service'; -import { listmonkEventSyncService } from '../../../services/listmonk-event-sync.service'; +import { eventBus } from '../../../services/event-bus.service'; import { achievementsService } from '../../social/achievements.service'; import type { RecordVisitInput, @@ -254,20 +253,6 @@ export const canvassService = { // Recalculate cut completion percentage await this.recalculateCutCompletion(session.cutId); - // Notify Rocket.Chat - try { - const [rcUser, rcCut, rcVisitCount] = await Promise.all([ - prisma.user.findUnique({ where: { id: userId }, select: { name: true, email: true } }), - prisma.cut.findUnique({ where: { id: session.cutId }, select: { name: true } }), - prisma.canvassVisit.count({ where: { sessionId } }), - ]); - rocketchatWebhookService.onCanvassSessionCompleted({ - userName: rcUser?.name || rcUser?.email || 'Unknown', - visitCount: rcVisitCount, - cutName: rcCut?.name || undefined, - }).catch(() => {}); - } catch { /* non-critical */ } - // Notification: volunteer session summary try { if (await isNotificationEnabled('notifyVolunteerSessionSummary')) { @@ -315,7 +300,7 @@ export const canvassService = { logger.error('Failed to enqueue session summary notification:', err); } - // Listmonk event sync — add canvasser to subscribers + // Publish canvass session completed event (consumed by RC, Listmonk, etc.) try { const [syncUser, syncCut, syncVisitCount, syncOutcomes] = await Promise.all([ prisma.user.findUnique({ where: { id: userId }, select: { email: true, name: true } }), @@ -328,13 +313,15 @@ export const canvassService = { for (const row of syncOutcomes) { outcomes[row.outcome] = row._count; } - listmonkEventSyncService.onCanvassSessionCompleted({ - email: syncUser.email, - name: syncUser.name || syncUser.email, + eventBus.publish('canvass.session.completed', { + sessionId, + userId, + userName: syncUser.name || syncUser.email, + userEmail: syncUser.email, cutName: syncCut?.name || 'Unknown', visitCount: syncVisitCount, outcomes, - }).catch(() => {}); + }); } } catch { /* non-critical */ } @@ -650,16 +637,16 @@ export const canvassService = { include: { location: { select: { address: true } } }, }); - // Sync support level change to Listmonk (fire-and-forget) + // Publish address updated event (consumed by Listmonk, etc.) if (updatedAddress.email) { const name = [updatedAddress.firstName, updatedAddress.lastName].filter(Boolean).join(' '); - listmonkEventSyncService.onAddressUpdated({ + eventBus.publish('contact.address.updated', { email: updatedAddress.email, name, supportLevel: updatedAddress.supportLevel, sign: updatedAddress.sign, address: updatedAddress.location.address, - }).catch(() => {}); + }); } } diff --git a/api/src/modules/map/shifts/shifts.service.ts b/api/src/modules/map/shifts/shifts.service.ts index 53fe207e..ff93e91e 100644 --- a/api/src/modules/map/shifts/shifts.service.ts +++ b/api/src/modules/map/shifts/shifts.service.ts @@ -8,9 +8,7 @@ import { getAdminEmailsByRole, isNotificationEnabled } from '../../../services/n import { env } from '../../../config/env'; import { logger } from '../../../utils/logger'; import { recordShiftSignup } from '../../../utils/metrics'; -import { rocketchatWebhookService } from '../../../services/rocketchat-webhook.service'; -import { listmonkEventSyncService } from '../../../services/listmonk-event-sync.service'; -import { gancioClient } from '../../../services/gancio.client'; +import { eventBus } from '../../../services/event-bus.service'; import { unifiedCalendarService } from '../../events/unified-calendar.service'; import { groupService } from '../../social/group.service'; import { achievementsService } from '../../social/achievements.service'; @@ -138,26 +136,17 @@ export const shiftsService = { }, }); - // Gancio event sync (fire-and-forget) - if (gancioClient.enabled) { - gancioClient.createEvent({ - title: shift.title, - description: shift.description, - location: shift.location, - date: shift.date, - startTime: shift.startTime, - endTime: shift.endTime, - }).then(async (eventId) => { - if (eventId) { - await prisma.shift.update({ - where: { id: shift.id }, - data: { gancioEventId: eventId }, - }); - } - }).catch((err) => { - logger.warn('Gancio sync on shift create failed:', err); - }); - } + // Publish shift.created event (listeners: Gancio, Calendar, n8n) + eventBus.publish('shift.created', { + shiftId: shift.id, + title: shift.title, + date: new Date(shift.date).toISOString().split('T')[0], + startTime: shift.startTime, + endTime: shift.endTime, + cutId: shift.cutId, + cutName: null, + createdByUserId: userId, + }); // Bust unified calendar cache unifiedCalendarService.bustCache().catch(() => {}); @@ -191,19 +180,17 @@ export const shiftsService = { data: updateData, }); - // Gancio event sync (fire-and-forget) - if (gancioClient.enabled && shift.gancioEventId) { - gancioClient.updateEvent(shift.gancioEventId, { - title: shift.title, - description: shift.description, - location: shift.location, - date: shift.date, - startTime: shift.startTime, - endTime: shift.endTime, - }).catch((err) => { - logger.warn('Gancio sync on shift update failed:', err); - }); - } + // Publish shift.updated event (listeners: Gancio, Calendar, n8n) + eventBus.publish('shift.updated', { + shiftId: shift.id, + title: shift.title, + date: new Date(shift.date).toISOString().split('T')[0], + startTime: shift.startTime, + endTime: shift.endTime, + cutId: shift.cutId, + cutName: null, + changes: Object.keys(data), + }); // Bust unified calendar cache unifiedCalendarService.bustCache().catch(() => {}); @@ -217,12 +204,12 @@ export const shiftsService = { throw new AppError(404, 'Shift not found', 'SHIFT_NOT_FOUND'); } - // Delete Gancio event before deleting shift (fire-and-forget) - if (gancioClient.enabled && existing.gancioEventId) { - gancioClient.deleteEvent(existing.gancioEventId).catch((err) => { - logger.warn('Gancio sync on shift delete failed:', err); - }); - } + // Publish shift.deleted event (listeners: Gancio, Calendar, n8n) + eventBus.publish('shift.deleted', { + shiftId: id, + title: existing.title, + date: new Date(existing.date).toISOString().split('T')[0], + }); // Delete associated meeting if exists if (existing.meetingId) { @@ -359,13 +346,17 @@ export const shiftsService = { }), ]); - // Listmonk event sync - listmonkEventSyncService.onShiftSignup({ - email: data.userEmail, - name: data.userName || data.userEmail, + // Publish shift.signup.created event (listeners: Listmonk, RC, CRM, n8n) + eventBus.publish('shift.signup.created', { + shiftId, shiftTitle: shift.title, shiftDate: new Date(shift.date).toISOString().split('T')[0], - }).catch(() => {}); + userName: data.userName || data.userEmail, + userEmail: data.userEmail, + userId: user?.id ?? null, + cutName: null, + signupType: 'admin', + }); // Social group sync (fire-and-forget) groupService.syncShiftTeam(shiftId).catch(() => {}); @@ -551,14 +542,6 @@ export const shiftsService = { }).catch(err => logger.error('SMS signup confirmation failed:', err)); } - // Notify Rocket.Chat - const shiftDateStr = new Date(shift.date).toLocaleDateString('en-CA', { month: 'short', day: 'numeric' }); - rocketchatWebhookService.onShiftSignup({ - userName: data.name || data.email, - shiftTitle: shift.title, - shiftDate: shiftDateStr, - }).catch(() => {}); - // Notification: admin shift signup alert try { if (await isNotificationEnabled('notifyAdminShiftSignup')) { @@ -651,13 +634,17 @@ export const shiftsService = { recordShiftSignup(); - // Listmonk event sync - listmonkEventSyncService.onShiftSignup({ - email: data.email, - name: data.name, + // Publish shift.signup.created event (listeners: Listmonk, RC, CRM, n8n) + eventBus.publish('shift.signup.created', { + shiftId, shiftTitle: shift.title, shiftDate: new Date(shift.date).toISOString().split('T')[0], - }).catch(() => {}); + userName: data.name || data.email, + userEmail: data.email, + userId: user?.id ?? null, + cutName: null, + signupType: 'public', + }); // Social group sync (fire-and-forget) groupService.syncShiftTeam(shiftId).catch(() => {}); @@ -733,14 +720,16 @@ export const shiftsService = { logger.error('Failed to enqueue cancellation notification:', err); } - // Notify Rocket.Chat of cancellation + // Publish shift.signup.cancelled event (listeners: RC, n8n) if (shift) { - const shiftDateStr = new Date(shift.date).toLocaleDateString('en-CA', { month: 'short', day: 'numeric' }); - rocketchatWebhookService.onShiftCancellation({ - userName: signup.userName || userEmail, + eventBus.publish('shift.signup.cancelled', { + shiftId, shiftTitle: shift.title, - shiftDate: shiftDateStr, - }).catch(() => {}); + shiftDate: new Date(shift.date).toISOString().split('T')[0], + userName: signup.userName || userEmail, + userEmail, + signupType: 'public', + }); } // Notification: admin shift cancellation alert @@ -896,14 +885,6 @@ export const shiftsService = { logger.error('Failed to send volunteer shift signup confirmation email:', err); } - // Notify Rocket.Chat - const shiftDateStr = new Date(shift.date).toLocaleDateString('en-CA', { month: 'short', day: 'numeric' }); - rocketchatWebhookService.onShiftSignup({ - userName: user.name || user.email, - shiftTitle: shift.title, - shiftDate: shiftDateStr, - }).catch(() => {}); - // Notification: admin shift signup alert try { if (await isNotificationEnabled('notifyAdminShiftSignup')) { @@ -980,13 +961,17 @@ export const shiftsService = { logger.error('Failed to schedule shift thank-you:', err); } - // Listmonk event sync - listmonkEventSyncService.onShiftSignup({ - email: user.email, - name: user.name || user.email, + // Publish shift.signup.created event (listeners: Listmonk, RC, CRM, n8n) + eventBus.publish('shift.signup.created', { + shiftId, shiftTitle: shift.title, shiftDate: new Date(shift.date).toISOString().split('T')[0], - }).catch(() => {}); + userName: user.name || user.email, + userEmail: user.email, + userId, + cutName: null, + signupType: 'volunteer', + }); // Social group sync (fire-and-forget) groupService.syncShiftTeam(shiftId).catch(() => {}); @@ -1060,14 +1045,16 @@ export const shiftsService = { logger.error('Failed to enqueue cancellation notification:', err); } - // Notify Rocket.Chat of cancellation + // Publish shift.signup.cancelled event (listeners: RC, n8n) if (shift) { - const shiftDateStr = new Date(shift.date).toLocaleDateString('en-CA', { month: 'short', day: 'numeric' }); - rocketchatWebhookService.onShiftCancellation({ - userName: user.name || user.email, + eventBus.publish('shift.signup.cancelled', { + shiftId, shiftTitle: shift.title, - shiftDate: shiftDateStr, - }).catch(() => {}); + shiftDate: new Date(shift.date).toISOString().split('T')[0], + userName: user.name || user.email, + userEmail: user.email, + signupType: 'volunteer', + }); } // Notification: admin shift cancellation alert diff --git a/api/src/modules/media/routes/video-tracking.routes.ts b/api/src/modules/media/routes/video-tracking.routes.ts index d85aa174..92dd71b8 100644 --- a/api/src/modules/media/routes/video-tracking.routes.ts +++ b/api/src/modules/media/routes/video-tracking.routes.ts @@ -3,6 +3,7 @@ import { optionalAuth } from '../middleware/auth'; import { videoAnalyticsService } from '../services/video-analytics.service'; import { logger } from '../../../utils/logger'; import { z } from 'zod'; +import { eventBus } from '../../../services/event-bus.service'; // Validation schemas const recordViewSchema = z.object({ @@ -62,6 +63,13 @@ export async function videoTrackingRoutes(fastify: FastifyInstance) { referer, }); + eventBus.publish('media.video.viewed', { + videoId: String(videoId), + videoTitle: '', // Title not available in tracking context + userId: userId ?? null, + sessionId: String(viewId), + }); + return { success: true, viewId, diff --git a/api/src/modules/media/routes/videos.routes.ts b/api/src/modules/media/routes/videos.routes.ts index eeed5b0f..b39b743a 100644 --- a/api/src/modules/media/routes/videos.routes.ts +++ b/api/src/modules/media/routes/videos.routes.ts @@ -7,6 +7,7 @@ import { join } from 'path'; import { extractVideoMetadata } from '../services/ffprobe.service'; import { ThumbnailService } from '../services/thumbnail.service'; import { logger } from '../../../utils/logger'; +import { eventBus } from '../../../services/event-bus.service'; // List videos endpoint (admin only for now) interface ListVideosQuery { @@ -206,7 +207,15 @@ export async function videosRoutes(fastify: FastifyInstance) { }, }); + const userId = (request as any).user?.id || 'unknown'; logger.info(`Video ${videoId} published to ${category}`); + + eventBus.publish('media.video.published', { + videoId: String(videoId), + title: video.title || video.filename || `Video #${videoId}`, + publishedByUserId: userId, + }); + return { success: true, video }; } catch (error: any) { logger.error(`Error publishing video ${videoId}:`, error); @@ -233,6 +242,12 @@ export async function videosRoutes(fastify: FastifyInstance) { }); logger.info(`Video ${videoId} unpublished`); + + eventBus.publish('media.video.unpublished', { + videoId: String(videoId), + title: video.title || video.filename || `Video #${videoId}`, + }); + return { success: true, video }; } catch (error: any) { logger.error(`Error unpublishing video ${videoId}:`, error); diff --git a/api/src/modules/observability/observability.routes.ts b/api/src/modules/observability/observability.routes.ts index ef04e647..1ff6d5d2 100644 --- a/api/src/modules/observability/observability.routes.ts +++ b/api/src/modules/observability/observability.routes.ts @@ -221,4 +221,13 @@ router.get( }, ); +// GET /api/observability/event-bus — EventBus stats +router.get( + '/event-bus', + async (_req: Request, res: Response) => { + const { eventBus } = await import('../../services/event-bus.service'); + res.json(eventBus.getStats()); + }, +); + export const observabilityRouter = router; diff --git a/api/src/modules/payments/donation-pages-public.routes.ts b/api/src/modules/payments/donation-pages-public.routes.ts index 6044ab0a..748331dd 100644 --- a/api/src/modules/payments/donation-pages-public.routes.ts +++ b/api/src/modules/payments/donation-pages-public.routes.ts @@ -1,5 +1,7 @@ import { Router, Request, Response, NextFunction } from 'express'; import { validate } from '../../middleware/validate'; +import { paymentCheckoutRateLimit } from '../../middleware/rate-limit'; +import { requirePaymentsEnabled } from './payment-settings.service'; import { donationPagesService } from './donation-pages.service'; import { donationsService } from './donations.service'; import { donationPageCheckoutSchema } from './donation-pages.schemas'; @@ -29,6 +31,8 @@ router.get('/:slug', async (req: Request, res: Response, next: NextFunction) => // POST /api/donation-pages/:slug/donate — create Stripe checkout for this page router.post( '/:slug/donate', + requirePaymentsEnabled, + paymentCheckoutRateLimit, validate(donationPageCheckoutSchema), async (req: Request, res: Response, next: NextFunction) => { try { diff --git a/api/src/modules/payments/donation-pages.schemas.ts b/api/src/modules/payments/donation-pages.schemas.ts index fedc97ff..11038d05 100644 --- a/api/src/modules/payments/donation-pages.schemas.ts +++ b/api/src/modules/payments/donation-pages.schemas.ts @@ -31,7 +31,7 @@ export const listDonationPagesSchema = z.object({ export type ListDonationPagesInput = z.infer; export const donationPageCheckoutSchema = z.object({ - amountCents: z.number().int().min(100), + amountCents: z.number().int().min(100).max(10000000), // max $100,000 email: z.string().email(), name: z.string().max(200).optional(), message: z.string().max(2000).optional(), diff --git a/api/src/modules/payments/donations.service.ts b/api/src/modules/payments/donations.service.ts index b269d417..65572c6b 100644 --- a/api/src/modules/payments/donations.service.ts +++ b/api/src/modules/payments/donations.service.ts @@ -141,10 +141,18 @@ export const donationsService = { }, }); - const updated = await prisma.order.update({ - where: { id: orderId }, - data: { status: 'REFUNDED' }, - }); + // Stripe refund succeeded — update DB. If this fails, the charge.refunded + // webhook will reconcile the status as a fallback. + let updated; + try { + updated = await prisma.order.update({ + where: { id: orderId }, + data: { status: 'REFUNDED' }, + }); + } catch (dbErr) { + logger.error(`Stripe refund succeeded but DB update failed for order ${orderId}. Webhook will reconcile.`, dbErr); + throw new Error('Refund processed by Stripe but local status update failed. It will be reconciled shortly.'); + } logger.info(`Donation refunded: ${orderId}, $${(order.amountCAD / 100).toFixed(2)}`, { orderId, @@ -187,8 +195,6 @@ export const donationsService = { 'Donation Page': sanitizeCsvValue(o.donationPage?.title || 'General'), 'Message': sanitizeCsvValue(o.donorMessage || ''), 'Anonymous': o.isAnonymous ? 'Yes' : 'No', - 'Stripe Payment Intent': o.stripePaymentIntentId || '', - 'Stripe Checkout Session': o.stripeCheckoutSessionId || '', 'Completed At': o.completedAt ? o.completedAt.toISOString() : '', 'Order ID': o.id, })), { header: true }); diff --git a/api/src/modules/payments/payment-settings.service.ts b/api/src/modules/payments/payment-settings.service.ts index 8b876d88..82857c11 100644 --- a/api/src/modules/payments/payment-settings.service.ts +++ b/api/src/modules/payments/payment-settings.service.ts @@ -1,3 +1,4 @@ +import { Request, Response, NextFunction } from 'express'; import { prisma } from '../../config/database'; import type { PaymentSettings } from '@prisma/client'; import type { UpdatePaymentSettingsInput } from './payments.schemas'; @@ -40,10 +41,16 @@ export const paymentSettingsService = { async update(data: UpdatePaymentSettingsInput): Promise { const toWrite = { ...data } as Record; - // Encrypt sensitive fields + // Encrypt sensitive fields, skipping masked sentinel values from the admin UI for (const field of ENCRYPTED_FIELDS) { - if (field in toWrite && typeof toWrite[field] === 'string' && toWrite[field]) { - toWrite[field] = encrypt(toWrite[field] as string); + if (field in toWrite && typeof toWrite[field] === 'string') { + const val = toWrite[field] as string; + if (!val || val.startsWith('••••')) { + // Empty or mask string submitted — preserve existing encrypted value + delete toWrite[field]; + continue; + } + toWrite[field] = encrypt(val); } } @@ -69,3 +76,17 @@ export const paymentSettingsService = { return decryptSettings(settings); }, }; + +/** Middleware: reject requests when payments are disabled in site settings */ +export async function requirePaymentsEnabled(_req: Request, res: Response, next: NextFunction) { + try { + const settings = await prisma.siteSettings.findFirst({ select: { enablePayments: true } }); + if (!settings?.enablePayments) { + res.status(403).json({ error: { message: 'Payments are not enabled', code: 'PAYMENTS_DISABLED' } }); + return; + } + next(); + } catch (err) { + next(err); + } +} diff --git a/api/src/modules/payments/payments-admin.routes.ts b/api/src/modules/payments/payments-admin.routes.ts index 0f32f24d..6ea257c3 100644 --- a/api/src/modules/payments/payments-admin.routes.ts +++ b/api/src/modules/payments/payments-admin.routes.ts @@ -37,6 +37,7 @@ router.get('/settings', async (_req: Request, res: Response, next: NextFunction) stripeSecretKey: settings.stripeSecretKey ? '••••' + settings.stripeSecretKey.slice(-4) : '', stripeWebhookSecret: settings.stripeWebhookSecret ? '••••' + settings.stripeWebhookSecret.slice(-4) : '', }; + res.setHeader('Cache-Control', 'no-store'); res.json(masked); } catch (err) { next(err); @@ -50,7 +51,14 @@ router.put( async (req: Request, res: Response, next: NextFunction) => { try { const settings = await paymentSettingsService.update(req.body); - res.json(settings); + // Mask secrets in response (same as GET) to prevent leaking decrypted keys + const masked = { + ...settings, + stripeSecretKey: settings.stripeSecretKey ? '••••' + settings.stripeSecretKey.slice(-4) : '', + stripeWebhookSecret: settings.stripeWebhookSecret ? '••••' + settings.stripeWebhookSecret.slice(-4) : '', + }; + res.setHeader('Cache-Control', 'no-store'); + res.json(masked); } catch (err) { next(err); } diff --git a/api/src/modules/payments/payments-public.routes.ts b/api/src/modules/payments/payments-public.routes.ts index 6c04128a..08ea79de 100644 --- a/api/src/modules/payments/payments-public.routes.ts +++ b/api/src/modules/payments/payments-public.routes.ts @@ -1,12 +1,13 @@ import { Router, Request, Response, NextFunction } from 'express'; import { getPublishableKey } from '../../services/stripe.client'; -import { paymentSettingsService } from './payment-settings.service'; +import { paymentSettingsService, requirePaymentsEnabled } from './payment-settings.service'; import { subscriptionsService } from './subscriptions.service'; import { plansService } from './plans.service'; import { productsService } from './products.service'; import { donationsService } from './donations.service'; import { authenticate } from '../../middleware/auth.middleware'; import { validate } from '../../middleware/validate'; +import { paymentCheckoutRateLimit } from '../../middleware/rate-limit'; import { createSubscriptionCheckoutSchema, createProductCheckoutSchema, @@ -85,6 +86,8 @@ router.get('/products/:slug', async (req: Request, res: Response, next: NextFunc // POST /api/payments/subscribe — create subscription checkout (requires login) router.post( '/subscribe', + requirePaymentsEnabled, + paymentCheckoutRateLimit, authenticate, validate(createSubscriptionCheckoutSchema), async (req: Request, res: Response, next: NextFunction) => { @@ -105,6 +108,8 @@ router.post( // POST /api/payments/purchase — create product checkout (guest or logged-in) router.post( '/purchase', + requirePaymentsEnabled, + paymentCheckoutRateLimit, validate(createProductCheckoutSchema), async (req: Request, res: Response, next: NextFunction) => { try { @@ -122,6 +127,8 @@ router.post( // POST /api/payments/donate — create donation checkout (no auth required) router.post( '/donate', + requirePaymentsEnabled, + paymentCheckoutRateLimit, validate(createDonationCheckoutSchema), async (req: Request, res: Response, next: NextFunction) => { try { diff --git a/api/src/modules/payments/payments.schemas.ts b/api/src/modules/payments/payments.schemas.ts index 413df650..08418ef9 100644 --- a/api/src/modules/payments/payments.schemas.ts +++ b/api/src/modules/payments/payments.schemas.ts @@ -84,7 +84,7 @@ export const createProductCheckoutSchema = z.object({ // --- Donation --- export const createDonationCheckoutSchema = z.object({ - amountCents: z.number().int().min(100), + amountCents: z.number().int().min(100).max(10000000), // max $100,000 email: z.string().email(), name: z.string().max(200).optional(), message: z.string().max(2000).optional(), @@ -111,7 +111,7 @@ export const subscriptionFiltersSchema = z.object({ export const orderFiltersSchema = z.object({ page: z.coerce.number().int().min(1).default(1), limit: z.coerce.number().int().min(1).max(100).default(20), - status: z.enum(['PENDING', 'COMPLETED', 'FAILED', 'REFUNDED']).optional(), + status: z.enum(['PENDING', 'COMPLETED', 'FAILED', 'REFUNDED', 'DISPUTED']).optional(), type: z.enum(['product', 'donation']).optional(), search: z.string().optional(), }); diff --git a/api/src/modules/payments/products.service.ts b/api/src/modules/payments/products.service.ts index c4ccd1db..9e10be15 100644 --- a/api/src/modules/payments/products.service.ts +++ b/api/src/modules/payments/products.service.ts @@ -228,12 +228,16 @@ export const productsService = { /** Create Stripe Checkout for a product purchase */ async createProductCheckout(productId: string, buyerEmail: string, buyerName?: string, userId?: string) { const stripe = await getStripe(); - const product = await prisma.product.findUnique({ where: { id: productId } }); - if (!product || !product.isActive) throw new Error('Product not found or inactive'); - if (product.maxPurchases && product.purchaseCount >= product.maxPurchases) { - throw new Error('Product is sold out'); - } + // Atomic availability check to prevent overselling under concurrency + const product = await prisma.$transaction(async (tx) => { + const p = await tx.product.findUnique({ where: { id: productId } }); + if (!p || !p.isActive) throw new Error('Product not found or inactive'); + if (p.maxPurchases && p.purchaseCount >= p.maxPurchases) { + throw new Error('Product is sold out'); + } + return p; + }); const session = await stripe.checkout.sessions.create({ mode: 'payment', @@ -367,9 +371,16 @@ export const productsService = { await stripe.refunds.create({ payment_intent: order.stripePaymentIntentId }); } - return prisma.order.update({ - where: { id: orderId }, - data: { status: 'REFUNDED' }, - }); + // Stripe refund succeeded — update DB. If this fails, the charge.refunded + // webhook will reconcile the status as a fallback. + try { + return await prisma.order.update({ + where: { id: orderId }, + data: { status: 'REFUNDED' }, + }); + } catch (dbErr) { + logger.error(`Stripe refund succeeded but DB update failed for order ${orderId}. Webhook will reconcile.`, dbErr); + throw new Error('Refund processed by Stripe but local status update failed. It will be reconciled shortly.'); + } }, }; diff --git a/api/src/modules/payments/subscriptions.service.ts b/api/src/modules/payments/subscriptions.service.ts index d83af969..7ca54751 100644 --- a/api/src/modules/payments/subscriptions.service.ts +++ b/api/src/modules/payments/subscriptions.service.ts @@ -259,8 +259,6 @@ export const subscriptionsService = { 'Current Period End': s.currentPeriodEnd ? s.currentPeriodEnd.toISOString() : '', 'Cancel at Period End': s.cancelAtPeriodEnd ? 'Yes' : 'No', 'Cancelled At': s.cancelledAt ? s.cancelledAt.toISOString() : '', - 'Stripe Subscription ID': s.stripeSubscriptionId || '', - 'Stripe Customer ID': s.stripeCustomerId || '', 'Subscription ID': s.id.toString(), 'User ID': s.userId, })), { header: true }); diff --git a/api/src/modules/payments/webhook.service.ts b/api/src/modules/payments/webhook.service.ts index 9e0b11f4..66276333 100644 --- a/api/src/modules/payments/webhook.service.ts +++ b/api/src/modules/payments/webhook.service.ts @@ -4,7 +4,7 @@ import { getStripe, getWebhookSecret } from '../../services/stripe.client'; import { logger } from '../../utils/logger'; import { recordCrmActivity } from '../../utils/crm-activity'; import { paymentEmailService } from './payment-email.service'; -import { listmonkEventSyncService } from '../../services/listmonk-event-sync.service'; +import { eventBus } from '../../services/event-bus.service'; // Helper to extract subscription ID from invoice (may be string, object, or missing in newer types) function getSubscriptionId(invoice: Stripe.Invoice): string | null { @@ -48,6 +48,12 @@ export const webhookService = { case 'charge.refunded': await this.handleChargeRefunded(event.data.object as Stripe.Charge); break; + case 'charge.dispute.created': + await this.handleDisputeCreated(event.data.object as Stripe.Dispute); + break; + case 'charge.dispute.closed': + await this.handleDisputeClosed(event.data.object as Stripe.Dispute); + break; case 'checkout.session.expired': await this.handleCheckoutExpired(event.data.object as Stripe.Checkout.Session); break; @@ -142,12 +148,12 @@ export const webhookService = { const subUser = await prisma.user.findUnique({ where: { id: userId }, select: { email: true, name: true } }); const plan = await prisma.subscriptionPlan.findUnique({ where: { id: parseInt(planId, 10) }, select: { name: true } }); if (subUser) { - listmonkEventSyncService.onSubscriptionActivated({ + eventBus.publish('payment.subscription.activated', { email: subUser.email, name: subUser.name || '', planName: plan?.name || `Plan ${planId}`, subscriptionId, - }).catch(() => {}); + }); } }, @@ -207,13 +213,13 @@ export const webhookService = { // Sync to Listmonk Donors list (fire-and-forget) if (updatedOrder.buyerEmail) { - listmonkEventSyncService.onProductPurchased({ + eventBus.publish('payment.product.purchased', { email: updatedOrder.buyerEmail, name: updatedOrder.buyerName || '', productTitle: updatedOrder.product?.title || 'Product', amountCents: updatedOrder.amountCAD, orderId: updatedOrder.id, - }).catch(() => {}); + }); } // CRM activity (fire-and-forget) @@ -282,12 +288,12 @@ export const webhookService = { // Sync to Listmonk Donors list (fire-and-forget) if (order.buyerEmail) { - listmonkEventSyncService.onDonationCompleted({ + eventBus.publish('payment.donation.completed', { email: order.buyerEmail, name: order.buyerName || '', amountCents: order.amountCAD, orderId: order.id, - }).catch(() => {}); + }); } // CRM activity (fire-and-forget) @@ -518,17 +524,103 @@ export const webhookService = { } } } + }, - // Check payments - const payment = await prisma.payment.findFirst({ + async handleDisputeCreated(dispute: Stripe.Dispute) { + const paymentIntentId = typeof dispute.payment_intent === 'string' + ? dispute.payment_intent + : (dispute.payment_intent as { id: string } | null)?.id; + if (!paymentIntentId) return; + + const order = await prisma.order.findFirst({ where: { stripePaymentIntentId: paymentIntentId }, }); - if (payment && payment.status !== 'refunded') { - await prisma.payment.update({ - where: { id: payment.id }, - data: { status: 'refunded' }, + if (!order || order.status === 'DISPUTED') return; + + const previousStatus = order.status; + await prisma.order.update({ + where: { id: order.id }, + data: { status: 'DISPUTED' }, + }); + + // Invalidate event tickets if applicable + if (order.type === 'event_ticket') { + const tickets = await prisma.ticket.findMany({ + where: { orderId: order.id, status: 'VALID' }, }); + for (const ticket of tickets) { + await prisma.ticket.update({ + where: { id: ticket.id }, + data: { status: 'CANCELLED' }, + }); + } + if (tickets.length > 0) { + logger.info(`Invalidated ${tickets.length} tickets for disputed order ${order.id}`); + } } + + await this.createAuditLog('dispute_created', { + orderId: order.id, + previousStatus, + disputeId: dispute.id, + reason: dispute.reason, + amount: dispute.amount, + }); + logger.warn(`Dispute created for order ${order.id}: ${dispute.reason} ($${(dispute.amount / 100).toFixed(2)})`); + }, + + async handleDisputeClosed(dispute: Stripe.Dispute) { + const paymentIntentId = typeof dispute.payment_intent === 'string' + ? dispute.payment_intent + : (dispute.payment_intent as { id: string } | null)?.id; + if (!paymentIntentId) return; + + const order = await prisma.order.findFirst({ + where: { stripePaymentIntentId: paymentIntentId }, + }); + if (!order) return; + + // Stripe types don't include closed dispute statuses (won/lost/charge_refunded) + const disputeStatus = dispute.status as string; + + // If dispute was won (resolved in our favor), restore the order + tickets + if (disputeStatus === 'won') { + await prisma.order.update({ + where: { id: order.id }, + data: { status: 'COMPLETED' }, + }); + + // Restore tickets that were cancelled when the dispute was opened + if (order.type === 'event_ticket') { + await prisma.ticket.updateMany({ + where: { orderId: order.id, status: 'CANCELLED' }, + data: { status: 'VALID' }, + }); + logger.info(`Restored tickets for dispute-won order ${order.id}`); + } + + logger.info(`Dispute won for order ${order.id}, restored to COMPLETED`); + } else if (disputeStatus === 'lost') { + // Dispute lost — funds returned to customer + await prisma.order.update({ + where: { id: order.id }, + data: { status: 'REFUNDED' }, + }); + logger.warn(`Dispute lost for order ${order.id}, marked REFUNDED`); + } else if (disputeStatus === 'charge_refunded') { + // Merchant refunded while dispute was in flight — dispute auto-closed + await prisma.order.update({ + where: { id: order.id }, + data: { status: 'REFUNDED' }, + }); + logger.info(`Dispute closed via refund for order ${order.id}`); + } + + await this.createAuditLog('dispute_closed', { + orderId: order.id, + disputeId: dispute.id, + outcome: dispute.status, + }); }, async handleCheckoutExpired(session: Stripe.Checkout.Session) { @@ -562,8 +654,19 @@ export const webhookService = { async createAuditLog(action: string, metadata: Record) { try { + const orderId = typeof metadata.orderId === 'string' ? metadata.orderId : undefined; + const userId = typeof metadata.userId === 'string' ? metadata.userId : undefined; + await prisma.paymentAuditLog.create({ + data: { + action, + orderId: orderId || null, + userId: userId || null, + metadata: metadata as import('@prisma/client').Prisma.InputJsonValue, + }, + }); logger.info(`Payment audit: ${action}`, metadata); } catch (err) { + // Audit log failure must not break payment processing logger.error('Failed to create audit log', err); } }, diff --git a/api/src/modules/people/people.service.ts b/api/src/modules/people/people.service.ts index b04e76df..2970cb7a 100644 --- a/api/src/modules/people/people.service.ts +++ b/api/src/modules/people/people.service.ts @@ -2,6 +2,7 @@ import { prisma } from '../../config/database'; import { redis } from '../../config/redis'; import { logger } from '../../utils/logger'; import { AppError } from '../../middleware/error-handler'; +import { eventBus } from '../../services/event-bus.service'; import type { Prisma } from '@prisma/client'; import type { ListPeopleInput, @@ -1053,13 +1054,12 @@ export const peopleService = { }); if (input.email) { - import('../../services/listmonk-event-sync.service').then(({ listmonkEventSyncService }) => { - listmonkEventSyncService.onContactTagsChanged({ - email: input.email!, - name: contact.displayName || '', - addedTags: initialTags, - removedTags: [], - }).catch(err => logger.debug('Listmonk tag sync failed on create:', err)); + eventBus.publish('contact.tags.changed', { + email: input.email!, + name: contact.displayName || '', + contactId: contact.id, + addedTags: initialTags, + removedTags: [], }); } } @@ -1133,13 +1133,12 @@ export const peopleService = { const email = (data.email !== undefined ? (data.email === '' ? null : data.email) : existing.email); if (email) { - import('../../services/listmonk-event-sync.service').then(({ listmonkEventSyncService }) => { - listmonkEventSyncService.onContactTagsChanged({ - email, - name: contact.displayName || '', - addedTags, - removedTags, - }).catch(err => logger.debug('Listmonk tag sync failed:', err)); + eventBus.publish('contact.tags.changed', { + email, + name: contact.displayName || '', + contactId: existing.id, + addedTags, + removedTags, }); } } @@ -1358,13 +1357,12 @@ export const peopleService = { const mergedEmail = target.email || sourceContact?.email; if (mergedEmail) { - import('../../services/listmonk-event-sync.service').then(({ listmonkEventSyncService }) => { - listmonkEventSyncService.onContactTagsChanged({ - email: mergedEmail, - name: target.displayName, - addedTags: addedToTarget, - removedTags: [], - }).catch(err => logger.debug('Listmonk tag sync failed on merge:', err)); + eventBus.publish('contact.tags.changed', { + email: mergedEmail, + name: target.displayName, + contactId: targetId, + addedTags: addedToTarget, + removedTags: [], }); } } diff --git a/api/src/modules/sms/campaigns/sms-campaigns.routes.ts b/api/src/modules/sms/campaigns/sms-campaigns.routes.ts index 9f8bea1b..a6aa3b07 100644 --- a/api/src/modules/sms/campaigns/sms-campaigns.routes.ts +++ b/api/src/modules/sms/campaigns/sms-campaigns.routes.ts @@ -5,6 +5,7 @@ import { validate } from '../../../middleware/validate'; import { smsCampaignsService } from './sms-campaigns.service'; import { createSmsCampaignSchema, updateSmsCampaignSchema } from './sms-campaigns.schemas'; import { smsQueueService } from '../../../services/sms-queue.service'; +import { eventBus } from '../../../services/event-bus.service'; import { BROADCAST_ROLES } from '../../../utils/roles'; const router = Router(); @@ -66,7 +67,17 @@ router.delete('/:id', async (req, res, next) => { // POST /api/sms/campaigns/:id/start — start sending router.post('/:id/start', async (req, res, next) => { try { + const campaign = await smsCampaignsService.findById(req.params.id as string); const result = await smsCampaignsService.start(req.params.id as string); + + if (campaign) { + eventBus.publish('sms.campaign.started', { + campaignId: campaign.id, + title: campaign.name, + recipientCount: campaign.totalRecipients, + }); + } + res.json(result); } catch (err) { next(err); } }); diff --git a/api/src/modules/sms/messages/sms-messages.routes.ts b/api/src/modules/sms/messages/sms-messages.routes.ts index 1c967692..5fdd4b45 100644 --- a/api/src/modules/sms/messages/sms-messages.routes.ts +++ b/api/src/modules/sms/messages/sms-messages.routes.ts @@ -1,9 +1,14 @@ import { Router } from 'express'; import { authenticate } from '../../../middleware/auth.middleware'; import { requireRole } from '../../../middleware/rbac.middleware'; +import { smsSendRateLimit } from '../../../middleware/rate-limit'; import { smsMessagesService } from './sms-messages.service'; +import { eventBus } from '../../../services/event-bus.service'; import { BROADCAST_ROLES } from '../../../utils/roles'; +const MAX_SMS_LENGTH = 1600; +const PHONE_DIGITS_RE = /^\d{10,11}$/; + const router = Router(); router.use(authenticate, requireRole(...BROADCAST_ROLES)); @@ -32,14 +37,30 @@ router.get('/followups', async (_req, res, next) => { }); // POST /api/sms/messages/send — send ad-hoc SMS -router.post('/send', async (req, res, next) => { +router.post('/send', smsSendRateLimit, async (req, res, next) => { try { const { phone, message } = req.body as { phone?: string; message?: string }; if (!phone || !message) { - res.status(400).json({ error: 'Phone and message are required' }); + res.status(400).json({ error: { message: 'Phone and message are required', code: 'VALIDATION_ERROR' } }); + return; + } + const digits = phone.replace(/\D/g, ''); + if (!PHONE_DIGITS_RE.test(digits)) { + res.status(400).json({ error: { message: 'Invalid phone number format', code: 'VALIDATION_ERROR' } }); + return; + } + if (message.length > MAX_SMS_LENGTH) { + res.status(400).json({ error: { message: `Message too long (max ${MAX_SMS_LENGTH} characters)`, code: 'VALIDATION_ERROR' } }); return; } const result = await smsMessagesService.sendSingle(phone, message); + + eventBus.publish('sms.message.sent', { + messageId: result.id, + phone: result.phone, + body: result.message, + }); + res.json(result); } catch (err) { next(err); } }); diff --git a/api/src/modules/social/impact-stories.routes.ts b/api/src/modules/social/impact-stories.routes.ts index b6dc4d06..d713528b 100644 --- a/api/src/modules/social/impact-stories.routes.ts +++ b/api/src/modules/social/impact-stories.routes.ts @@ -3,6 +3,7 @@ import { requireRole } from '../../middleware/rbac.middleware'; import { INFLUENCE_ROLES } from '../../utils/roles'; import { impactStoriesService } from './impact-stories.service'; import { createStorySchema, updateStorySchema, listStoriesSchema } from './impact-stories.schemas'; +import { eventBus } from '../../services/event-bus.service'; const router = Router(); @@ -42,6 +43,14 @@ router.post('/:id/publish', requireRole(...INFLUENCE_ROLES), async (req, res, ne const story = await impactStoriesService.publish(req.params.id as string); // Fire-and-forget: notify participants impactStoriesService.notifyParticipants(story.id).catch(() => {}); + + eventBus.publish('social.impact-story.published', { + storyId: story.id, + title: story.title, + authorUserId: story.createdByUserId || req.user!.id, + campaignId: story.campaignId ?? null, + }); + res.json(story); } catch (err) { next(err); diff --git a/api/src/modules/ticketed-events/ticketed-events-public.routes.ts b/api/src/modules/ticketed-events/ticketed-events-public.routes.ts index d37891c2..a7c424f3 100644 --- a/api/src/modules/ticketed-events/ticketed-events-public.routes.ts +++ b/api/src/modules/ticketed-events/ticketed-events-public.routes.ts @@ -9,6 +9,8 @@ import { getStripe } from '../../services/stripe.client'; import { prisma } from '../../config/database'; import { env } from '../../config/env'; import { AppError } from '../../middleware/error-handler'; +import { paymentCheckoutRateLimit } from '../../middleware/rate-limit'; +import { requirePaymentsEnabled } from '../payments/payment-settings.service'; const router = Router(); @@ -101,7 +103,7 @@ router.get('/:slug/availability', async (req: Request, res: Response, next: Next }); // POST /:slug/checkout — create Stripe checkout for paid ticket -router.post('/:slug/checkout', optionalAuth, validate(checkoutSchema), async (req: Request, res: Response, next: NextFunction) => { +router.post('/:slug/checkout', requirePaymentsEnabled, paymentCheckoutRateLimit, optionalAuth, validate(checkoutSchema), async (req: Request, res: Response, next: NextFunction) => { try { const slug = req.params.slug as string; const { tierId, quantity, buyerEmail, buyerName } = req.body; diff --git a/api/src/modules/ticketed-events/ticketed-events.service.ts b/api/src/modules/ticketed-events/ticketed-events.service.ts index ce433795..592e7cb1 100644 --- a/api/src/modules/ticketed-events/ticketed-events.service.ts +++ b/api/src/modules/ticketed-events/ticketed-events.service.ts @@ -9,6 +9,7 @@ import { generateSlug as generateMeetingSlug } from '../../utils/slug'; import { env } from '../../config/env'; import crypto from 'crypto'; import { EVENTS_ROLES } from '../../utils/roles'; +import { eventBus } from '../../services/event-bus.service'; function generateSlug(title: string): string { return title @@ -388,6 +389,16 @@ export const ticketedEventsService = { this.syncToGancio(updated).catch(() => {}); unifiedCalendarService.bustCache().catch(() => {}); + eventBus.publish('ticketed-event.published', { + eventId: updated.id, + title: updated.title, + date: updated.date.toISOString().split('T')[0], + startTime: updated.startTime, + endTime: updated.endTime, + location: updated.venueAddress || updated.venueName || undefined, + gancioEventId: updated.gancioEventId ?? undefined, + }); + return updated; }, @@ -451,6 +462,11 @@ export const ticketedEventsService = { } unifiedCalendarService.bustCache().catch(() => {}); + eventBus.publish('ticketed-event.cancelled', { + eventId: updated.id, + title: event.title, + }); + return updated; }, diff --git a/api/src/modules/users/users.routes.ts b/api/src/modules/users/users.routes.ts index 8aa9bee5..b5c2c997 100644 --- a/api/src/modules/users/users.routes.ts +++ b/api/src/modules/users/users.routes.ts @@ -10,6 +10,7 @@ import { requireRole } from '../../middleware/rbac.middleware'; import { hasAnyRole, ADMIN_ROLES, getUserRoles } from '../../utils/roles'; import { prisma } from '../../config/database'; import { emailService } from '../../services/email.service'; +import { eventBus } from '../../services/event-bus.service'; import { env } from '../../config/env'; import { logger } from '../../utils/logger'; import { userProvisioningService } from '../../services/user-provisioning/provisioning.service'; @@ -115,7 +116,7 @@ router.put( } // Self-service password change requires current password verification - if (isSelf && !isAdminUser && req.body.password) { + if (isSelf && req.body.password) { if (!req.body.currentPassword) { res.status(400).json({ error: { message: 'Current password is required to change your password', code: 'CURRENT_PASSWORD_REQUIRED' } }); return; @@ -183,6 +184,14 @@ router.post( roles: user.roles, status: 'ACTIVE', permissions: user.permissions as Record | null, }).catch(err => logger.warn('User provisioning hook (approve) failed:', err)); + eventBus.publish('user.approved', { + userId: user.id, + email: user.email, + name: user.name || '', + role: user.role, + approvedByUserId: req.user!.id, + }); + res.json({ message: 'User approved', userId: id }); } catch (err) { next(err); diff --git a/api/src/modules/users/users.service.ts b/api/src/modules/users/users.service.ts index eba2b7fa..70c4578d 100644 --- a/api/src/modules/users/users.service.ts +++ b/api/src/modules/users/users.service.ts @@ -4,6 +4,7 @@ import { prisma } from '../../config/database'; import { AppError } from '../../middleware/error-handler'; import { getPrimaryRole } from '../../utils/roles'; import { userProvisioningService } from '../../services/user-provisioning/provisioning.service'; +import { eventBus } from '../../services/event-bus.service'; import { logger } from '../../utils/logger'; import type { CMUser } from '../../services/user-provisioning/provisioner.interface'; import type { CreateUserInput, UpdateUserInput, ListUsersInput } from './users.schemas'; @@ -122,6 +123,13 @@ export const usersService = { logger.warn('User provisioning hook (create) failed:', err); }); + eventBus.publish('user.created', { + userId: user.id, + email: user.email, + name: user.name || '', + role: user.role, + }); + return user; }, @@ -182,6 +190,16 @@ export const usersService = { logger.warn('User provisioning hook (update) failed:', err); }); + // Compute list of changed fields for the event payload + const changes = Object.keys(data).filter(k => k !== 'currentPassword'); + eventBus.publish('user.updated', { + userId: user.id, + email: user.email, + name: user.name || '', + role: user.role, + changes, + }); + return user; }, @@ -198,6 +216,12 @@ export const usersService = { }); await prisma.user.delete({ where: { id } }); + + eventBus.publish('user.deleted', { + userId: existing.id, + email: existing.email, + name: existing.name || '', + }); }, }; diff --git a/api/src/server.ts b/api/src/server.ts index 1a8a7f5a..ca8829e2 100644 --- a/api/src/server.ts +++ b/api/src/server.ts @@ -14,6 +14,7 @@ import { authenticate } from './middleware/auth.middleware'; import { requireRole } from './middleware/rbac.middleware'; import { globalRateLimit, healthMetricsRateLimit } from './middleware/rate-limit'; import { authRouter } from './modules/auth/auth.routes'; +import { giteaSsoRouter } from './modules/auth/gitea-sso.routes'; import { usersRouter } from './modules/users/users.routes'; import { provisioningRouter } from './modules/users/provisioning.routes'; import { campaignsRouter } from './modules/influence/campaigns/campaigns.routes'; @@ -34,6 +35,9 @@ import { qrRouter } from './modules/qr/qr.routes'; import { listmonkRouter } from './modules/listmonk/listmonk.routes'; import { listmonkWebhookRouter } from './modules/listmonk/listmonk-webhook.routes'; import { meetingPlannerAdminRouter, meetingPlannerPublicRouter } from './modules/meeting-planner/meeting-planner.routes'; +import { strawPollAdminRouter } from './modules/polls/polls.routes'; +import { strawPollPublicRouter } from './modules/polls/polls-public.routes'; +import { strawPollWidgetRouter } from './modules/polls/polls-widget.routes'; import { pagesPublicRouter } from './modules/pages/pages-public.routes'; import { pagesAdminRouter } from './modules/pages/pages-admin.routes'; import { blocksRouter } from './modules/pages/blocks.routes'; @@ -123,12 +127,16 @@ import { autoUpgradeService } from './services/auto-upgrade.service'; import { calendarFeedQueueService } from './services/calendar-feed-queue.service'; import { scheduledJobsQueueService } from './services/scheduled-jobs-queue.service'; import { pollAutoFinalizeQueueService } from './services/poll-auto-finalize-queue.service'; +import { pollAutoCloseQueueService } from './services/poll-auto-close-queue.service'; +import { pollSseService } from './modules/polls/polls-sse.service'; import { agendaRouter } from './modules/meetings/agenda.routes'; import { actionItemsRouter } from './modules/meetings/action-items.routes'; import { WebSocketServer } from 'ws'; import { docsCollabService } from './modules/docs/docs-collab.service'; import { correlationId } from './middleware/correlation-id'; import cookieParser from 'cookie-parser'; +import { registerAllEventListeners } from './services/event-listeners'; +import { eventBus } from './services/event-bus.service'; const app = express(); @@ -272,6 +280,7 @@ app.get('/api/metrics/internal', async (req, res) => { // --- API Routes --- app.use('/api/auth', authRouter); +app.use('/api/auth', giteaSsoRouter); // Gitea SSO validation (nginx auth_request) app.use('/api/users', usersRouter); app.use('/api/users', provisioningRouter); // User provisioning management (ADMIN roles) app.use('/api/campaigns', campaignPublicRouter); // Public campaign details (no auth) @@ -301,6 +310,9 @@ app.use('/api/map/settings', mapSettingsRouter); // Map settings (public app.use('/api/map/events', eventsPublicRouter); // Public map events from Gancio (no auth) app.use('/api/meeting-planner', meetingPlannerPublicRouter); // Public poll viewing + voting (no auth) app.use('/api/meeting-planner', meetingPlannerAdminRouter); // Admin poll CRUD (auth required) +app.use('/api/straw-polls', strawPollPublicRouter); // Public straw poll voting + viewing (no auth) +app.use('/api/straw-polls', strawPollWidgetRouter); // Straw poll widget endpoint (no auth, cached) +app.use('/api/straw-polls', strawPollAdminRouter); // Admin straw poll CRUD (auth required) app.use('/api/meetings/agendas', agendaRouter); // Meeting agendas + minutes (EVENTS roles) app.use('/api/meetings/action-items', actionItemsRouter); // Action items CRUD (EVENTS roles / auth) app.use('/api/qr', qrRouter); // QR code generation (public) @@ -390,6 +402,9 @@ async function start() { // Register user provisioning framework registerProvisioners(); + // Register EventBus listeners (Listmonk, RC, CRM, Calendar, n8n, Gancio) + registerAllEventListeners(); + // Rebuild SMTP transporter from DB settings (env fallback for empty fields) await emailService.rebuildTransporter(); @@ -399,6 +414,7 @@ async function start() { calendarFeedQueueService.startWorker(); scheduledJobsQueueService.startWorker(); pollAutoFinalizeQueueService.startWorker(); + pollAutoCloseQueueService.startWorker(); startProxy(); // Load SMS config from DB (env fallback for empty fields) @@ -432,6 +448,7 @@ async function start() { // SSE + Presence: mark all users offline on startup, start heartbeat + stale cleanup presenceService.markAllOffline().catch(() => {}); sseService.startHeartbeat(); + pollSseService.startHeartbeat(); setInterval(() => presenceService.cleanupStale().catch(() => {}), 60 * 1000); // every 1 min // Challenge lifecycle: activate/complete/score every 5 minutes @@ -543,6 +560,7 @@ for (const signal of ['SIGTERM', 'SIGINT']) { process.on(signal, async () => { logger.info(`${signal} received, shutting down...`); sseService.closeAll(); + pollSseService.closeAll(); await docsCollabService.shutdown(); await stopProxy(); await emailQueueService.close(); diff --git a/api/src/services/email-queue.service.ts b/api/src/services/email-queue.service.ts index 3ebc30d9..e4a85e86 100644 --- a/api/src/services/email-queue.service.ts +++ b/api/src/services/email-queue.service.ts @@ -5,7 +5,7 @@ import { prisma } from '../config/database'; import { logger } from '../utils/logger'; import { emailService } from './email.service'; import { recordEmailSent, recordEmailFailed, setEmailQueueSize, emailSendDuration } from '../utils/metrics'; -import { listmonkEventSyncService } from './listmonk-event-sync.service'; +import { eventBus } from './event-bus.service'; interface CampaignEmailJobData { campaignEmailId: string; @@ -66,13 +66,13 @@ class EmailQueueService { if (result.success) { recordEmailSent(campaignId); - // Listmonk event sync - listmonkEventSyncService.onCampaignEmailSent({ + // Publish campaign email sent event + eventBus.publish('campaign.email.sent', { email: emailData.userEmail, name: emailData.userName, campaignSlug: emailData.campaignTitle, postalCode: emailData.postalCode, - }).catch(() => {}); + }); } else { recordEmailFailed(campaignId, 'send_failure'); throw new Error(`Failed to send email to ${emailData.recipientEmail}`); diff --git a/api/src/services/event-bus.service.ts b/api/src/services/event-bus.service.ts new file mode 100644 index 00000000..0ad1d954 --- /dev/null +++ b/api/src/services/event-bus.service.ts @@ -0,0 +1,183 @@ +/** + * Platform EventBus — in-process pub/sub for decoupled service integration. + * + * Design: + * - Uses Node.js EventEmitter (single process, zero serialization overhead) + * - Typed events via PlatformEventMap (compile-time safety) + * - Wildcard subscriptions: subscribe('shift.*') catches all shift events + * - Error isolation: each listener wraps its handler in try-catch + * - Stats tracking: per-event and per-listener counters for observability + * + * Usage: + * // Publish (from any service) + * eventBus.publish('shift.signup.created', { shiftId, userName, ... }); + * + * // Subscribe (from listeners registered at startup) + * eventBus.subscribe('shift.signup.created', async (payload) => { ... }); + * eventBus.subscribe('shift.*', async (payload) => { ... }); // wildcard + */ + +import { EventEmitter } from 'events'; +import { logger } from '../utils/logger'; +import type { PlatformEventMap, PlatformEventName, EventPayload } from '../types/events'; + +type EventHandler = (payload: EventPayload) => void | Promise; + +interface ListenerRegistration { + name: string; + pattern: string; + handler: (event: string, payload: unknown) => void | Promise; +} + +interface EventStats { + published: number; + lastPublishedAt: Date | null; +} + +class EventBus { + private emitter = new EventEmitter(); + private listeners: ListenerRegistration[] = []; + private eventStats = new Map(); + private listenerStats = new Map(); + + constructor() { + // Allow many listeners (we'll have multiple per event) + this.emitter.setMaxListeners(100); + } + + /** + * Publish a typed event. All matching subscribers are called asynchronously. + * This is fire-and-forget — errors in listeners do NOT propagate to the publisher. + */ + publish(event: E, payload: EventPayload): void { + // Update stats + const stats = this.eventStats.get(event) ?? { published: 0, lastPublishedAt: null }; + stats.published++; + stats.lastPublishedAt = new Date(); + this.eventStats.set(event, stats); + + // Emit to exact subscribers + this.emitter.emit(event, payload); + + // Emit to wildcard subscribers + for (const reg of this.listeners) { + if (reg.pattern.endsWith('.*')) { + const prefix = reg.pattern.slice(0, -2); + if (event.startsWith(prefix + '.') && event !== reg.pattern) { + this.safeCall(reg.name, () => reg.handler(event, payload)); + } + } + } + + logger.debug(`EventBus: ${event}`, { event }); + } + + /** + * Subscribe to a specific event with a named listener. + * The name is used for stats tracking and debugging. + */ + subscribe( + event: E, + name: string, + handler: EventHandler, + ): void { + const wrappedHandler = (payload: EventPayload) => { + this.safeCall(name, () => handler(payload)); + }; + + this.emitter.on(event, wrappedHandler); + this.listeners.push({ + name, + pattern: event, + handler: (_event: string, payload: unknown) => handler(payload as EventPayload), + }); + this.listenerStats.set(name, { handled: 0, errors: 0 }); + } + + /** + * Subscribe to all events matching a wildcard pattern (e.g., 'shift.*'). + * Handler receives both the event name and payload. + */ + subscribePattern( + pattern: string, + name: string, + handler: (event: string, payload: unknown) => void | Promise, + ): void { + this.listeners.push({ name, pattern, handler }); + this.listenerStats.set(name, { handled: 0, errors: 0 }); + } + + /** + * Call a handler with error isolation and stats tracking. + */ + private safeCall(listenerName: string, fn: () => void | Promise): void { + const stats = this.listenerStats.get(listenerName); + + try { + const result = fn(); + if (result instanceof Promise) { + result + .then(() => { + if (stats) stats.handled++; + }) + .catch((err) => { + if (stats) { + stats.handled++; + stats.errors++; + } + logger.debug(`EventBus listener "${listenerName}" error:`, err); + }); + } else { + if (stats) stats.handled++; + } + } catch (err) { + if (stats) { + stats.handled++; + stats.errors++; + } + logger.debug(`EventBus listener "${listenerName}" sync error:`, err); + } + } + + /** + * Get stats for observability dashboard. + */ + getStats(): { + totalEventsPublished: number; + eventCounts: Record; + listenerCounts: Record; + registeredListeners: { name: string; pattern: string }[]; + } { + let total = 0; + const eventCounts: Record = {}; + for (const [name, stats] of this.eventStats) { + total += stats.published; + eventCounts[name] = { + published: stats.published, + lastPublishedAt: stats.lastPublishedAt?.toISOString() ?? null, + }; + } + + const listenerCounts: Record = {}; + for (const [name, stats] of this.listenerStats) { + listenerCounts[name] = { ...stats }; + } + + return { + totalEventsPublished: total, + eventCounts, + listenerCounts, + registeredListeners: this.listeners.map(l => ({ name: l.name, pattern: l.pattern })), + }; + } + + /** + * Remove all listeners (for testing or shutdown). + */ + removeAllListeners(): void { + this.emitter.removeAllListeners(); + this.listeners = []; + } +} + +export const eventBus = new EventBus(); diff --git a/api/src/services/event-listeners/calendar-sync.listener.ts b/api/src/services/event-listeners/calendar-sync.listener.ts new file mode 100644 index 00000000..551aead6 --- /dev/null +++ b/api/src/services/event-listeners/calendar-sync.listener.ts @@ -0,0 +1,268 @@ +/** + * Calendar Sync EventBus Listener + * + * Auto-populates CalendarItems from Shifts, Meetings, and TicketedEvents. + * Creates items on a system "Platform Events" layer, giving volunteers a + * unified timeline of all scheduled activities. + * + * Uses the existing CalendarItem.sourceType + sourceId fields for tracking + * which external entity each calendar item came from. + * + * No feature guard — always active if enableSocialCalendar is true (checked per-event). + */ + +import { eventBus } from '../event-bus.service'; +import { logger } from '../../utils/logger'; + +// Lazy-import prisma +let prismaPromise: ReturnType | null = null; +async function getPrisma() { + const { prisma } = await import('../../config/database'); + return prisma; +} +function lazyPrisma() { + if (!prismaPromise) prismaPromise = getPrisma(); + return prismaPromise; +} + +/** + * Check if the social calendar feature is enabled in site settings. + */ +async function isCalendarEnabled(): Promise { + try { + const prisma = await lazyPrisma(); + const settings = await prisma.siteSettings.findFirst({ select: { enableSocialCalendar: true } }); + return settings?.enableSocialCalendar ?? false; + } catch { + return false; + } +} + +/** + * Find or create the system "Platform Events" calendar layer. + * Uses a well-known layer name so all sync items land in one place. + */ +async function getSystemLayer(userId: string): Promise { + try { + const prisma = await lazyPrisma(); + + // Look for existing system layer for this user + const existing = await prisma.calendarLayer.findFirst({ + where: { userId, name: 'Platform Events', layerType: 'SYSTEM' }, + select: { id: true }, + }); + if (existing) return existing.id; + + // Create a new one + const layer = await prisma.calendarLayer.create({ + data: { + userId, + name: 'Platform Events', + color: '#3498db', + layerType: 'SYSTEM', + visibility: 'PRIVATE', + isEnabled: true, + }, + }); + return layer.id; + } catch (err) { + logger.debug('Calendar sync: failed to get/create system layer:', err); + return null; + } +} + +/** + * Upsert a calendar item linked to an external source. + */ +async function upsertCalendarItem( + userId: string, + sourceType: string, + sourceId: string, + data: { + title: string; + date: string; + startTime: string; + endTime: string; + description?: string; + location?: string; + }, +): Promise { + try { + if (!(await isCalendarEnabled())) return; + + const prisma = await lazyPrisma(); + const layerId = await getSystemLayer(userId); + if (!layerId) return; + + const dateObj = new Date(data.date + 'T00:00:00Z'); + + // Check if calendar item already exists for this source + const existing = await prisma.calendarItem.findFirst({ + where: { sourceType: sourceType as any, sourceId }, + select: { id: true }, + }); + + if (existing) { + // Update existing item + await prisma.calendarItem.update({ + where: { id: existing.id }, + data: { + title: data.title, + date: dateObj, + startTime: data.startTime, + endTime: data.endTime, + description: data.description, + location: data.location, + }, + }); + } else { + // Create new item + await prisma.calendarItem.create({ + data: { + userId, + layerId, + title: data.title, + date: dateObj, + startTime: data.startTime, + endTime: data.endTime, + description: data.description, + location: data.location, + sourceType: sourceType as any, + sourceId, + itemType: 'EVENT', + busyStatus: 'BUSY', + }, + }); + } + } catch (err) { + logger.debug(`Calendar sync: upsert failed for ${sourceType}:${sourceId}:`, err); + } +} + +/** + * Delete a calendar item by its source reference. + */ +async function deleteBySource(sourceType: string, sourceId: string): Promise { + try { + const prisma = await lazyPrisma(); + await prisma.calendarItem.deleteMany({ + where: { sourceType: sourceType as any, sourceId }, + }); + } catch (err) { + logger.debug(`Calendar sync: delete failed for ${sourceType}:${sourceId}:`, err); + } +} + +export function registerCalendarSyncListener(): void { + // Shift created → Calendar item + eventBus.subscribe('shift.created', 'calendar:shift-created', async (payload) => { + await upsertCalendarItem(payload.createdByUserId, 'MANUAL', payload.shiftId, { + title: `Shift: ${payload.title}`, + date: payload.date, + startTime: payload.startTime, + endTime: payload.endTime, + location: payload.cutName ?? undefined, + }); + }); + + // Shift updated → Update calendar item + eventBus.subscribe('shift.updated', 'calendar:shift-updated', async (payload) => { + // We don't know who created the shift — find existing calendar item + try { + const prisma = await lazyPrisma(); + const existing = await prisma.calendarItem.findFirst({ + where: { sourceId: payload.shiftId }, + select: { userId: true }, + }); + if (!existing) return; + await upsertCalendarItem(existing.userId, 'MANUAL', payload.shiftId, { + title: `Shift: ${payload.title}`, + date: payload.date, + startTime: payload.startTime, + endTime: payload.endTime, + location: payload.cutName ?? undefined, + }); + } catch { + // silent + } + }); + + // Shift deleted → Remove calendar item + eventBus.subscribe('shift.deleted', 'calendar:shift-deleted', async (payload) => { + await deleteBySource('MANUAL', payload.shiftId); + }); + + // Meeting created → Calendar item + eventBus.subscribe('meeting.created', 'calendar:meeting-created', async (payload) => { + const date = payload.scheduledAt.split('T')[0]; + const time = payload.scheduledAt.split('T')[1]?.slice(0, 5) ?? '00:00'; + const endHour = parseInt(time.split(':')[0]) + 1; + const endTime = `${String(endHour).padStart(2, '0')}:${time.split(':')[1]}`; + + await upsertCalendarItem(payload.createdByUserId, 'MANUAL', payload.meetingId, { + title: `Meeting: ${payload.title}`, + date, + startTime: time, + endTime, + description: payload.jitsiRoomName ? `Jitsi room: ${payload.jitsiRoomName}` : undefined, + }); + }); + + // Meeting updated → Update calendar item + eventBus.subscribe('meeting.updated', 'calendar:meeting-updated', async (payload) => { + try { + const prisma = await lazyPrisma(); + const existing = await prisma.calendarItem.findFirst({ + where: { sourceId: payload.meetingId }, + select: { userId: true }, + }); + if (!existing) return; + + const date = payload.scheduledAt.split('T')[0]; + const time = payload.scheduledAt.split('T')[1]?.slice(0, 5) ?? '00:00'; + const endHour = parseInt(time.split(':')[0]) + 1; + const endTime = `${String(endHour).padStart(2, '0')}:${time.split(':')[1]}`; + + await upsertCalendarItem(existing.userId, 'MANUAL', payload.meetingId, { + title: `Meeting: ${payload.title}`, + date, + startTime: time, + endTime, + }); + } catch { + // silent + } + }); + + // Meeting deleted → Remove calendar item + eventBus.subscribe('meeting.deleted', 'calendar:meeting-deleted', async (payload) => { + await deleteBySource('MANUAL', payload.meetingId); + }); + + // Ticketed event published → Calendar item + eventBus.subscribe('ticketed-event.published', 'calendar:ticketed-event', async (payload) => { + // Find who created this event + try { + const prisma = await lazyPrisma(); + const event = await prisma.ticketedEvent.findUnique({ + where: { id: payload.eventId }, + select: { createdByUserId: true }, + }); + if (!event) return; + await upsertCalendarItem(event.createdByUserId, 'MANUAL', payload.eventId, { + title: `Event: ${payload.title}`, + date: payload.date, + startTime: payload.startTime, + endTime: payload.endTime ?? payload.startTime, + location: payload.location, + }); + } catch { + // silent + } + }); + + // Ticketed event cancelled → Remove calendar item + eventBus.subscribe('ticketed-event.cancelled', 'calendar:ticketed-event-cancel', async (payload) => { + await deleteBySource('MANUAL', payload.eventId); + }); +} diff --git a/api/src/services/event-listeners/crm-activity.listener.ts b/api/src/services/event-listeners/crm-activity.listener.ts new file mode 100644 index 00000000..e401d6d0 --- /dev/null +++ b/api/src/services/event-listeners/crm-activity.listener.ts @@ -0,0 +1,213 @@ +/** + * CRM Activity EventBus Listener + * + * Auto-creates ContactActivity entries for every meaningful engagement touchpoint. + * This makes the CRM contact timeline actually useful — staff can see a contact's + * full interaction history across campaigns, canvassing, donations, and SMS. + * + * No feature guard — always active (activities are core CRM data). + */ + +import { eventBus } from '../event-bus.service'; +import { logger } from '../../utils/logger'; + +// Lazy-import prisma to avoid circular dependency at module load time +let prismaPromise: ReturnType | null = null; +async function getPrisma() { + const { prisma } = await import('../../config/database'); + return prisma; +} +function lazyPrisma() { + if (!prismaPromise) prismaPromise = getPrisma(); + return prismaPromise; +} + +/** + * Find a contact by email. Returns null if not found or no email provided. + */ +async function findContactByEmail(email?: string | null): Promise { + if (!email) return null; + try { + const prisma = await lazyPrisma(); + const contactEmail = await prisma.contactEmail.findFirst({ + where: { email: email.toLowerCase() }, + select: { contactId: true }, + }); + return contactEmail?.contactId ?? null; + } catch { + return null; + } +} + +/** + * Create a ContactActivity entry. Silently fails if contact not found. + */ +async function createActivity( + contactId: string, + type: string, + title: string, + description?: string, + metadata?: Record, +): Promise { + try { + const prisma = await lazyPrisma(); + await prisma.contactActivity.create({ + data: { + contactId, + type: type as any, + title, + description, + metadata: metadata ? (metadata as unknown as import('@prisma/client').Prisma.InputJsonValue) : undefined, + }, + }); + } catch (err) { + logger.debug(`CRM activity creation failed for contact ${contactId}:`, err); + } +} + +export function registerCrmActivityListener(): void { + // Campaign email sent + eventBus.subscribe('campaign.email.sent', 'crm:campaign-email', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (!contactId) return; + await createActivity(contactId, 'EMAIL_SENT', `Sent advocacy email for "${payload.campaignSlug}"`, undefined, { + campaignSlug: payload.campaignSlug, + postalCode: payload.postalCode, + }); + }); + + // Shift signup + eventBus.subscribe('shift.signup.created', 'crm:shift-signup', async (payload) => { + const contactId = await findContactByEmail(payload.userEmail); + if (!contactId) return; + await createActivity(contactId, 'SHIFT_SIGNUP', `Signed up for shift: ${payload.shiftTitle}`, undefined, { + shiftId: payload.shiftId, + shiftDate: payload.shiftDate, + signupType: payload.signupType, + }); + }); + + // Canvass visit recorded + eventBus.subscribe('canvass.visit.recorded', 'crm:canvass-visit', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (!contactId) return; + await createActivity(contactId, 'CANVASS_VISIT', `Canvass visit: ${payload.outcome}`, undefined, { + visitId: payload.visitId, + outcome: payload.outcome, + supportLevel: payload.supportLevel, + }); + }); + + // Response submitted + eventBus.subscribe('response.submitted', 'crm:response-submitted', async (payload) => { + const contactId = await findContactByEmail(payload.userEmail); + if (!contactId) return; + await createActivity(contactId, 'RESPONSE_SUBMITTED', `Submitted response for "${payload.campaignTitle}"`, undefined, { + responseId: payload.responseId, + campaignId: payload.campaignId, + representativeName: payload.representativeName, + }); + }); + + // Donation completed + eventBus.subscribe('payment.donation.completed', 'crm:donation', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (!contactId) return; + const amount = (payload.amountCents / 100).toFixed(2); + await createActivity(contactId, 'DONATION', `Donated $${amount}`, undefined, { + orderId: payload.orderId, + amountCents: payload.amountCents, + }); + }); + + // Product purchased + eventBus.subscribe('payment.product.purchased', 'crm:product-purchase', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (!contactId) return; + const amount = (payload.amountCents / 100).toFixed(2); + await createActivity(contactId, 'PURCHASE', `Purchased "${payload.productTitle}" ($${amount})`, undefined, { + orderId: payload.orderId, + productTitle: payload.productTitle, + amountCents: payload.amountCents, + }); + }); + + // SMS sent + eventBus.subscribe('sms.message.sent', 'crm:sms-sent', async (payload) => { + // SMS uses phone numbers — find contact by phone + try { + const prisma = await lazyPrisma(); + const contactPhone = await prisma.contactPhone.findFirst({ + where: { phone: payload.phone }, + select: { contactId: true }, + }); + if (!contactPhone) return; + await createActivity(contactPhone.contactId, 'SMS_SENT', `SMS sent to ${payload.phone}`, payload.body.slice(0, 200), { + messageId: payload.messageId, + campaignId: payload.campaignId, + }); + } catch { + // silent + } + }); + + // SMS received + eventBus.subscribe('sms.message.received', 'crm:sms-received', async (payload) => { + try { + const prisma = await lazyPrisma(); + const contactPhone = await prisma.contactPhone.findFirst({ + where: { phone: payload.phone }, + select: { contactId: true }, + }); + if (!contactPhone) return; + await createActivity(contactPhone.contactId, 'SMS_RECEIVED', `SMS received from ${payload.phone}`, payload.body.slice(0, 200), { + messageId: payload.messageId, + conversationId: payload.conversationId, + responseType: payload.responseType, + }); + } catch { + // silent + } + }); + + // Video viewed (only logged-in users) + eventBus.subscribe('media.video.viewed', 'crm:video-view', async (payload) => { + if (!payload.userId) return; + try { + const prisma = await lazyPrisma(); + const user = await prisma.user.findUnique({ + where: { id: payload.userId }, + select: { email: true }, + }); + if (!user) return; + const contactId = await findContactByEmail(user.email); + if (!contactId) return; + await createActivity(contactId, 'VIDEO_VIEW', `Watched "${payload.videoTitle}"`, undefined, { + videoId: payload.videoId, + }); + } catch { + // silent + } + }); + + // Listmonk email opened → activity + eventBus.subscribe('listmonk.email.opened', 'crm:email-opened', async (payload) => { + const contactId = await findContactByEmail(payload.subscriberEmail); + if (!contactId) return; + await createActivity(contactId, 'EMAIL_SENT', `Opened newsletter: "${payload.campaignName}"`, undefined, { + listmonkCampaignId: payload.campaignId, + action: 'opened', + }); + }); + + // Listmonk email link clicked → activity + eventBus.subscribe('listmonk.email.clicked', 'crm:email-clicked', async (payload) => { + const contactId = await findContactByEmail(payload.subscriberEmail); + if (!contactId) return; + await createActivity(contactId, 'EMAIL_SENT', `Clicked link in "${payload.campaignName}"`, payload.url, { + listmonkCampaignId: payload.campaignId, + action: 'clicked', + url: payload.url, + }); + }); +} diff --git a/api/src/services/event-listeners/gancio.listener.ts b/api/src/services/event-listeners/gancio.listener.ts new file mode 100644 index 00000000..745735a5 --- /dev/null +++ b/api/src/services/event-listeners/gancio.listener.ts @@ -0,0 +1,93 @@ +/** + * Gancio EventBus Listener + * + * Syncs shift events to the Gancio public event calendar. + * This replaces the inline gancioClient calls in shifts.service.ts. + * + * Feature guard: GANCIO_SYNC_ENABLED=true (checked inside gancioClient) + */ + +import { eventBus } from '../event-bus.service'; +import { logger } from '../../utils/logger'; + +// Lazy-import to avoid circular dependency at module load +async function getGancioClient() { + const { gancioClient } = await import('../gancio.client'); + return gancioClient; +} + +export function registerGancioListener(): void { + // Shift created → Create Gancio event + eventBus.subscribe('shift.created', 'gancio:shift-created', async (payload) => { + try { + const gancio = await getGancioClient(); + if (!gancio.enabled) return; + + const eventId = await gancio.createEvent({ + title: payload.title, + description: `Volunteer shift: ${payload.title}`, + location: payload.cutName ?? 'TBD', + date: new Date(payload.date), + startTime: payload.startTime, + endTime: payload.endTime, + tags: ['volunteer', 'shift'], + }); + + // Store gancioEventId back on the shift + if (eventId) { + const { prisma } = await import('../../config/database'); + await prisma.shift.update({ + where: { id: payload.shiftId }, + data: { gancioEventId: eventId }, + }); + } + } catch (err) { + logger.debug('Gancio sync: shift create failed:', err); + } + }); + + // Shift updated → Update Gancio event + eventBus.subscribe('shift.updated', 'gancio:shift-updated', async (payload) => { + try { + const gancio = await getGancioClient(); + if (!gancio.enabled) return; + + const { prisma } = await import('../../config/database'); + const shift = await prisma.shift.findUnique({ + where: { id: payload.shiftId }, + select: { gancioEventId: true }, + }); + if (!shift?.gancioEventId) return; + + await gancio.updateEvent(shift.gancioEventId, { + title: payload.title, + description: `Volunteer shift: ${payload.title}`, + location: payload.cutName ?? 'TBD', + date: new Date(payload.date), + startTime: payload.startTime, + endTime: payload.endTime, + }); + } catch (err) { + logger.debug('Gancio sync: shift update failed:', err); + } + }); + + // Shift deleted → Delete Gancio event + eventBus.subscribe('shift.deleted', 'gancio:shift-deleted', async (payload) => { + try { + const gancio = await getGancioClient(); + if (!gancio.enabled) return; + + const { prisma } = await import('../../config/database'); + const shift = await prisma.shift.findUnique({ + where: { id: payload.shiftId }, + select: { gancioEventId: true }, + }); + if (!shift?.gancioEventId) return; + + await gancio.deleteEvent(shift.gancioEventId); + } catch (err) { + logger.debug('Gancio sync: shift delete failed:', err); + } + }); +} diff --git a/api/src/services/event-listeners/index.ts b/api/src/services/event-listeners/index.ts new file mode 100644 index 00000000..163a08ab --- /dev/null +++ b/api/src/services/event-listeners/index.ts @@ -0,0 +1,37 @@ +/** + * EventBus Listener Registry + * + * Registers all event listeners at application startup. + * Each listener is independent — if one fails to register, others continue. + */ + +import { logger } from '../../utils/logger'; +import { registerListmonkListener } from './listmonk.listener'; +import { registerRocketChatListener } from './rocketchat.listener'; +import { registerCrmActivityListener } from './crm-activity.listener'; +import { registerCalendarSyncListener } from './calendar-sync.listener'; +import { registerN8nWebhookListener } from './n8n-webhook.listener'; +import { registerGancioListener } from './gancio.listener'; + +export function registerAllEventListeners(): void { + const listeners = [ + { name: 'Listmonk', register: registerListmonkListener }, + { name: 'Rocket.Chat', register: registerRocketChatListener }, + { name: 'CRM Activity', register: registerCrmActivityListener }, + { name: 'Calendar Sync', register: registerCalendarSyncListener }, + { name: 'n8n Webhook', register: registerN8nWebhookListener }, + { name: 'Gancio', register: registerGancioListener }, + ]; + + let registered = 0; + for (const listener of listeners) { + try { + listener.register(); + registered++; + } catch (err) { + logger.warn(`EventBus: failed to register ${listener.name} listener:`, err); + } + } + + logger.info(`EventBus: ${registered}/${listeners.length} listeners registered`); +} diff --git a/api/src/services/event-listeners/listmonk.listener.ts b/api/src/services/event-listeners/listmonk.listener.ts new file mode 100644 index 00000000..b344e631 --- /dev/null +++ b/api/src/services/event-listeners/listmonk.listener.ts @@ -0,0 +1,105 @@ +/** + * Listmonk EventBus Listener + * + * Subscribes to platform events and syncs subscribers to Listmonk newsletter lists. + * Replaces the inline listmonkEventSyncService calls scattered across service files. + * + * Feature guard: LISTMONK_SYNC_ENABLED=true + */ + +import { eventBus } from '../event-bus.service'; +import { listmonkEventSyncService } from '../listmonk-event-sync.service'; + +export function registerListmonkListener(): void { + // Shift signups → Volunteers list + eventBus.subscribe('shift.signup.created', 'listmonk:shift-signup', (payload) => { + listmonkEventSyncService.onShiftSignup({ + email: payload.userEmail, + name: payload.userName, + shiftTitle: payload.shiftTitle, + shiftDate: payload.shiftDate, + cutName: payload.cutName ?? undefined, + }); + }); + + // Canvass session completed → Canvassers list + eventBus.subscribe('canvass.session.completed', 'listmonk:canvass-completed', (payload) => { + listmonkEventSyncService.onCanvassSessionCompleted({ + email: payload.userEmail, + name: payload.userName, + cutName: payload.cutName, + visitCount: payload.visitCount, + outcomes: payload.outcomes, + }); + }); + + // Campaign email sent → Campaign Participants list + eventBus.subscribe('campaign.email.sent', 'listmonk:campaign-email', (payload) => { + listmonkEventSyncService.onCampaignEmailSent({ + email: payload.email, + name: payload.name, + campaignSlug: payload.campaignSlug, + postalCode: payload.postalCode, + }); + }); + + // Address updated (canvass visit) → Support level lists + eventBus.subscribe('contact.address.updated', 'listmonk:address-updated', (payload) => { + listmonkEventSyncService.onAddressUpdated({ + email: payload.email, + name: payload.name, + supportLevel: payload.supportLevel, + sign: payload.sign, + address: payload.address, + }); + }); + + // Subscription activated → Subscribers list + eventBus.subscribe('payment.subscription.activated', 'listmonk:subscription', (payload) => { + listmonkEventSyncService.onSubscriptionActivated({ + email: payload.email, + name: payload.name, + planName: payload.planName, + subscriptionId: payload.subscriptionId, + }); + }); + + // Donation completed → Donors list + eventBus.subscribe('payment.donation.completed', 'listmonk:donation', (payload) => { + listmonkEventSyncService.onDonationCompleted({ + email: payload.email, + name: payload.name, + amountCents: payload.amountCents, + orderId: payload.orderId, + }); + }); + + // Product purchased → Donors list + eventBus.subscribe('payment.product.purchased', 'listmonk:product-purchase', (payload) => { + listmonkEventSyncService.onProductPurchased({ + email: payload.email, + name: payload.name, + productTitle: payload.productTitle, + amountCents: payload.amountCents, + orderId: payload.orderId, + }); + }); + + // Contact tags changed → CRM tag lists + eventBus.subscribe('contact.tags.changed', 'listmonk:contact-tags', (payload) => { + listmonkEventSyncService.onContactTagsChanged({ + email: payload.email, + name: payload.name, + addedTags: payload.addedTags, + removedTags: payload.removedTags, + }); + }); + + // Reengagement sent → Volunteers list + eventBus.subscribe('reengagement.sent', 'listmonk:reengagement', (payload) => { + listmonkEventSyncService.onReengagementSent({ + email: payload.email, + name: payload.name, + }); + }); +} diff --git a/api/src/services/event-listeners/n8n-webhook.listener.ts b/api/src/services/event-listeners/n8n-webhook.listener.ts new file mode 100644 index 00000000..6d85f269 --- /dev/null +++ b/api/src/services/event-listeners/n8n-webhook.listener.ts @@ -0,0 +1,55 @@ +/** + * n8n Webhook EventBus Listener + * + * Forwards ALL platform events to n8n webhook endpoints. + * n8n workflows can filter events by type on the receiving end. + * + * Configuration: + * N8N_WEBHOOK_URLS — comma-separated list of n8n webhook URLs to forward events to. + * Each URL receives all events; n8n workflows filter internally. + * + * Example .env: + * N8N_WEBHOOK_URLS=http://n8n-changemaker:5678/webhook/changemaker-events + * + * Feature guard: N8N_WEBHOOK_URLS must be set (non-empty). + */ + +import { eventBus } from '../event-bus.service'; +import { env } from '../../config/env'; +import { logger } from '../../utils/logger'; + +function getWebhookUrls(): string[] { + const raw = (env as unknown as Record).N8N_WEBHOOK_URLS || ''; + if (!raw) return []; + return raw.split(',').map(u => u.trim()).filter(Boolean); +} + +async function forwardToN8n(event: string, payload: unknown): Promise { + const urls = getWebhookUrls(); + if (urls.length === 0) return; + + for (const url of urls) { + try { + await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + event, + payload, + timestamp: new Date().toISOString(), + source: 'changemaker-lite', + }), + signal: AbortSignal.timeout(5000), + }); + } catch (err) { + logger.debug(`n8n webhook delivery failed for ${event} → ${url}:`, err); + } + } +} + +export function registerN8nWebhookListener(): void { + // Subscribe to ALL events using wildcard pattern + eventBus.subscribePattern('*', 'n8n:webhook-emitter', (event, payload) => { + forwardToN8n(event, payload); + }); +} diff --git a/api/src/services/event-listeners/rocketchat.listener.ts b/api/src/services/event-listeners/rocketchat.listener.ts new file mode 100644 index 00000000..db298c9b --- /dev/null +++ b/api/src/services/event-listeners/rocketchat.listener.ts @@ -0,0 +1,55 @@ +/** + * Rocket.Chat EventBus Listener + * + * Subscribes to platform events and posts notifications to RC channels. + * Extends the existing rocketchat-webhook.service with new event coverage. + * + * Channels: + * #shifts — shift CRUD + signups + * #canvassing — canvass sessions + visit milestones + * #campaigns — campaign publish, responses, email milestones + * + * Feature guard: ENABLE_CHAT=true (checked inside rocketchatWebhookService) + */ + +import { eventBus } from '../event-bus.service'; +import { rocketchatWebhookService } from '../rocketchat-webhook.service'; + +export function registerRocketChatListener(): void { + // --- Shifts --- + + eventBus.subscribe('shift.signup.created', 'rocketchat:shift-signup', (payload) => { + rocketchatWebhookService.onShiftSignup({ + userName: payload.userName, + shiftTitle: payload.shiftTitle, + shiftDate: payload.shiftDate, + }); + }); + + eventBus.subscribe('shift.signup.cancelled', 'rocketchat:shift-cancel', (payload) => { + rocketchatWebhookService.onShiftCancellation({ + userName: payload.userName, + shiftTitle: payload.shiftTitle, + shiftDate: payload.shiftDate, + }); + }); + + // --- Canvass --- + + eventBus.subscribe('canvass.session.completed', 'rocketchat:canvass-completed', (payload) => { + rocketchatWebhookService.onCanvassSessionCompleted({ + userName: payload.userName, + visitCount: payload.visitCount, + cutName: payload.cutName, + }); + }); + + // --- Responses --- + + eventBus.subscribe('response.submitted', 'rocketchat:response-submitted', (payload) => { + rocketchatWebhookService.onCampaignResponseSubmitted({ + campaignTitle: payload.campaignTitle, + representativeName: payload.representativeName, + }); + }); +} diff --git a/api/src/services/reengagement.service.ts b/api/src/services/reengagement.service.ts index 31a07113..441ccf39 100644 --- a/api/src/services/reengagement.service.ts +++ b/api/src/services/reengagement.service.ts @@ -4,7 +4,7 @@ import { env } from '../config/env'; import { logger } from '../utils/logger'; import { siteSettingsService } from '../modules/settings/settings.service'; import { notificationQueueService } from './notification-queue.service'; -import { listmonkEventSyncService } from './listmonk-event-sync.service'; +import { eventBus } from './event-bus.service'; /** * Volunteer Re-Engagement Scanner @@ -76,11 +76,11 @@ class ReengagementService { const cooldownSeconds = cooldownDays * 24 * 60 * 60; await redis.set(cooldownKey, '', 'EX', cooldownSeconds); - // Listmonk event sync: tag as re-engaged - listmonkEventSyncService.onReengagementSent({ + // Publish re-engagement event + eventBus.publish('reengagement.sent', { email: volunteer.email, name: volunteer.name || volunteer.email, - }).catch(() => {}); + }); sent++; } catch (err) { diff --git a/api/src/services/sms-queue.service.ts b/api/src/services/sms-queue.service.ts index 272d8752..7ce39a11 100644 --- a/api/src/services/sms-queue.service.ts +++ b/api/src/services/sms-queue.service.ts @@ -4,6 +4,7 @@ import { env } from '../config/env'; import { prisma } from '../config/database'; import { logger } from '../utils/logger'; import { termuxClient } from './termux.client'; +import { eventBus } from './event-bus.service'; export interface SmsJobData { recipientId: string; // empty string for notification jobs @@ -129,6 +130,13 @@ class SmsQueueService { where: { id: campaignId }, data: { totalSent: { increment: 1 } }, }); + + eventBus.publish('sms.message.sent', { + messageId: smsMessage.id, + campaignId, + phone, + body: message, + }); } else { await prisma.smsCampaign.update({ where: { id: campaignId }, @@ -136,6 +144,23 @@ class SmsQueueService { }); throw new Error(`Failed to send SMS to ${phone}: ${result.error}`); } + + // Check if campaign is complete (no more PENDING recipients) + const pendingCount = await prisma.smsCampaignRecipient.count({ + where: { campaignId, status: 'PENDING' }, + }); + if (pendingCount === 0) { + const updatedCampaign = await prisma.smsCampaign.update({ + where: { id: campaignId }, + data: { status: 'COMPLETED', completedAt: new Date() }, + }); + eventBus.publish('sms.campaign.completed', { + campaignId, + title: updatedCampaign.name, + sentCount: updatedCampaign.totalSent, + failedCount: updatedCampaign.totalFailed, + }); + } } else { // Notification job: just throw on failure for BullMQ retry if (!result.success) { diff --git a/api/src/services/sms-response-sync.service.ts b/api/src/services/sms-response-sync.service.ts index 188eb130..1eae55e6 100644 --- a/api/src/services/sms-response-sync.service.ts +++ b/api/src/services/sms-response-sync.service.ts @@ -2,6 +2,7 @@ import { env } from '../config/env'; import { prisma } from '../config/database'; import { logger } from '../utils/logger'; import { termuxClient } from './termux.client'; +import { eventBus } from './event-bus.service'; import type { SmsResponseType } from '@prisma/client'; // Opt-out keywords (case-insensitive) @@ -116,6 +117,14 @@ class SmsResponseSyncService { }, }); + eventBus.publish('sms.message.received', { + messageId: smsMessage.id, + conversationId: conversation?.id || '', + phone: msg.number, + body: msg.body, + responseType, + }); + // Update conversation stats if we have one if (conversation) { const updates: Record = { diff --git a/api/src/services/stripe.client.ts b/api/src/services/stripe.client.ts index 1251e65e..c5ab3115 100644 --- a/api/src/services/stripe.client.ts +++ b/api/src/services/stripe.client.ts @@ -19,7 +19,7 @@ export async function getStripe(): Promise { throw new Error('Stripe secret key not configured — set it in admin payment settings'); } - _stripe = new Stripe(secretKey); + _stripe = new Stripe(secretKey, { apiVersion: '2026-01-28.clover' }); logger.info('Stripe client initialized'); return _stripe; diff --git a/api/src/types/events.ts b/api/src/types/events.ts new file mode 100644 index 00000000..3eb21e0b --- /dev/null +++ b/api/src/types/events.ts @@ -0,0 +1,504 @@ +/** + * Platform Event Catalog + * + * Typed event definitions for the EventBus. Each event has a dot-separated name + * and a strongly-typed payload. Services publish events; listeners subscribe. + * + * Naming convention: .. + * e.g. shift.signup.created, campaign.email.sent, payment.donation.completed + */ + +// ============================================================================= +// SHIFT EVENTS +// ============================================================================= + +export interface ShiftCreatedEvent { + shiftId: string; + title: string; + date: string; + startTime: string; + endTime: string; + cutId?: string | null; + cutName?: string | null; + createdByUserId: string; +} + +export interface ShiftUpdatedEvent { + shiftId: string; + title: string; + date: string; + startTime: string; + endTime: string; + cutId?: string | null; + cutName?: string | null; + changes: string[]; // field names that changed +} + +export interface ShiftDeletedEvent { + shiftId: string; + title: string; + date: string; +} + +export interface ShiftSignupCreatedEvent { + shiftId: string; + shiftTitle: string; + shiftDate: string; + userName: string; + userEmail: string; + userId?: string | null; + cutName?: string | null; + signupType: 'admin' | 'volunteer' | 'public'; +} + +export interface ShiftSignupCancelledEvent { + shiftId: string; + shiftTitle: string; + shiftDate: string; + userName: string; + userEmail: string; + signupType: 'admin' | 'volunteer' | 'public'; +} + +// ============================================================================= +// CAMPAIGN EVENTS (Influence) +// ============================================================================= + +export interface CampaignCreatedEvent { + campaignId: string; + title: string; + slug: string; + createdByUserId: string; +} + +export interface CampaignUpdatedEvent { + campaignId: string; + title: string; + slug: string; + changes: string[]; +} + +export interface CampaignDeletedEvent { + campaignId: string; + title: string; + slug: string; +} + +export interface CampaignPublishedEvent { + campaignId: string; + title: string; + slug: string; +} + +export interface CampaignStatusChangedEvent { + campaignId: string; + title: string; + slug: string; + oldStatus: string; + newStatus: string; +} + +export interface CampaignEmailSentEvent { + email: string; + name: string; + campaignSlug: string; + postalCode?: string; +} + +// ============================================================================= +// RESPONSE EVENTS (Influence) +// ============================================================================= + +export interface ResponseSubmittedEvent { + responseId: string; + campaignId: string; + campaignTitle: string; + representativeName: string; + userEmail?: string; +} + +export interface ResponseApprovedEvent { + responseId: string; + campaignId: string; + campaignTitle: string; +} + +export interface ResponseRejectedEvent { + responseId: string; + campaignId: string; + campaignTitle: string; + reason?: string; +} + +// ============================================================================= +// CANVASS EVENTS +// ============================================================================= + +export interface CanvassSessionStartedEvent { + sessionId: string; + userId: string; + userName: string; + cutId: string; + cutName: string; +} + +export interface CanvassSessionCompletedEvent { + sessionId: string; + userId: string; + userName: string; + userEmail: string; + cutName: string; + visitCount: number; + outcomes: Record; +} + +export interface CanvassVisitRecordedEvent { + visitId: string; + sessionId: string; + addressId: string; + outcome: string; + email?: string | null; + name?: string | null; + supportLevel?: string | null; + sign?: boolean; + address?: string | null; +} + +// ============================================================================= +// USER EVENTS +// ============================================================================= + +export interface UserCreatedEvent { + userId: string; + email: string; + name: string; + role: string; +} + +export interface UserUpdatedEvent { + userId: string; + email: string; + name: string; + role: string; + changes: string[]; +} + +export interface UserApprovedEvent { + userId: string; + email: string; + name: string; + role: string; + approvedByUserId: string; +} + +export interface UserDeletedEvent { + userId: string; + email: string; + name: string; +} + +// ============================================================================= +// PAYMENT EVENTS +// ============================================================================= + +export interface SubscriptionActivatedEvent { + email: string; + name: string; + planName: string; + subscriptionId: string; + amountCents?: number; +} + +export interface SubscriptionCancelledEvent { + email: string; + name: string; + subscriptionId: string; +} + +export interface DonationCompletedEvent { + email: string; + name: string; + amountCents: number; + orderId: string; + donationPageSlug?: string; +} + +export interface DonationRefundedEvent { + email: string; + orderId: string; + amountCents: number; +} + +export interface ProductPurchasedEvent { + email: string; + name: string; + productTitle: string; + amountCents: number; + orderId: string; +} + +// ============================================================================= +// SMS EVENTS +// ============================================================================= + +export interface SmsCampaignStartedEvent { + campaignId: string; + title: string; + recipientCount: number; +} + +export interface SmsCampaignCompletedEvent { + campaignId: string; + title: string; + sentCount: number; + failedCount: number; +} + +export interface SmsMessageSentEvent { + messageId: string; + campaignId?: string; + phone: string; + body: string; +} + +export interface SmsMessageReceivedEvent { + messageId: string; + conversationId: string; + phone: string; + body: string; + responseType?: string; // POSITIVE, NEGATIVE, QUESTION, OPT_OUT, NEUTRAL +} + +// ============================================================================= +// MEDIA EVENTS +// ============================================================================= + +export interface VideoPublishedEvent { + videoId: string; + title: string; + publishedByUserId: string; +} + +export interface VideoUnpublishedEvent { + videoId: string; + title: string; +} + +export interface VideoViewedEvent { + videoId: string; + videoTitle: string; + userId?: string | null; + sessionId: string; +} + +// ============================================================================= +// TICKETED EVENT EVENTS +// ============================================================================= + +export interface TicketedEventPublishedEvent { + eventId: string; + title: string; + date: string; + startTime: string; + endTime?: string; + location?: string; + gancioEventId?: number | null; +} + +export interface TicketedEventCancelledEvent { + eventId: string; + title: string; +} + +// ============================================================================= +// MEETING EVENTS +// ============================================================================= + +export interface MeetingCreatedEvent { + meetingId: string; + title: string; + scheduledAt: string; + jitsiRoomName?: string; + createdByUserId: string; +} + +export interface MeetingUpdatedEvent { + meetingId: string; + title: string; + scheduledAt: string; + changes: string[]; +} + +export interface MeetingDeletedEvent { + meetingId: string; + title: string; +} + +// ============================================================================= +// SOCIAL EVENTS +// ============================================================================= + +export interface ImpactStoryPublishedEvent { + storyId: string; + title: string; + authorUserId: string; + campaignId?: string | null; +} + +// ============================================================================= +// CONTACT / CRM EVENTS +// ============================================================================= + +export interface ContactTagsChangedEvent { + email: string; + name: string; + contactId: string; + addedTags: string[]; + removedTags: string[]; +} + +export interface ContactCreatedEvent { + contactId: string; + email?: string; + name: string; +} + +export interface ContactMergedEvent { + survivorId: string; + mergedId: string; + survivorEmail?: string; +} + +export interface AddressUpdatedEvent { + email: string; + name: string; + supportLevel?: string | null; + sign?: boolean; + address?: string | null; +} + +// ============================================================================= +// REENGAGEMENT EVENTS +// ============================================================================= + +export interface ReengagementSentEvent { + email: string; + name: string; +} + +// ============================================================================= +// LISTMONK WEBHOOK EVENTS (inbound from Listmonk) +// ============================================================================= + +export interface ListmonkEmailOpenedEvent { + subscriberEmail: string; + campaignId: number; + campaignName: string; +} + +export interface ListmonkEmailClickedEvent { + subscriberEmail: string; + campaignId: number; + campaignName: string; + url: string; +} + +export interface ListmonkEmailBouncedEvent { + subscriberEmail: string; + campaignId: number; + bounceType: string; +} + +export interface ListmonkUnsubscribedEvent { + subscriberEmail: string; + listId: number; + listName: string; +} + +// ============================================================================= +// EVENT MAP — maps event names to payload types +// ============================================================================= + +export interface PlatformEventMap { + // Shifts + 'shift.created': ShiftCreatedEvent; + 'shift.updated': ShiftUpdatedEvent; + 'shift.deleted': ShiftDeletedEvent; + 'shift.signup.created': ShiftSignupCreatedEvent; + 'shift.signup.cancelled': ShiftSignupCancelledEvent; + + // Campaigns + 'campaign.created': CampaignCreatedEvent; + 'campaign.updated': CampaignUpdatedEvent; + 'campaign.deleted': CampaignDeletedEvent; + 'campaign.published': CampaignPublishedEvent; + 'campaign.status.changed': CampaignStatusChangedEvent; + 'campaign.email.sent': CampaignEmailSentEvent; + + // Responses + 'response.submitted': ResponseSubmittedEvent; + 'response.approved': ResponseApprovedEvent; + 'response.rejected': ResponseRejectedEvent; + + // Canvass + 'canvass.session.started': CanvassSessionStartedEvent; + 'canvass.session.completed': CanvassSessionCompletedEvent; + 'canvass.visit.recorded': CanvassVisitRecordedEvent; + + // Users + 'user.created': UserCreatedEvent; + 'user.updated': UserUpdatedEvent; + 'user.approved': UserApprovedEvent; + 'user.deleted': UserDeletedEvent; + + // Payments + 'payment.subscription.activated': SubscriptionActivatedEvent; + 'payment.subscription.cancelled': SubscriptionCancelledEvent; + 'payment.donation.completed': DonationCompletedEvent; + 'payment.donation.refunded': DonationRefundedEvent; + 'payment.product.purchased': ProductPurchasedEvent; + + // SMS + 'sms.campaign.started': SmsCampaignStartedEvent; + 'sms.campaign.completed': SmsCampaignCompletedEvent; + 'sms.message.sent': SmsMessageSentEvent; + 'sms.message.received': SmsMessageReceivedEvent; + + // Media + 'media.video.published': VideoPublishedEvent; + 'media.video.unpublished': VideoUnpublishedEvent; + 'media.video.viewed': VideoViewedEvent; + + // Ticketed Events + 'ticketed-event.published': TicketedEventPublishedEvent; + 'ticketed-event.cancelled': TicketedEventCancelledEvent; + + // Meetings + 'meeting.created': MeetingCreatedEvent; + 'meeting.updated': MeetingUpdatedEvent; + 'meeting.deleted': MeetingDeletedEvent; + + // Social + 'social.impact-story.published': ImpactStoryPublishedEvent; + + // Contact / CRM + 'contact.created': ContactCreatedEvent; + 'contact.merged': ContactMergedEvent; + 'contact.tags.changed': ContactTagsChangedEvent; + 'contact.address.updated': AddressUpdatedEvent; + + // Reengagement + 'reengagement.sent': ReengagementSentEvent; + + // Listmonk webhooks (inbound) + 'listmonk.email.opened': ListmonkEmailOpenedEvent; + 'listmonk.email.clicked': ListmonkEmailClickedEvent; + 'listmonk.email.bounced': ListmonkEmailBouncedEvent; + 'listmonk.unsubscribed': ListmonkUnsubscribedEvent; +} + +/** All valid platform event names */ +export type PlatformEventName = keyof PlatformEventMap; + +/** Helper: extract payload type for a given event name */ +export type EventPayload = PlatformEventMap[E]; diff --git a/docker-compose.yml b/docker-compose.yml index f3f32f6e..1df87971 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,7 +54,9 @@ services: - LISTMONK_ADMIN_USER=${LISTMONK_ADMIN_USER:-admin} - LISTMONK_ADMIN_PASSWORD=${LISTMONK_ADMIN_PASSWORD:-} - LISTMONK_SYNC_ENABLED=${LISTMONK_SYNC_ENABLED:-false} + - LISTMONK_WEBHOOK_SECRET=${LISTMONK_WEBHOOK_SECRET:-} - LISTMONK_PROXY_PORT=${LISTMONK_PROXY_PORT:-9002} + - N8N_WEBHOOK_URLS=${N8N_WEBHOOK_URLS:-} - REPRESENT_API_URL=${REPRESENT_API_URL:-https://represent.opennorth.ca} - CORS_ORIGINS=${CORS_ORIGINS:-http://localhost:3000,http://localhost} - ADMIN_URL=${ADMIN_URL:-http://localhost:3000} @@ -689,6 +691,12 @@ services: - GITEA__server__LFS_MAX_FILE_SIZE=1024 - GITEA__repository__upload__FILE_MAX_SIZE=1024 - GITEA__repository__upload__MAX_FILES=1000 + # Reverse proxy auth — nginx injects X-WEBAUTH-USER for SSO + - GITEA__service__ENABLE_REVERSE_PROXY_AUTHENTICATION=true + - GITEA__service__ENABLE_REVERSE_PROXY_AUTO_REGISTRATION=false + - GITEA__service__ENABLE_REVERSE_PROXY_EMAIL=false + - GITEA__service__REVERSE_PROXY_AUTHENTICATION_HEADER=X-WEBAUTH-USER + - GITEA__service__REQUIRE_SIGNIN_VIEW=true restart: unless-stopped volumes: - gitea-data:/data