Add engagement scoring and homepage stats EventBus listeners

- Engagement scoring listener: 11 event subscriptions, weighted scoring
  (donation=50, subscription=40, shift=20, canvass=15, email=10, video=3),
  Redis sorted set leaderboard, per-contact score + last-activity tracking
- Homepage stats listener: 12 subscriptions, incremental Redis counters
  (emails, signups, donations, responses, canvass, videos), capped recent
  activity lists (last 20 per type), cache invalidation on data changes
- GET /api/homepage/live-stats — public real-time counters + recent activity
- GET /api/observability/engagement-leaderboard — admin top-N contacts
- Total: 8 listeners, 70 subscriptions across all modules

Bunker Admin
This commit is contained in:
bunker-admin 2026-03-31 10:21:05 -06:00
parent 902adce646
commit 5d15b4cffa
6 changed files with 430 additions and 4 deletions

View File

@ -44,7 +44,9 @@ Service Handler (shift created, donation completed, etc.)
| `api/src/services/event-listeners/crm-activity.listener.ts` | CRM ContactActivity writer |
| `api/src/services/event-listeners/calendar-sync.listener.ts` | Calendar unification |
| `api/src/services/event-listeners/n8n-webhook.listener.ts` | n8n automation bridge |
| `api/src/services/event-listeners/gancio.listener.ts` | Gancio event sync |
| `api/src/services/event-listeners/gancio.listener.ts` | Gancio event sync (shifts + ticketed events) |
| `api/src/services/event-listeners/engagement-scoring.listener.ts` | Contact engagement scores (Redis ZSET) |
| `api/src/services/event-listeners/homepage-stats.listener.ts` | Homepage real-time counters + cache invalidation |
---
@ -92,10 +94,16 @@ Service Handler (shift created, donation completed, etc.)
- [x] Prisma migration: SHIFT, MEETING, TICKETED_EVENT added to CalendarItemSource enum
- [x] Calendar sync listener: uses proper source types (SHIFT, MEETING, TICKETED_EVENT)
### Phase 4c: New Data Listeners (2026-03-31)
- [x] Engagement scoring listener (11 subscriptions, Redis ZSET leaderboard)
- [x] Homepage stats listener (12 subscriptions, Redis counters + recent activity)
- [x] GET /api/homepage/live-stats endpoint (public, real-time counters + recent)
- [x] GET /api/observability/engagement-leaderboard endpoint (admin, top contacts)
### Phase 5: Future
- [ ] Migrate remaining Gancio calls (ticketed-events, meeting-planner) to EventBus
- [ ] Add engagement scoring listener
- [ ] Add Homepage dashboard data listener
- [ ] Migrate meeting-planner Gancio calls to EventBus (blocked: synchronous return value needed)
- [ ] Homepage service: swap COUNT queries for Redis counters in getStats()
- [ ] Engagement score materialization: periodic job to denormalize scores to Contact model
---

View File

@ -1,5 +1,6 @@
import { Router, Request, Response, NextFunction } from 'express';
import { homepageService } from './homepage.service';
import { homepageStats } from '../../services/event-listeners/homepage-stats.listener';
const router = Router();
@ -13,4 +14,17 @@ router.get('/', async (_req: Request, res: Response, next: NextFunction) => {
}
});
// GET /api/homepage/live-stats — Real-time EventBus-driven counters (no auth)
router.get('/live-stats', async (_req: Request, res: Response, next: NextFunction) => {
try {
const counters = await homepageStats.getCounters();
const recentSignups = await homepageStats.getRecent('signups', 5);
const recentDonations = await homepageStats.getRecent('donations', 5);
const recentResponses = await homepageStats.getRecent('responses', 5);
res.json({ counters, recent: { signups: recentSignups, donations: recentDonations, responses: recentResponses } });
} catch (err) {
next(err);
}
});
export { router as homepageRouter };

View File

@ -230,4 +230,15 @@ router.get(
},
);
// GET /api/observability/engagement-leaderboard — Top engaged contacts
router.get(
'/engagement-leaderboard',
async (req: Request, res: Response) => {
const { engagementScoring } = await import('../../services/event-listeners/engagement-scoring.listener');
const limit = Math.min(parseInt(req.query.limit as string) || 20, 100);
const leaderboard = await engagementScoring.getLeaderboard(limit);
res.json({ leaderboard, count: leaderboard.length });
},
);
export const observabilityRouter = router;

