// changemaker.lite/api/dist/modules/map/locations/nar-import.service.js
//
// 532 lines
// 24 KiB
// JavaScript

"use strict";
// Module-interop helper (the shape emitted by the TypeScript compiler for `import * as x`).
// Exposes property `k` of module object `m` on object `o` under the name `k2` (defaults to `k`).
// An already-defined `this.__createBinding` is reused when present.
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
// Replace the descriptor with a live getter when it is missing, when it is an accessor on a
// module not flagged `__esModule`, or when it is a writable/configurable data property;
// otherwise the original descriptor is copied as-is.
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
// Fallback used when Object.create is unavailable: copy the current value (no live binding).
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
// Module-interop helper: stores `value` on `target` as its "default" export.
// An already-defined `this.__setModuleDefault` takes precedence; otherwise the
// implementation is picked once, depending on whether Object.create exists.
var __setModuleDefault = (this && this.__setModuleDefault) || (function () {
    if (Object.create) {
        // Defined as an enumerable, read-only, non-configurable property.
        return function (target, value) {
            Object.defineProperty(target, "default", { enumerable: true, value: value });
        };
    }
    // Legacy fallback: plain assignment.
    return function (target, value) {
        target["default"] = value;
    };
})();
// Module-interop helper (the shape emitted by the TypeScript compiler for `import * as x`).
// Returns `mod` unchanged when it is flagged `__esModule`; otherwise builds a namespace-like
// object whose own properties (except "default") are bound to `mod` via __createBinding and
// whose "default" is `mod` itself (set via __setModuleDefault).
var __importStar = (this && this.__importStar) || (function () {
// Lazily selects the key-enumeration strategy: on first call `ownKeys` replaces itself with
// Object.getOwnPropertyNames, or with a for-in/hasOwnProperty fallback when that is unavailable.
var ownKeys = function(o) {
ownKeys = Object.getOwnPropertyNames || function (o) {
var ar = [];
for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
return ar;
};
return ownKeys(o);
};
return function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
// `mod != null` skips both null and undefined; "default" is handled separately below.
if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
__setModuleDefault(result, mod);
return result;
};
})();
// Module-interop helper: makes a required module readable through `.default`.
// Modules flagged `__esModule` are returned untouched; anything else is wrapped
// as `{ default: mod }`. An already-defined `this.__importDefault` is reused.
var __importDefault = (this && this.__importDefault) || function (mod) {
    if (mod && mod.__esModule) {
        return mod;
    }
    return { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.narImportService = void 0;
exports.writeProgress = writeProgress;
const fs = __importStar(require("fs"));
const path = __importStar(require("path"));
const csv_parse_1 = require("csv-parse");
const proj4_1 = __importDefault(require("proj4"));
const database_1 = require("../../../config/database");
const redis_1 = require("../../../config/redis");
const logger_1 = require("../../../utils/logger");
const spatial_1 = require("../../../utils/spatial");
const path_validator_1 = require("../../../utils/path-validator");
const env_1 = require("../../../config/env");
// Statistics Canada Lambert Conformal Conic (EPSG:3347) → WGS84
// Registering the definition under the name 'EPSG:3347' lets importProvince convert
// BG_X/BG_Y pairs to EPSG:4326 longitude/latitude when an address row has no entry
// in the Location lookup.
proj4_1.default.defs('EPSG:3347', '+proj=lcc +lat_1=49 +lat_2=77 +lat_0=63.390675 +lon_0=-91.86666666666666 +x_0=6200000 +y_0=3000000 +ellps=GRS80 +units=m +no_defs');
// Province/territory lookup keyed by the numeric code that appears in NAR file names
// (e.g. the "13" in Address_13.csv). parseFileInfo ignores files whose code is not
// listed here. Each entry carries the display name and the two-letter abbreviation.
const PROVINCE_MAP = {
'10': { name: 'Newfoundland and Labrador', abbr: 'NL' },
'11': { name: 'Prince Edward Island', abbr: 'PE' },
'12': { name: 'Nova Scotia', abbr: 'NS' },
'13': { name: 'New Brunswick', abbr: 'NB' },
'24': { name: 'Quebec', abbr: 'QC' },
'35': { name: 'Ontario', abbr: 'ON' },
'46': { name: 'Manitoba', abbr: 'MB' },
'47': { name: 'Saskatchewan', abbr: 'SK' },
'48': { name: 'Alberta', abbr: 'AB' },
'59': { name: 'British Columbia', abbr: 'BC' },
'60': { name: 'Yukon', abbr: 'YT' },
'61': { name: 'Northwest Territories', abbr: 'NT' },
'62': { name: 'Nunavut', abbr: 'NU' },
};
/**
 * Render a byte count as a short human-readable string.
 *
 * Values below 1024 are printed verbatim with a "B" suffix; larger values are
 * scaled by powers of 1024 and printed with one decimal as KB, MB or GB
 * (GB is the largest unit and also the fall-through for anything that fails
 * every "less than" comparison).
 *
 * @param {number} bytes - Size in bytes.
 * @returns {string} For example "512 B", "1.5 KB", "3.2 MB" or "1.0 GB".
 */
function formatBytes(bytes) {
    const KB = 1024;
    const MB = KB * 1024;
    const GB = MB * 1024;
    if (bytes < KB) {
        return `${bytes} B`;
    }
    // [exclusive upper bound, divisor, unit label]
    const scaledUnits = [
        [MB, KB, 'KB'],
        [GB, MB, 'MB'],
    ];
    for (const [upperBound, divisor, label] of scaledUnits) {
        if (bytes < upperBound) {
            return `${(bytes / divisor).toFixed(1)} ${label}`;
        }
    }
    return `${(bytes / GB).toFixed(1)} GB`;
}
/**
 * Round a number to a fixed count of decimal places.
 *
 * @param {number} val - Value to round (used here for latitude/longitude).
 * @param {number} [decimals=5] - Number of decimal places to keep.
 * @returns {number} `val` scaled by 10^decimals, rounded with Math.round, and scaled back.
 */
function roundCoord(val, decimals = 5) {
    const scale = 10 ** decimals;
    const scaled = Math.round(val * scale);
    return scaled / scale;
}
/**
 * Base directory that holds the NAR data, as configured in the environment.
 *
 * @returns {*} The value of `env.NAR_DATA_DIR`.
 */
function getDataDir() {
    const { NAR_DATA_DIR } = env_1.env;
    return NAR_DATA_DIR;
}
/**
 * Scan the NAR data directory and find the most recent dataset.
 *
 * Sub-directories whose name is exactly six digits (YYYYMM, e.g. 202512) are
 * treated as dataset releases and the greatest name is chosen. When there is
 * none, `baseDir` itself is used if it directly contains an `Addresses` entry.
 *
 * @param {string} baseDir - Directory to scan.
 * @returns {string|null} Path of the dataset directory, or null when `baseDir`
 *   does not exist or holds no recognisable dataset.
 */
function findNarDirectory(baseDir) {
    if (!fs.existsSync(baseDir)) {
        return null;
    }
    // Track the greatest YYYYMM directory name seen so far.
    let newest = null;
    for (const entry of fs.readdirSync(baseDir, { withFileTypes: true })) {
        if (!entry.isDirectory() || !/^\d{6}$/.test(entry.name)) {
            continue;
        }
        if (newest === null || entry.name > newest) {
            newest = entry.name;
        }
    }
    if (newest !== null) {
        return path.join(baseDir, newest);
    }
    // Fallback: check if Addresses/ exists directly
    const hasAddresses = fs.existsSync(path.join(baseDir, 'Addresses'));
    return hasAddresses ? baseDir : null;
}
/**
 * Describe a NAR CSV file based on its name and on-disk size.
 *
 * Recognised names look like Address_13.csv, Address_24_part_1.csv or
 * Location_59_part_2.csv (case-insensitive). The numeric code must be a key of
 * PROVINCE_MAP.
 *
 * @param {string} filePath - Full path of the file; it is stat'ed for its size.
 * @param {string} filename - Bare file name used for pattern matching.
 * @returns {object|null} File descriptor (name, path, size, province, type,
 *   optional part number), or null when the name does not match or the
 *   province code is unknown.
 */
function parseFileInfo(filePath, filename) {
    const namePattern = /^(Address|Location)_(\d+)(?:_part_(\d+))?\.csv$/i;
    const parsed = filename.match(namePattern);
    if (!parsed) {
        return null;
    }
    const [, kind, provinceCode, part] = parsed;
    const province = PROVINCE_MAP[provinceCode];
    if (!province) {
        return null;
    }
    // Only stat the file once the name is known to be relevant.
    const { size } = fs.statSync(filePath);
    return {
        filename,
        fullPath: filePath,
        sizeBytes: size,
        sizeFormatted: formatBytes(size),
        provinceCode,
        provinceName: province.name,
        provinceAbbr: province.abbr,
        type: kind.toLowerCase() === 'address' ? 'address' : 'location',
        partNumber: part ? parseInt(part, 10) : undefined,
    };
}
// Expiry applied to each progress entry written by writeProgress (passed to Redis SET with 'EX', i.e. seconds).
const PROGRESS_TTL = 3600; // 1 hour
// Row-count step at which importProvince refreshes the progress entry while streaming address files.
const PROGRESS_INTERVAL = 5000; // write every 5000 rows
/**
 * Store a progress snapshot for an import in Redis.
 *
 * The snapshot is JSON-serialised under the key `nar-import:<importId>` with an
 * expiry of PROGRESS_TTL. This is best-effort: any failure is logged as a
 * warning and never propagated to the caller.
 *
 * @param {string} importId - Identifier of the running import.
 * @param {object} progress - Snapshot to serialise.
 * @returns {Promise<void>}
 */
async function writeProgress(importId, progress) {
    try {
        const key = `nar-import:${importId}`;
        const payload = JSON.stringify(progress);
        await redis_1.redis.set(key, payload, 'EX', PROGRESS_TTL);
    }
    catch (failure) {
        logger_1.logger.warn('Failed to write NAR import progress to Redis', failure);
    }
}
exports.narImportService = {
/** List available NAR datasets grouped by province */
async listDatasets() {
const baseDir = getDataDir();
const narDir = findNarDirectory(baseDir);
if (!narDir)
return { narDir: null, datasets: [] };
// Validate directory paths to prevent traversal attacks
const addressDir = await (0, path_validator_1.validateFilePath)(baseDir, path.relative(baseDir, path.join(narDir, 'Addresses')));
const locationDir = await (0, path_validator_1.validateFilePath)(baseDir, path.relative(baseDir, path.join(narDir, 'Locations')));
const files = [];
if (fs.existsSync(addressDir)) {
for (const f of fs.readdirSync(addressDir)) {
// Validate each file path
const filePath = await (0, path_validator_1.validateFilePath)(addressDir, f);
const info = parseFileInfo(filePath, f);
if (info)
files.push(info);
}
}
if (fs.existsSync(locationDir)) {
for (const f of fs.readdirSync(locationDir)) {
// Validate each file path
const filePath = await (0, path_validator_1.validateFilePath)(locationDir, f);
const info = parseFileInfo(filePath, f);
if (info)
files.push(info);
}
}
// Group by province
const byProvince = new Map();
for (const file of files) {
let dataset = byProvince.get(file.provinceCode);
if (!dataset) {
dataset = {
provinceCode: file.provinceCode,
provinceName: file.provinceName,
provinceAbbr: file.provinceAbbr,
addressFiles: [],
locationFiles: [],
totalAddressSize: 0,
totalLocationSize: 0,
};
byProvince.set(file.provinceCode, dataset);
}
if (file.type === 'address') {
dataset.addressFiles.push(file);
dataset.totalAddressSize += file.sizeBytes;
}
else {
dataset.locationFiles.push(file);
dataset.totalLocationSize += file.sizeBytes;
}
}
// Sort files within each dataset, sort datasets by province code
const datasets = Array.from(byProvince.values());
for (const ds of datasets) {
ds.addressFiles.sort((a, b) => (a.partNumber ?? 0) - (b.partNumber ?? 0));
ds.locationFiles.sort((a, b) => (a.partNumber ?? 0) - (b.partNumber ?? 0));
}
datasets.sort((a, b) => a.provinceCode.localeCompare(b.provinceCode));
return { narDir, datasets };
},
/**
* Load Location file(s) into a lookup Map for a given province.
* Returns Map<LOC_GUID, { lat, lng, fedDistrict, fedCode }>.
*/
async loadLocationLookup(locationFiles) {
const lookup = new Map();
for (const file of locationFiles) {
logger_1.logger.info(`Loading location lookup from ${file.filename} (${file.sizeFormatted})`);
const parser = fs.createReadStream(file.fullPath).pipe((0, csv_parse_1.parse)({ columns: true, skip_empty_lines: true, trim: true, bom: true }));
for await (const record of parser) {
const locGuid = (record.LOC_GUID ?? '').trim();
if (!locGuid)
continue;
const lat = parseFloat(record.BG_LATITUDE ?? '');
const lng = parseFloat(record.BG_LONGITUDE ?? '');
if (isNaN(lat) || isNaN(lng))
continue;
lookup.set(locGuid, {
lat,
lng,
fedDistrict: (record.FED_ENG_NAME ?? '').trim() || undefined,
fedCode: (record.FED_CODE ?? '').trim() || undefined,
});
}
logger_1.logger.info(`Loaded ${lookup.size} locations from ${file.filename}`);
}
return lookup;
},
/** Import a province's NAR data using Location+Address file join */
async importProvince(userId, options, importId) {
const startTime = Date.now();
const { narDir, datasets } = await this.listDatasets();
if (!narDir) {
throw new Error('NAR data directory not found. Place NAR data in ' + getDataDir());
}
const dataset = datasets.find((d) => d.provinceCode === options.provinceCode);
if (!dataset) {
throw new Error(`Province ${options.provinceCode} not found in NAR data`);
}
if (dataset.addressFiles.length === 0) {
throw new Error(`No address files found for province ${options.provinceCode}`);
}
const result = {
provinceCode: dataset.provinceCode,
provinceName: dataset.provinceName,
totalRows: 0,
locationsCreated: 0,
addressesCreated: 0,
skippedDuplicate: 0,
skippedOutOfBounds: 0,
skippedNonResidential: 0,
skippedInvalid: 0,
errors: [],
durationMs: 0,
};
// Helper to write progress if importId is provided
const progress = {
status: 'loading-locations',
totalRows: 0,
locationsCreated: 0,
addressesCreated: 0,
skippedDuplicate: 0,
skippedOutOfBounds: 0,
skippedNonResidential: 0,
skippedInvalid: 0,
currentFile: '',
provinceName: dataset.provinceName,
};
const updateProgress = async (updates) => {
if (!importId)
return;
if (updates)
Object.assign(progress, updates);
await writeProgress(importId, progress);
};
// Phase 1: Load Location file(s) for coordinate + federal district lookup
await updateProgress();
let locationLookup;
if (dataset.locationFiles.length > 0) {
locationLookup = await this.loadLocationLookup(dataset.locationFiles);
logger_1.logger.info(`Location lookup ready: ${locationLookup.size} entries for ${dataset.provinceName}`);
}
else {
locationLookup = new Map();
logger_1.logger.warn(`No Location files for ${dataset.provinceName}, will use proj4 coordinate conversion`);
}
// Load cut polygon if filtering by cut
let cutPolygon;
if (options.filterType === 'cut' && options.cutId) {
const cut = await database_1.prisma.cut.findUnique({ where: { id: options.cutId } });
if (cut) {
try {
cutPolygon = (0, spatial_1.parseGeoJsonPolygon)(cut.geojson);
}
catch {
logger_1.logger.warn(`Invalid cut polygon for ${options.cutId}`);
}
}
}
// Load existing coordinates for deduplication
const existingCoords = new Set();
if (options.deduplicateRadius > 0) {
const existing = await database_1.prisma.location.findMany({
select: { latitude: true, longitude: true },
}); // No where clause needed - latitude/longitude are non-nullable
for (const loc of existing) {
const key = `${roundCoord(Number(loc.latitude))}:${roundCoord(Number(loc.longitude))}`;
existingCoords.add(key);
}
}
const inFileCoords = new Set();
// Phase 2: Stream Address file(s) and group by LOC_GUID
await updateProgress({ status: 'importing' });
const addressesByLocGuid = new Map();
for (const addressFile of dataset.addressFiles) {
logger_1.logger.info(`Processing ${addressFile.filename} (${addressFile.sizeFormatted})`);
await updateProgress({ currentFile: addressFile.filename });
const parser = fs.createReadStream(addressFile.fullPath).pipe((0, csv_parse_1.parse)({ columns: true, skip_empty_lines: true, trim: true, bom: true }));
for await (const record of parser) {
result.totalRows++;
try {
const locGuid = (record.LOC_GUID ?? '').trim();
if (!locGuid) {
result.skippedInvalid++;
continue;
}
// Extract BU_USE for residential filter
const buUse = parseInt(record.BU_USE ?? '', 10);
if (options.residentialOnly && buUse === 3) {
result.skippedNonResidential++;
continue;
}
// Extract address components
const civicNo = (record.CIVIC_NO ?? '').trim();
const civicSuffix = (record.CIVIC_NO_SUFFIX ?? '').trim();
const streetName = (record.OFFICIAL_STREET_NAME ?? '').trim();
const streetType = (record.OFFICIAL_STREET_TYPE ?? '').trim();
const streetDir = (record.OFFICIAL_STREET_DIR ?? '').trim();
const city = (record.MAIL_MUN_NAME ?? record.CSD_ENG_NAME ?? '').trim();
const prov = (record.MAIL_PROV_ABVN ?? '').trim();
const postalCode = (record.MAIL_POSTAL_CODE ?? '').trim() || undefined;
const unitNumber = (record.APT_NO_LABEL ?? '').trim() || undefined;
const addrGuid = (record.ADDR_GUID ?? '').trim();
// City filter
if (options.filterType === 'city' && options.filterCity) {
if (city.toLowerCase() !== options.filterCity.toLowerCase()) {
result.skippedOutOfBounds++;
continue;
}
}
// Postal prefix filter
if (options.filterType === 'postalPrefix' && options.filterPostalPrefix && postalCode) {
if (!postalCode.toUpperCase().startsWith(options.filterPostalPrefix.toUpperCase())) {
result.skippedOutOfBounds++;
continue;
}
}
else if (options.filterType === 'postalPrefix' && options.filterPostalPrefix && !postalCode) {
result.skippedOutOfBounds++;
continue;
}
if (!streetName) {
result.skippedInvalid++;
continue;
}
// Get coordinates + federal district from Location lookup
let lat;
let lng;
let federalDistrict;
const locData = locationLookup.get(locGuid);
if (locData) {
lat = locData.lat;
lng = locData.lng;
federalDistrict = locData.fedDistrict;
}
else {
// Fallback: convert BG_X/BG_Y via proj4
const bgX = parseFloat(record.BG_X ?? '');
const bgY = parseFloat(record.BG_Y ?? '');
if (!isNaN(bgX) && !isNaN(bgY)) {
const [projLng, projLat] = (0, proj4_1.default)('EPSG:3347', 'EPSG:4326', [bgX, bgY]);
lat = projLat;
lng = projLng;
}
}
if (lat === undefined || lng === undefined) {
result.skippedInvalid++;
continue;
}
// Cut boundary filter
if (cutPolygon && cutPolygon.length > 0) {
const inside = cutPolygon.some((ring) => (0, spatial_1.isPointInPolygon)(lat, lng, ring));
if (!inside) {
result.skippedOutOfBounds++;
continue;
}
}
// Group addresses by LOC_GUID
if (!addressesByLocGuid.has(locGuid)) {
addressesByLocGuid.set(locGuid, []);
}
addressesByLocGuid.get(locGuid).push({
addrGuid,
locGuid,
unitNumber,
civicNo,
civicSuffix,
streetName,
streetType,
streetDir,
city,
prov,
postalCode,
buUse: isNaN(buUse) ? undefined : buUse,
lat,
lng,
federalDistrict,
});
// Write progress to Redis every PROGRESS_INTERVAL rows
if (result.totalRows % PROGRESS_INTERVAL === 0) {
await updateProgress({
totalRows: result.totalRows,
locationsCreated: result.locationsCreated,
addressesCreated: result.addressesCreated,
skippedDuplicate: result.skippedDuplicate,
skippedOutOfBounds: result.skippedOutOfBounds,
skippedNonResidential: result.skippedNonResidential,
skippedInvalid: result.skippedInvalid,
});
}
}
catch (err) {
result.skippedInvalid++;
if (result.errors.length < 50) {
const msg = err instanceof Error ? err.message : 'Unknown error';
result.errors.push(`Row ${result.totalRows}: ${msg}`);
}
}
}
}
// Phase 3: Create Locations and Addresses in batches
logger_1.logger.info(`Creating locations and addresses from ${addressesByLocGuid.size} unique buildings`);
await updateProgress({ status: 'creating-records' });
const locationBatch = [];
const addressBatch = [];
const locationIdByGuid = new Map();
// Helper function to build base address without unit number
const buildBaseAddress = (addr) => {
const streetParts = [
addr.civicNo + (addr.civicSuffix || ''),
addr.streetName,
addr.streetType,
addr.streetDir,
].filter(Boolean);
let baseAddr = streetParts.join(' ');
if (addr.city)
baseAddr += `, ${addr.city}`;
if (addr.prov)
baseAddr += `, ${addr.prov}`;
return baseAddr;
};
for (const [locGuid, addressRecords] of addressesByLocGuid.entries()) {
const firstRecord = addressRecords[0];
// Deduplication check (using first address coordinates)
if (options.deduplicateRadius > 0) {
const coordKey = `${roundCoord(firstRecord.lat)}:${roundCoord(firstRecord.lng)}`;
if (existingCoords.has(coordKey) || inFileCoords.has(coordKey)) {
result.skippedDuplicate += addressRecords.length;
continue;
}
inFileCoords.add(coordKey);
}
// Create Location (one per LOC_GUID)
const locationId = `loc_${Date.now()}_${Math.random().toString(36).substring(7)}`;
locationBatch.push({
id: locationId,
latitude: firstRecord.lat,
longitude: firstRecord.lng,
address: buildBaseAddress(firstRecord),
postalCode: firstRecord.postalCode,
province: firstRecord.prov || undefined,
federalDistrict: firstRecord.federalDistrict,
buildingUse: firstRecord.buUse,
locGuid,
buildingType: addressRecords.length > 1 ? 'MULTI_UNIT' : 'SINGLE_FAMILY',
totalUnits: addressRecords.length,
createdByUserId: userId,
});
locationIdByGuid.set(locGuid, locationId);
// Create Addresses (one per unit)
for (const record of addressRecords) {
addressBatch.push({
id: `addr_${Date.now()}_${Math.random().toString(36).substring(7)}`,
locationId,
unitNumber: record.unitNumber,
addrGuid: record.addrGuid,
createdByUserId: userId,
});
}
// Batch insert every 1000 locations
if (locationBatch.length >= 1000) {
await database_1.prisma.location.createMany({ data: locationBatch, skipDuplicates: true });
await database_1.prisma.address.createMany({ data: addressBatch, skipDuplicates: true });
result.locationsCreated += locationBatch.length;
result.addressesCreated += addressBatch.length;
locationBatch.length = 0;
addressBatch.length = 0;
logger_1.logger.info(`Progress: ${result.locationsCreated} locations, ${result.addressesCreated} addresses created`);
}
}
// Flush remaining batches
if (locationBatch.length > 0) {
await database_1.prisma.location.createMany({ data: locationBatch, skipDuplicates: true });
await database_1.prisma.address.createMany({ data: addressBatch, skipDuplicates: true });
result.locationsCreated += locationBatch.length;
result.addressesCreated += addressBatch.length;
}
result.durationMs = Date.now() - startTime;
logger_1.logger.info(`NAR import complete for ${dataset.provinceName}: ${result.locationsCreated} locations, ${result.addressesCreated} addresses created from ${result.totalRows} rows in ${(result.durationMs / 1000).toFixed(1)}s`);
await updateProgress({
status: 'complete',
totalRows: result.totalRows,
locationsCreated: result.locationsCreated,
addressesCreated: result.addressesCreated,
skippedDuplicate: result.skippedDuplicate,
skippedOutOfBounds: result.skippedOutOfBounds,
skippedNonResidential: result.skippedNonResidential,
skippedInvalid: result.skippedInvalid,
result,
});
return result;
},
};
//# sourceMappingURL=nar-import.service.js.map