"use strict";
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
    if (k2 === undefined) k2 = k;
    var desc = Object.getOwnPropertyDescriptor(m, k);
    if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
      desc = { enumerable: true, get: function() { return m[k]; } };
    }
    Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
    if (k2 === undefined) k2 = k;
    o[k2] = m[k];
}));
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
    Object.defineProperty(o, "default", { enumerable: true, value: v });
}) : function(o, v) {
    o["default"] = v;
});
var __importStar = (this && this.__importStar) || (function () {
    var ownKeys = function(o) {
        ownKeys = Object.getOwnPropertyNames || function (o) {
            var ar = [];
            for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
            return ar;
        };
        return ownKeys(o);
    };
    return function (mod) {
        if (mod && mod.__esModule) return mod;
        var result = {};
        if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
        __setModuleDefault(result, mod);
        return result;
    };
})();
Object.defineProperty(exports, "__esModule", { value: true });
exports.geocodeQueueService = void 0;
const bullmq_1 = require("bullmq");
const client_1 = require("@prisma/client");
const redis_1 = require("../config/redis");
const database_1 = require("../config/database");
const geocoding_service_1 = require("../modules/map/geocoding/geocoding.service");
const logger_1 = require("../utils/logger");
const env_1 = require("../config/env");

/** Name shared by the BullMQ queue, its worker, and the jobs added to it. */
const QUEUE_NAME = 'bulk-geocode';

/** Batch size used when the caller does not supply a usable `limit`. */
const DEFAULT_BATCH_LIMIT = 1000;

/**
 * Builds the BullMQ connection options from the shared redis client's options.
 *
 * @returns {{host: *, port: *, password: *}} Connection options; the port
 *   falls back to 6379 when the client options do not carry one.
 */
function buildRedisConnection() {
    const { host, port, password } = redis_1.redis.options;
    return { host, port: port || 6379, password };
}

/**
 * Translates job filters into the `where` clause for `prisma.location.findMany`.
 *
 * @param {object} filters - Job filters (`confidenceThreshold`, `buildingType`).
 * @returns {object} The `where` object; empty when no filter applies.
 */
function buildLocationWhere(filters) {
    const where = {};
    if (filters.confidenceThreshold !== undefined) {
        // Locations that were never geocoded (null confidence) are included
        // alongside those below the threshold.
        where.OR = [
            { geocodeConfidence: { lt: filters.confidenceThreshold } },
            { geocodeConfidence: null },
        ];
    }
    if (filters.buildingType) {
        where.buildingType = filters.buildingType;
    }
    return where;
}

/**
 * Loads a cut's geometry once and returns a point-in-polygon predicate for it.
 *
 * @param {*} cutId - Identifier passed to `prisma.cut.findUnique`.
 * @returns {Promise<Function|null>} A `(location) => boolean` predicate, or
 *   `null` when the cut is missing, has no geojson, or yields no polygons.
 */
async function loadCutPredicate(cutId) {
    const cut = await database_1.prisma.cut.findUnique({
        where: { id: cutId },
        select: { geojson: true },
    });
    if (!cut?.geojson) {
        return null;
    }
    const { isPointInPolygon, parseGeoJsonPolygon } = await Promise.resolve().then(() => __importStar(require('../utils/spatial')));
    const polygons = parseGeoJsonPolygon(cut.geojson);
    if (!polygons || polygons.length === 0) {
        return null;
    }
    return (loc) => {
        // Null check rather than truthiness: a numeric coordinate of 0 is a
        // valid position and must not be treated as "missing".
        if (loc.latitude == null || loc.longitude == null) {
            return false;
        }
        const lat = Number(loc.latitude);
        const lng = Number(loc.longitude);
        return polygons.some((polygon) => isPointInPolygon(lat, lng, polygon));
    };
}

/**
 * Queue-backed bulk geocoding: enqueues bulk jobs, runs the worker that
 * re-geocodes locations, and exposes job status and queue statistics.
 */
class GeocodeQueueService {
    queue;
    worker = null;

    constructor() {
        this.queue = new bullmq_1.Queue(QUEUE_NAME, {
            connection: buildRedisConnection(),
        });
    }