View File

@ -0,0 +1,212 @@
/**
* Engagement Scoring EventBus Listener
*
* Maintains a real-time engagement score for each contact based on their
* activity across the platform. Scores are stored in Redis for fast access
* and a sorted set provides leaderboard queries.
*
* Scoring weights:
* Donation completed +50
* Subscription activated +40
* Product purchased +30
* Shift signup +20
* Canvass visit +15
* Response submitted +15
* Campaign email sent +10
* SMS received +10
* Email opened +5
* Email link clicked +8
* Video viewed +3
*
* Redis keys:
* engagement:score:{contactId} total score (string/integer)
* engagement:leaderboard sorted set (contactId score)
* engagement:last:{contactId} ISO timestamp of last activity
*
* No feature guard always active (scores are ephemeral in Redis).
*/
import { eventBus } from '../event-bus.service';
import { logger } from '../../utils/logger';
// Lazy-import to avoid circular dependency
let redisPromise: ReturnType<typeof getRedis> | null = null;
async function getRedis() {
const { redis } = await import('../../config/redis');
return redis;
}
function lazyRedis() {
if (!redisPromise) redisPromise = getRedis();
return redisPromise;
}
let prismaPromise: ReturnType<typeof getPrisma> | null = null;
async function getPrisma() {
const { prisma } = await import('../../config/database');
return prisma;
}
function lazyPrisma() {
if (!prismaPromise) prismaPromise = getPrisma();
return prismaPromise;
}
/** Find contactId by email. Returns null if not found. */
async function findContactByEmail(email?: string | null): Promise<string | null> {
if (!email) return null;
try {
const prisma = await lazyPrisma();
const row = await prisma.contactEmail.findFirst({
where: { email: email.toLowerCase() },
select: { contactId: true },
});
return row?.contactId ?? null;
} catch {
return null;
}
}
/** Find contactId by phone. Returns null if not found. */
async function findContactByPhone(phone: string): Promise<string | null> {
try {
const prisma = await lazyPrisma();
const row = await prisma.contactPhone.findFirst({
where: { phone },
select: { contactId: true },
});
return row?.contactId ?? null;
} catch {
return null;
}
}
/** Increment a contact's engagement score in Redis. */
async function addScore(contactId: string, points: number): Promise<void> {
try {
const redis = await lazyRedis();
const pipeline = redis.pipeline();
pipeline.incrby(`engagement:score:${contactId}`, points);
pipeline.zincrby('engagement:leaderboard', points, contactId);
pipeline.set(`engagement:last:${contactId}`, new Date().toISOString());
await pipeline.exec();
} catch (err) {
logger.debug(`Engagement scoring failed for ${contactId}:`, err);
}
}
export function registerEngagementScoringListener(): void {
// --- High-value actions ---
eventBus.subscribe('payment.donation.completed', 'engagement:donation', async (payload) => {
const contactId = await findContactByEmail(payload.email);
if (contactId) await addScore(contactId, 50);
});
eventBus.subscribe('payment.subscription.activated', 'engagement:subscription', async (payload) => {
const contactId = await findContactByEmail(payload.email);
if (contactId) await addScore(contactId, 40);
});
eventBus.subscribe('payment.product.purchased', 'engagement:purchase', async (payload) => {
const contactId = await findContactByEmail(payload.email);
if (contactId) await addScore(contactId, 30);
});
// --- Volunteer actions ---
eventBus.subscribe('shift.signup.created', 'engagement:shift-signup', async (payload) => {
const contactId = await findContactByEmail(payload.userEmail);
if (contactId) await addScore(contactId, 20);
});
eventBus.subscribe('canvass.visit.recorded', 'engagement:canvass-visit', async (payload) => {
const contactId = await findContactByEmail(payload.email);
if (contactId) await addScore(contactId, 15);
});
eventBus.subscribe('response.submitted', 'engagement:response', async (payload) => {
const contactId = await findContactByEmail(payload.userEmail);
if (contactId) await addScore(contactId, 15);
});
// --- Communication actions ---
eventBus.subscribe('campaign.email.sent', 'engagement:email-sent', async (payload) => {
const contactId = await findContactByEmail(payload.email);
if (contactId) await addScore(contactId, 10);
});
eventBus.subscribe('sms.message.received', 'engagement:sms-received', async (payload) => {
const contactId = await findContactByPhone(payload.phone);
if (contactId) await addScore(contactId, 10);
});
eventBus.subscribe('listmonk.email.clicked', 'engagement:email-clicked', async (payload) => {
const contactId = await findContactByEmail(payload.subscriberEmail);
if (contactId) await addScore(contactId, 8);
});
eventBus.subscribe('listmonk.email.opened', 'engagement:email-opened', async (payload) => {
const contactId = await findContactByEmail(payload.subscriberEmail);
if (contactId) await addScore(contactId, 5);
});
// --- Passive actions ---
eventBus.subscribe('media.video.viewed', 'engagement:video-view', async (payload) => {
if (!payload.userId) return;
try {
const prisma = await lazyPrisma();
const user = await prisma.user.findUnique({
where: { id: payload.userId },
select: { email: true },
});
if (!user) return;
const contactId = await findContactByEmail(user.email);
if (contactId) await addScore(contactId, 3);
} catch {
// silent
}
});
}
/**
* Utility functions for querying engagement scores.
* Can be imported by API routes for score display.
*/
export const engagementScoring = {
/** Get a single contact's score */
async getScore(contactId: string): Promise<number> {
try {
const redis = await lazyRedis();
const score = await redis.get(`engagement:score:${contactId}`);
return score ? parseInt(score, 10) : 0;
} catch {
return 0;
}
},
/** Get top N contacts by engagement score */
async getLeaderboard(limit = 20): Promise<Array<{ contactId: string; score: number }>> {
try {
const redis = await lazyRedis();
const results = await redis.zrevrange('engagement:leaderboard', 0, limit - 1, 'WITHSCORES');
const leaderboard: Array<{ contactId: string; score: number }> = [];
for (let i = 0; i < results.length; i += 2) {
leaderboard.push({ contactId: results[i], score: parseInt(results[i + 1], 10) });
}
return leaderboard;
} catch {
return [];
}
},
/** Get last activity timestamp for a contact */
async getLastActivity(contactId: string): Promise<string | null> {
try {
const redis = await lazyRedis();
return await redis.get(`engagement:last:${contactId}`);
} catch {
return null;
}
},
};

