diff --git a/SERVICE_INTEGRATIONS.md b/SERVICE_INTEGRATIONS.md index b0f97888..48ca7ea2 100644 --- a/SERVICE_INTEGRATIONS.md +++ b/SERVICE_INTEGRATIONS.md @@ -44,7 +44,9 @@ Service Handler (shift created, donation completed, etc.) | `api/src/services/event-listeners/crm-activity.listener.ts` | CRM ContactActivity writer | | `api/src/services/event-listeners/calendar-sync.listener.ts` | Calendar unification | | `api/src/services/event-listeners/n8n-webhook.listener.ts` | n8n automation bridge | -| `api/src/services/event-listeners/gancio.listener.ts` | Gancio event sync | +| `api/src/services/event-listeners/gancio.listener.ts` | Gancio event sync (shifts + ticketed events) | +| `api/src/services/event-listeners/engagement-scoring.listener.ts` | Contact engagement scores (Redis ZSET) | +| `api/src/services/event-listeners/homepage-stats.listener.ts` | Homepage real-time counters + cache invalidation | --- @@ -92,10 +94,16 @@ Service Handler (shift created, donation completed, etc.) - [x] Prisma migration: SHIFT, MEETING, TICKETED_EVENT added to CalendarItemSource enum - [x] Calendar sync listener: uses proper source types (SHIFT, MEETING, TICKETED_EVENT) +### Phase 4c: New Data Listeners (2026-03-31) +- [x] Engagement scoring listener (11 subscriptions, Redis ZSET leaderboard) +- [x] Homepage stats listener (12 subscriptions, Redis counters + recent activity) +- [x] GET /api/homepage/live-stats endpoint (public, real-time counters + recent) +- [x] GET /api/observability/engagement-leaderboard endpoint (admin, top contacts) + ### Phase 5: Future -- [ ] Migrate remaining Gancio calls (ticketed-events, meeting-planner) to EventBus -- [ ] Add engagement scoring listener -- [ ] Add Homepage dashboard data listener +- [ ] Migrate meeting-planner Gancio calls to EventBus (blocked: synchronous return value needed) +- [ ] Homepage service: swap COUNT queries for Redis counters in getStats() +- [ ] Engagement score materialization: periodic job to denormalize scores to Contact model --- diff --git a/api/src/modules/homepage/homepage.routes.ts b/api/src/modules/homepage/homepage.routes.ts index 40ae762f..00941453 100644 --- a/api/src/modules/homepage/homepage.routes.ts +++ b/api/src/modules/homepage/homepage.routes.ts @@ -1,5 +1,6 @@ import { Router, Request, Response, NextFunction } from 'express'; import { homepageService } from './homepage.service'; +import { homepageStats } from '../../services/event-listeners/homepage-stats.listener'; const router = Router(); @@ -13,4 +14,17 @@ router.get('/', async (_req: Request, res: Response, next: NextFunction) => { } }); +// GET /api/homepage/live-stats — Real-time EventBus-driven counters (no auth) +router.get('/live-stats', async (_req: Request, res: Response, next: NextFunction) => { + try { + const counters = await homepageStats.getCounters(); + const recentSignups = await homepageStats.getRecent('signups', 5); + const recentDonations = await homepageStats.getRecent('donations', 5); + const recentResponses = await homepageStats.getRecent('responses', 5); + res.json({ counters, recent: { signups: recentSignups, donations: recentDonations, responses: recentResponses } }); + } catch (err) { + next(err); + } +}); + export { router as homepageRouter }; diff --git a/api/src/modules/observability/observability.routes.ts b/api/src/modules/observability/observability.routes.ts index 1ff6d5d2..afc68061 100644 --- a/api/src/modules/observability/observability.routes.ts +++ b/api/src/modules/observability/observability.routes.ts @@ -230,4 +230,15 @@ router.get( }, ); +// GET /api/observability/engagement-leaderboard — Top engaged contacts +router.get( + '/engagement-leaderboard', + async (req: Request, res: Response) => { + const { engagementScoring } = await import('../../services/event-listeners/engagement-scoring.listener'); + const limit = Math.min(parseInt(req.query.limit as string) || 20, 100); + const leaderboard = await engagementScoring.getLeaderboard(limit); + res.json({ leaderboard, count: leaderboard.length }); + }, +); + export const observabilityRouter = router; diff --git a/api/src/services/event-listeners/engagement-scoring.listener.ts b/api/src/services/event-listeners/engagement-scoring.listener.ts new file mode 100644 index 00000000..7c733c64 --- /dev/null +++ b/api/src/services/event-listeners/engagement-scoring.listener.ts @@ -0,0 +1,212 @@ +/** + * Engagement Scoring EventBus Listener + * + * Maintains a real-time engagement score for each contact based on their + * activity across the platform. Scores are stored in Redis for fast access + * and a sorted set provides leaderboard queries. + * + * Scoring weights: + * Donation completed +50 + * Subscription activated +40 + * Product purchased +30 + * Shift signup +20 + * Canvass visit +15 + * Response submitted +15 + * Campaign email sent +10 + * SMS received +10 + * Email opened +5 + * Email link clicked +8 + * Video viewed +3 + * + * Redis keys: + * engagement:score:{contactId} — total score (string/integer) + * engagement:leaderboard — sorted set (contactId → score) + * engagement:last:{contactId} — ISO timestamp of last activity + * + * No feature guard — always active (scores are ephemeral in Redis). + */ + +import { eventBus } from '../event-bus.service'; +import { logger } from '../../utils/logger'; + +// Lazy-import to avoid circular dependency +let redisPromise: ReturnType | null = null; +async function getRedis() { + const { redis } = await import('../../config/redis'); + return redis; +} +function lazyRedis() { + if (!redisPromise) redisPromise = getRedis(); + return redisPromise; +} + +let prismaPromise: ReturnType | null = null; +async function getPrisma() { + const { prisma } = await import('../../config/database'); + return prisma; +} +function lazyPrisma() { + if (!prismaPromise) prismaPromise = getPrisma(); + return prismaPromise; +} + +/** Find contactId by email. Returns null if not found. */ +async function findContactByEmail(email?: string | null): Promise { + if (!email) return null; + try { + const prisma = await lazyPrisma(); + const row = await prisma.contactEmail.findFirst({ + where: { email: email.toLowerCase() }, + select: { contactId: true }, + }); + return row?.contactId ?? null; + } catch { + return null; + } +} + +/** Find contactId by phone. Returns null if not found. */ +async function findContactByPhone(phone: string): Promise { + try { + const prisma = await lazyPrisma(); + const row = await prisma.contactPhone.findFirst({ + where: { phone }, + select: { contactId: true }, + }); + return row?.contactId ?? null; + } catch { + return null; + } +} + +/** Increment a contact's engagement score in Redis. */ +async function addScore(contactId: string, points: number): Promise { + try { + const redis = await lazyRedis(); + const pipeline = redis.pipeline(); + pipeline.incrby(`engagement:score:${contactId}`, points); + pipeline.zincrby('engagement:leaderboard', points, contactId); + pipeline.set(`engagement:last:${contactId}`, new Date().toISOString()); + await pipeline.exec(); + } catch (err) { + logger.debug(`Engagement scoring failed for ${contactId}:`, err); + } +} + +export function registerEngagementScoringListener(): void { + // --- High-value actions --- + + eventBus.subscribe('payment.donation.completed', 'engagement:donation', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (contactId) await addScore(contactId, 50); + }); + + eventBus.subscribe('payment.subscription.activated', 'engagement:subscription', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (contactId) await addScore(contactId, 40); + }); + + eventBus.subscribe('payment.product.purchased', 'engagement:purchase', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (contactId) await addScore(contactId, 30); + }); + + // --- Volunteer actions --- + + eventBus.subscribe('shift.signup.created', 'engagement:shift-signup', async (payload) => { + const contactId = await findContactByEmail(payload.userEmail); + if (contactId) await addScore(contactId, 20); + }); + + eventBus.subscribe('canvass.visit.recorded', 'engagement:canvass-visit', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (contactId) await addScore(contactId, 15); + }); + + eventBus.subscribe('response.submitted', 'engagement:response', async (payload) => { + const contactId = await findContactByEmail(payload.userEmail); + if (contactId) await addScore(contactId, 15); + }); + + // --- Communication actions --- + + eventBus.subscribe('campaign.email.sent', 'engagement:email-sent', async (payload) => { + const contactId = await findContactByEmail(payload.email); + if (contactId) await addScore(contactId, 10); + }); + + eventBus.subscribe('sms.message.received', 'engagement:sms-received', async (payload) => { + const contactId = await findContactByPhone(payload.phone); + if (contactId) await addScore(contactId, 10); + }); + + eventBus.subscribe('listmonk.email.clicked', 'engagement:email-clicked', async (payload) => { + const contactId = await findContactByEmail(payload.subscriberEmail); + if (contactId) await addScore(contactId, 8); + }); + + eventBus.subscribe('listmonk.email.opened', 'engagement:email-opened', async (payload) => { + const contactId = await findContactByEmail(payload.subscriberEmail); + if (contactId) await addScore(contactId, 5); + }); + + // --- Passive actions --- + + eventBus.subscribe('media.video.viewed', 'engagement:video-view', async (payload) => { + if (!payload.userId) return; + try { + const prisma = await lazyPrisma(); + const user = await prisma.user.findUnique({ + where: { id: payload.userId }, + select: { email: true }, + }); + if (!user) return; + const contactId = await findContactByEmail(user.email); + if (contactId) await addScore(contactId, 3); + } catch { + // silent + } + }); +} + +/** + * Utility functions for querying engagement scores. + * Can be imported by API routes for score display. + */ +export const engagementScoring = { + /** Get a single contact's score */ + async getScore(contactId: string): Promise { + try { + const redis = await lazyRedis(); + const score = await redis.get(`engagement:score:${contactId}`); + return score ? parseInt(score, 10) : 0; + } catch { + return 0; + } + }, + + /** Get top N contacts by engagement score */ + async getLeaderboard(limit = 20): Promise> { + try { + const redis = await lazyRedis(); + const results = await redis.zrevrange('engagement:leaderboard', 0, limit - 1, 'WITHSCORES'); + const leaderboard: Array<{ contactId: string; score: number }> = []; + for (let i = 0; i < results.length; i += 2) { + leaderboard.push({ contactId: results[i], score: parseInt(results[i + 1], 10) }); + } + return leaderboard; + } catch { + return []; + } + }, + + /** Get last activity timestamp for a contact */ + async getLastActivity(contactId: string): Promise { + try { + const redis = await lazyRedis(); + return await redis.get(`engagement:last:${contactId}`); + } catch { + return null; + } + }, +}; diff --git a/api/src/services/event-listeners/homepage-stats.listener.ts b/api/src/services/event-listeners/homepage-stats.listener.ts new file mode 100644 index 00000000..b4277d40 --- /dev/null +++ b/api/src/services/event-listeners/homepage-stats.listener.ts @@ -0,0 +1,177 @@ +/** + * Homepage Stats EventBus Listener + * + * Maintains real-time counters in Redis for the homepage dashboard and + * invalidates the homepage cache when underlying data changes. + * + * Redis keys: + * homepage:counter:emails — total campaign emails sent + * homepage:counter:signups — total shift signups + * homepage:counter:donations — total donation count + * homepage:counter:donations:amt — total donation amount (cents) + * homepage:counter:responses — total campaign responses + * homepage:counter:canvass — total canvass visits + * homepage:counter:videos — total video views + * homepage:recent:{type} — recent activity list (capped at 20) + * + * Cache invalidation: + * Deletes `homepage:public` when shifts, campaigns, or media change + * so the next request rebuilds with fresh data. + * + * No feature guard — always active (counters are ephemeral in Redis). + */ + +import { eventBus } from '../event-bus.service'; +import { logger } from '../../utils/logger'; + +let redisPromise: ReturnType | null = null; +async function getRedis() { + const { redis } = await import('../../config/redis'); + return redis; +} +function lazyRedis() { + if (!redisPromise) redisPromise = getRedis(); + return redisPromise; +} + +const HOMEPAGE_CACHE_KEY = 'homepage:public'; + +/** Increment a counter and optionally invalidate the homepage cache. */ +async function incrCounter(key: string, invalidateCache = false): Promise { + try { + const redis = await lazyRedis(); + await redis.incr(`homepage:counter:${key}`); + if (invalidateCache) await redis.del(HOMEPAGE_CACHE_KEY); + } catch (err) { + logger.debug(`Homepage counter increment failed (${key}):`, err); + } +} + +/** Push a recent activity entry to a capped list. */ +async function pushRecent(type: string, entry: Record): Promise { + try { + const redis = await lazyRedis(); + const key = `homepage:recent:${type}`; + await redis.lpush(key, JSON.stringify({ ...entry, at: new Date().toISOString() })); + await redis.ltrim(key, 0, 19); // keep last 20 + await redis.expire(key, 86400); // expire after 24h + } catch { + // silent + } +} + +/** Invalidate homepage cache without incrementing anything. */ +async function invalidateCache(): Promise { + try { + const redis = await lazyRedis(); + await redis.del(HOMEPAGE_CACHE_KEY); + } catch { + // silent + } +} + +export function registerHomepageStatsListener(): void { + // --- Counter increments --- + + eventBus.subscribe('campaign.email.sent', 'homepage:email-sent', async () => { + await incrCounter('emails'); + }); + + eventBus.subscribe('shift.signup.created', 'homepage:shift-signup', async (payload) => { + await incrCounter('signups', true); // invalidate — signup count visible on homepage + await pushRecent('signups', { name: payload.userName, shift: payload.shiftTitle }); + }); + + eventBus.subscribe('payment.donation.completed', 'homepage:donation', async (payload) => { + await incrCounter('donations'); + try { + const redis = await lazyRedis(); + await redis.incrby('homepage:counter:donations:amt', payload.amountCents); + } catch { /* silent */ } + await pushRecent('donations', { name: payload.name, amount: payload.amountCents }); + }); + + eventBus.subscribe('response.submitted', 'homepage:response', async (payload) => { + await incrCounter('responses'); + await pushRecent('responses', { campaign: payload.campaignTitle, rep: payload.representativeName }); + }); + + eventBus.subscribe('canvass.visit.recorded', 'homepage:canvass', async () => { + await incrCounter('canvass'); + }); + + eventBus.subscribe('media.video.viewed', 'homepage:video-view', async () => { + await incrCounter('videos'); + }); + + // --- Cache invalidation (data visible on homepage changed) --- + + eventBus.subscribe('shift.created', 'homepage:shift-changed', async () => { + await invalidateCache(); + }); + + eventBus.subscribe('shift.deleted', 'homepage:shift-deleted', async () => { + await invalidateCache(); + }); + + eventBus.subscribe('campaign.published', 'homepage:campaign-published', async () => { + await invalidateCache(); + }); + + eventBus.subscribe('campaign.status.changed', 'homepage:campaign-status', async () => { + await invalidateCache(); + }); + + eventBus.subscribe('media.video.published', 'homepage:video-published', async () => { + await invalidateCache(); + }); + + eventBus.subscribe('ticketed-event.published', 'homepage:event-published', async () => { + await invalidateCache(); + }); +} + +/** + * Utility functions for reading homepage stats from Redis. + * Can be imported by the homepage service for real-time counters. + */ +export const homepageStats = { + /** Get all counter values */ + async getCounters(): Promise> { + try { + const redis = await lazyRedis(); + const keys = [ + 'homepage:counter:emails', + 'homepage:counter:signups', + 'homepage:counter:donations', + 'homepage:counter:donations:amt', + 'homepage:counter:responses', + 'homepage:counter:canvass', + 'homepage:counter:videos', + ]; + const values = await redis.mget(...keys); + return { + totalEmailsSent: parseInt(values[0] ?? '0', 10), + totalShiftSignups: parseInt(values[1] ?? '0', 10), + totalDonations: parseInt(values[2] ?? '0', 10), + totalDonationAmountCents: parseInt(values[3] ?? '0', 10), + totalResponses: parseInt(values[4] ?? '0', 10), + totalCanvassVisits: parseInt(values[5] ?? '0', 10), + totalVideoViews: parseInt(values[6] ?? '0', 10), + }; + } catch { + return {}; + } + }, + + /** Get recent activity of a given type */ + async getRecent(type: string, limit = 10): Promise>> { + try { + const redis = await lazyRedis(); + const items = await redis.lrange(`homepage:recent:${type}`, 0, limit - 1); + return items.map(item => JSON.parse(item)); + } catch { + return []; + } + }, +}; diff --git a/api/src/services/event-listeners/index.ts b/api/src/services/event-listeners/index.ts index 163a08ab..bd9c061f 100644 --- a/api/src/services/event-listeners/index.ts +++ b/api/src/services/event-listeners/index.ts @@ -12,6 +12,8 @@ import { registerCrmActivityListener } from './crm-activity.listener'; import { registerCalendarSyncListener } from './calendar-sync.listener'; import { registerN8nWebhookListener } from './n8n-webhook.listener'; import { registerGancioListener } from './gancio.listener'; +import { registerEngagementScoringListener } from './engagement-scoring.listener'; +import { registerHomepageStatsListener } from './homepage-stats.listener'; export function registerAllEventListeners(): void { const listeners = [ @@ -21,6 +23,8 @@ export function registerAllEventListeners(): void { { name: 'Calendar Sync', register: registerCalendarSyncListener }, { name: 'n8n Webhook', register: registerN8nWebhookListener }, { name: 'Gancio', register: registerGancioListener }, + { name: 'Engagement Scoring', register: registerEngagementScoringListener }, + { name: 'Homepage Stats', register: registerHomepageStatsListener }, ]; let registered = 0;