    /**
     * Starts the worker unless one is already running or the feature is
     * disabled (`env.BULK_GEOCODE_ENABLED` must be the string 'true').
     *
     * @returns {Promise<void>}
     */
    async startWorker() {
        if (this.worker || env_1.env.BULK_GEOCODE_ENABLED !== 'true') {
            return;
        }
        this.worker = new bullmq_1.Worker(QUEUE_NAME, async (job) => {
            return this.processBulkGeocode(job);
        }, {
            connection: buildRedisConnection(),
            concurrency: 1, // Process one bulk job at a time
        });
        this.worker.on('completed', (job) => {
            logger_1.logger.info(`Bulk geocode job ${job.id} completed: ${JSON.stringify(job.returnvalue)}`);
        });
        this.worker.on('failed', (job, err) => {
            logger_1.logger.error(`Bulk geocode job ${job?.id} failed:`, err);
        });
        // BullMQ's documentation recommends always attaching an 'error'
        // listener so worker-level errors are logged instead of being emitted
        // with no handler.
        this.worker.on('error', (err) => {
            logger_1.logger.error('Bulk geocode worker error:', err);
        });
        logger_1.logger.info('Bulk geocode worker started');
    }

    /**
     * Closes the worker (if started) and then the queue.
     *
     * @returns {Promise<void>}
     */
    async close() {
        try {
            if (this.worker) {
                await this.worker.close();
            }
        }
        finally {
            // Close the queue even when closing the worker rejected.
            await this.queue.close();
        }
    }

    /**
     * Enqueues a bulk geocode job.
     *
     * @param {*} userId - Stored in the job data and later written to the
     *   location history rows.
     * @param {object} [filters] - Job filters; `limit` is normalised to a
     *   positive integer no greater than `env.BULK_GEOCODE_MAX_BATCH`.
     * @returns {Promise<*>} The id of the created job.
     */
    async startBulkGeocode(userId, filters) {
        const requestedFilters = filters ?? {};
        // A missing, non-numeric, zero, or negative limit falls back to the
        // default; fractional limits are floored so the value is usable as a
        // row count.
        const requestedLimit = Number(requestedFilters.limit);
        const wantedLimit = Number.isFinite(requestedLimit) && requestedLimit >= 1
            ? Math.floor(requestedLimit)
            : DEFAULT_BATCH_LIMIT;
        const limit = Math.min(wantedLimit, env_1.env.BULK_GEOCODE_MAX_BATCH);
        const job = await this.queue.add(QUEUE_NAME, {
            userId,
            filters: { ...requestedFilters, limit },
            timestamp: Date.now(),
        });
        return job.id;
    }

    /**
     * Looks up a job and reports its state, progress, and outcome.
     *
     * @param {string} jobId - Id previously returned by `startBulkGeocode`.
     * @returns {Promise<{id: *, state: *, progress: *, result: *, failedReason: *}>}
     * @throws {Error} 'Job not found' when the queue has no such job.
     */
    async getJobStatus(jobId) {
        const job = await this.queue.getJob(jobId);
        if (!job) {
            throw new Error('Job not found');
        }
        const state = await job.getState();
        return {
            id: job.id,
            state,
            progress: job.progress,
            result: job.returnvalue,
            failedReason: job.failedReason,
        };
    }

    /**
     * Returns the queue's waiting/active/completed/failed job counts.
     *
     * @returns {Promise<{waiting: number, active: number, completed: number, failed: number}>}
     */
    async getStats() {
        const [waiting, active, completed, failed] = await Promise.all([
            this.queue.getWaitingCount(),
            this.queue.getActiveCount(),
            this.queue.getCompletedCount(),
            this.queue.getFailedCount(),
        ]);
        return { waiting, active, completed, failed };
    }