View File

@ -0,0 +1,177 @@
/**
* Homepage Stats EventBus Listener
*
* Maintains real-time counters in Redis for the homepage dashboard and
* invalidates the homepage cache when underlying data changes.
*
* Redis keys:
* homepage:counter:emails total campaign emails sent
* homepage:counter:signups total shift signups
* homepage:counter:donations total donation count
* homepage:counter:donations:amt total donation amount (cents)
* homepage:counter:responses total campaign responses
* homepage:counter:canvass total canvass visits
* homepage:counter:videos total video views
* homepage:recent:{type} recent activity list (capped at 20)
*
* Cache invalidation:
* Deletes `homepage:public` when shifts, campaigns, or media change
* so the next request rebuilds with fresh data.
*
* No feature guard always active (counters are ephemeral in Redis).
*/
import { eventBus } from '../event-bus.service';
import { logger } from '../../utils/logger';
let redisPromise: ReturnType<typeof getRedis> | null = null;
async function getRedis() {
const { redis } = await import('../../config/redis');
return redis;
}
function lazyRedis() {
if (!redisPromise) redisPromise = getRedis();
return redisPromise;
}
const HOMEPAGE_CACHE_KEY = 'homepage:public';
/** Increment a counter and optionally invalidate the homepage cache. */
async function incrCounter(key: string, invalidateCache = false): Promise<void> {
try {
const redis = await lazyRedis();
await redis.incr(`homepage:counter:${key}`);
if (invalidateCache) await redis.del(HOMEPAGE_CACHE_KEY);
} catch (err) {
logger.debug(`Homepage counter increment failed (${key}):`, err);
}
}
/** Push a recent activity entry to a capped list. */
async function pushRecent(type: string, entry: Record<string, unknown>): Promise<void> {
try {
const redis = await lazyRedis();
const key = `homepage:recent:${type}`;
await redis.lpush(key, JSON.stringify({ ...entry, at: new Date().toISOString() }));
await redis.ltrim(key, 0, 19); // keep last 20
await redis.expire(key, 86400); // expire after 24h
} catch {
// silent
}
}
/** Invalidate homepage cache without incrementing anything. */
async function invalidateCache(): Promise<void> {
try {
const redis = await lazyRedis();
await redis.del(HOMEPAGE_CACHE_KEY);
} catch {
// silent
}
}
export function registerHomepageStatsListener(): void {
// --- Counter increments ---
eventBus.subscribe('campaign.email.sent', 'homepage:email-sent', async () => {
await incrCounter('emails');
});
eventBus.subscribe('shift.signup.created', 'homepage:shift-signup', async (payload) => {
await incrCounter('signups', true); // invalidate — signup count visible on homepage
await pushRecent('signups', { name: payload.userName, shift: payload.shiftTitle });
});
eventBus.subscribe('payment.donation.completed', 'homepage:donation', async (payload) => {
await incrCounter('donations');
try {
const redis = await lazyRedis();
await redis.incrby('homepage:counter:donations:amt', payload.amountCents);
} catch { /* silent */ }
await pushRecent('donations', { name: payload.name, amount: payload.amountCents });
});
eventBus.subscribe('response.submitted', 'homepage:response', async (payload) => {
await incrCounter('responses');
await pushRecent('responses', { campaign: payload.campaignTitle, rep: payload.representativeName });
});
eventBus.subscribe('canvass.visit.recorded', 'homepage:canvass', async () => {
await incrCounter('canvass');
});
eventBus.subscribe('media.video.viewed', 'homepage:video-view', async () => {
await incrCounter('videos');
});
// --- Cache invalidation (data visible on homepage changed) ---
eventBus.subscribe('shift.created', 'homepage:shift-changed', async () => {
await invalidateCache();
});
eventBus.subscribe('shift.deleted', 'homepage:shift-deleted', async () => {
await invalidateCache();
});
eventBus.subscribe('campaign.published', 'homepage:campaign-published', async () => {
await invalidateCache();
});
eventBus.subscribe('campaign.status.changed', 'homepage:campaign-status', async () => {
await invalidateCache();
});
eventBus.subscribe('media.video.published', 'homepage:video-published', async () => {
await invalidateCache();
});
eventBus.subscribe('ticketed-event.published', 'homepage:event-published', async () => {
await invalidateCache();
});
}
/**
* Utility functions for reading homepage stats from Redis.
* Can be imported by the homepage service for real-time counters.
*/
export const homepageStats = {
/** Get all counter values */
async getCounters(): Promise<Record<string, number>> {
try {
const redis = await lazyRedis();
const keys = [
'homepage:counter:emails',
'homepage:counter:signups',
'homepage:counter:donations',
'homepage:counter:donations:amt',
'homepage:counter:responses',
'homepage:counter:canvass',
'homepage:counter:videos',
];
const values = await redis.mget(...keys);
return {
totalEmailsSent: parseInt(values[0] ?? '0', 10),
totalShiftSignups: parseInt(values[1] ?? '0', 10),
totalDonations: parseInt(values[2] ?? '0', 10),
totalDonationAmountCents: parseInt(values[3] ?? '0', 10),
totalResponses: parseInt(values[4] ?? '0', 10),
totalCanvassVisits: parseInt(values[5] ?? '0', 10),
totalVideoViews: parseInt(values[6] ?? '0', 10),
};
} catch {
return {};
}
},
/** Get recent activity of a given type */
async getRecent(type: string, limit = 10): Promise<Array<Record<string, unknown>>> {
try {
const redis = await lazyRedis();
const items = await redis.lrange(`homepage:recent:${type}`, 0, limit - 1);
return items.map(item => JSON.parse(item));
} catch {
return [];
}
},
};

View File

@ -12,6 +12,8 @@ import { registerCrmActivityListener } from './crm-activity.listener';
import { registerCalendarSyncListener } from './calendar-sync.listener';
import { registerN8nWebhookListener } from './n8n-webhook.listener';
import { registerGancioListener } from './gancio.listener';
import { registerEngagementScoringListener } from './engagement-scoring.listener';
import { registerHomepageStatsListener } from './homepage-stats.listener';
export function registerAllEventListeners(): void {
const listeners = [
@ -21,6 +23,8 @@ export function registerAllEventListeners(): void {
{ name: 'Calendar Sync', register: registerCalendarSyncListener },
{ name: 'n8n Webhook', register: registerN8nWebhookListener },
{ name: 'Gancio', register: registerGancioListener },
{ name: 'Engagement Scoring', register: registerEngagementScoringListener },
{ name: 'Homepage Stats', register: registerHomepageStatsListener },
];
let registered = 0;