    /**
     * Worker handler: selects locations matching the job filters, re-geocodes
     * each address, and stores the new coordinates only when the returned
     * confidence is higher than the stored one.
     *
     * @param {object} job - BullMQ job whose `data` holds `{ userId, filters }`.
     * @returns {Promise<{total: number, processed: number, improved: number,
     *   failed: number, unchanged: number, duration: number}>} Summary
     *   counters; `duration` is in whole seconds.
     */
    async processBulkGeocode(job) {
        const { userId, filters } = job.data;
        const startTime = Date.now();
        logger_1.logger.info(`Starting bulk geocode job ${job.id} with filters: ${JSON.stringify(filters)}`);

        // Resolve the cut geometry once, up front.
        let isInsideCut = null;
        if (filters.cutId) {
            isInsideCut = await loadCutPredicate(filters.cutId);
            if (isInsideCut) {
                logger_1.logger.info(`Will apply point-in-polygon filtering for cut ${filters.cutId}`);
            }
            else {
                // Existing behaviour is to continue without the cut filter;
                // make that visible in the logs.
                logger_1.logger.info(`Cut ${filters.cutId} has no usable polygon; cut filter not applied`);
            }
        }

        // Fetch locations to geocode
        const locations = await database_1.prisma.location.findMany({
            where: buildLocationWhere(filters),
            select: {
                id: true,
                address: true,
                latitude: true,
                longitude: true,
                geocodeConfidence: true,
            },
            take: filters.limit || DEFAULT_BATCH_LIMIT,
            orderBy: [
                { geocodeConfidence: 'asc' }, // Lowest confidence first
                { updatedAt: 'asc' },
            ],
        });

        // Note: the cut filter runs after `take`, so a cut-restricted job may
        // process fewer rows than the limit. Pre-filtering in the database
        // (e.g. PostGIS or a bounding box) would avoid that.
        const filteredLocations = isInsideCut ? locations.filter(isInsideCut) : locations;

        const total = filteredLocations.length;
        let processed = 0;
        let improved = 0;
        let failed = 0;
        let unchanged = 0;
        logger_1.logger.info(`Processing ${total} locations for bulk geocode`);

        // Process each location
        for (const location of filteredLocations) {
            if (!location.address) {
                // Nothing to geocode without an address.
                unchanged++;
                processed++;
                continue;
            }
            try {
                // Update progress
                await job.updateProgress({
                    total,
                    processed,
                    improved,
                    failed,
                    unchanged,
                    currentAddress: location.address,
                });
                // Geocode the address
                const result = await geocoding_service_1.geocodingService.geocode(location.address);
                if (!result) {
                    failed++;
                }
                else {
                    const oldConfidence = location.geocodeConfidence ?? 0;
                    const newConfidence = result.confidence;
                    // Only update if confidence improved
                    if (newConfidence > oldConfidence) {
                        // The location update and its history row are written
                        // in one transaction.
                        await database_1.prisma.$transaction(async (tx) => {
                            // Update location
                            await tx.location.update({
                                where: { id: location.id },
                                data: {
                                    latitude: result.latitude,
                                    longitude: result.longitude,
                                    geocodeConfidence: result.confidence,
                                    geocodeProvider: result.provider,
                                },
                            });
                            // Record history
                            await tx.locationHistory.create({
                                data: {
                                    locationId: location.id,
                                    userId,
                                    action: client_1.LocationHistoryAction.BULK_GEOCODED,
                                    field: null,
                                    oldValue: null,
                                    newValue: null,
                                    metadata: {
                                        provider: result.provider,
                                        oldConfidence,
                                        newConfidence: result.confidence,
                                        improved: true,
                                    },
                                },
                            });
                        });
                        improved++;
                    }
                    else {
                        unchanged++;
                    }
                }
            }
            catch (error) {
                // One bad location must not abort the whole batch.
                logger_1.logger.error(`Failed to geocode location ${location.id}:`, error);
                failed++;
            }
            processed++;
        }

        // Publish the final counters: in-loop updates run before each geocode
        // call, so without this the stored progress never reaches `total`.
        // Best-effort only; a reporting failure must not fail a finished job.
        try {
            await job.updateProgress({
                total,
                processed,
                improved,
                failed,
                unchanged,
                currentAddress: null,
            });
        }
        catch (error) {
            logger_1.logger.error(`Failed to report final progress for bulk geocode job ${job.id}:`, error);
        }

        const duration = Math.round((Date.now() - startTime) / 1000);
        logger_1.logger.info(`Bulk geocode job ${job.id} completed: ${improved} improved, ${unchanged} unchanged, ${failed} failed in ${duration}s`);
        return {
            total,
            processed,
            improved,
            failed,
            unchanged,
            duration,
        };
    }
}
exports.geocodeQueueService = new GeocodeQueueService();
//# sourceMappingURL=geocode-queue.service.js.map