import { exec as execCb } from 'child_process';
import { promisify } from 'util';
import fs from 'fs/promises';
import path from 'path';
import { UpgradeStatus, AuditAction, InstanceStatus, Prisma, Instance } from '@prisma/client';
import { prisma } from '../lib/prisma';
import { logger } from '../utils/logger';
import { createEvent } from './event.service';
import { getRemoteDriverForInstance } from './execution-driver';
import type { AgentUpdateStatus } from './remote-driver';

/**
 * Shell-injection guards. Any user- or DB-controlled value that flows into
 * `bash`/`git` via `exec()` must be validated against these regexes first.
 * Added 2026-04-12 after red-team audit found unvalidated `branch` and `basePath`
 * values reaching the shell.
 *
 * Neither pattern admits whitespace, quotes, `$`, `;`, `|`, `&`, backticks or
 * other shell metacharacters. A branch must also start with an alphanumeric,
 * so it can never be mistaken for a command-line option.
 */
const SAFE_BRANCH = /^[a-zA-Z0-9][a-zA-Z0-9_.\/-]{0,99}$/;
const SAFE_PATH = /^\/[a-zA-Z0-9/_.-]{1,255}$/;

/**
 * Reject a git branch name that does not match SAFE_BRANCH.
 * Null, undefined and the empty string are treated as "not provided" and pass.
 *
 * @param branch candidate branch name
 * @param ctx    label for the error message identifying where the value came from
 * @throws Error naming `ctx` and the expected pattern when the value is unsafe
 */
function assertSafeBranch(branch: string | null | undefined, ctx: string): void {
  const acceptable = !branch || SAFE_BRANCH.test(branch);
  if (!acceptable) {
    throw new Error(`Invalid git branch name (${ctx}): must match ${SAFE_BRANCH}`);
  }
}

/**
 * Reject a filesystem path that does not match SAFE_PATH.
 * A matching path must be absolute. Null, undefined and the empty string pass.
 *
 * @param p   candidate path
 * @param ctx label for the error message identifying where the value came from
 * @throws Error naming `ctx` and the expected pattern when the value is unsafe
 */
function assertSafePath(p: string | null | undefined, ctx: string): void {
  const acceptable = !p || SAFE_PATH.test(p);
  if (!acceptable) {
    throw new Error(`Invalid path (${ctx}): must match ${SAFE_PATH}`);
  }
}

/**
 * Write an INSTANCE_UPGRADE audit log entry capturing a terminal outcome.
 * Wrapped in try/catch so that an audit-log DB failure cannot mask the
 * underlying upgrade row status update.
*
 * Called from all three terminal paths (both local and remote):
 *   - 'completed'           — upgrade.sh/agent reported success
 *   - 'failed'              — upgrade.sh/agent reported failure
 *   - 'orchestration_error' — CCP-side exception, timeout, or unreachable agent
 *
 * No-op when `triggeredById` is null/empty (there is no user to attribute the
 * entry to). `errorMessage` is truncated to 500 characters and omitted entirely
 * when empty. Never throws: DB errors are logged and swallowed.
 */
async function writeUpgradeAuditLog(args: {
  upgradeId: string;
  instanceId: string;
  triggeredById: string | null;
  source: 'local' | 'remote';
  outcome: 'completed' | 'failed' | 'orchestration_error';
  previousCommit: string | null;
  newCommit: string | null;
  durationSeconds: number | null;
  errorMessage?: string | null;
}): Promise<void> {
  if (!args.triggeredById) return;
  try {
    await prisma.auditLog.create({
      data: {
        userId: args.triggeredById,
        instanceId: args.instanceId,
        action: AuditAction.INSTANCE_UPGRADE,
        details: {
          upgradeId: args.upgradeId,
          source: args.source,
          outcome: args.outcome,
          previousCommit: args.previousCommit,
          newCommit: args.newCommit,
          durationSeconds: args.durationSeconds,
          ...(args.errorMessage ? { errorMessage: args.errorMessage.substring(0, 500) } : {}),
        } as unknown as Prisma.InputJsonValue,
      },
    });
  } catch (err) {
    // Non-Error throwables have no `.message`; stringify them instead of logging "undefined".
    const reason = err instanceof Error ? err.message : String(err);
    logger.error(`[upgrade] failed to write audit log for ${args.upgradeId}: ${reason}`);
  }
}

const exec = promisify(execCb);

const UPGRADE_TIMEOUT = 600_000; // 10 minutes — local upgrades
const REMOTE_UPGRADE_TIMEOUT = 15 * 60 * 1000; // 15 minutes — remote (network round trips)
const PROGRESS_POLL_INTERVAL = 2_000; // 2 seconds

// ─── Update Check ─────────────────────────────────────────────────

/**
 * Result of an update check. For local instances this is the parsed
 * data/upgrade/status.json (not schema-validated). For remote instances it is
 * mapped from the agent's response. The fallbacks in the check functions also
 * synthesize it, with `remoteCommit: null`, `commitsBehind: 0` and `error` set.
 */
export interface UpdateStatus {
  branch: string;
  currentCommit: string;
  currentMessage?: string;
  remoteCommit: string | null;
  commitsBehind: number;
  changelog: Array<{ hash: string; message: string; date: string; author: string }>;
  /** ISO-8601 timestamp of when the check ran. */
  checkedAt: string;
  error: string | null;
}

/**
 * Check for available updates.
* Branches on instance.isRemote:
 *   - Local: runs upgrade-check.sh in the instance's basePath and reads status.json
 *   - Remote: calls the agent's POST /upgrade/check endpoint over mTLS
 *
 * @throws Error('Instance not found') if no instance has this id.
 * @throws Error if a local instance's basePath fails the SAFE_PATH shell guard.
 */
export async function checkForUpdates(instanceId: string): Promise<UpdateStatus> {
  const instance = await prisma.instance.findUnique({ where: { id: instanceId } });
  if (!instance) throw new Error('Instance not found');

  if (instance.isRemote) {
    return checkForUpdatesRemote(instance);
  }
  return checkForUpdatesLocal(instance);
}

/**
 * Remote check: ask the agent to run upgrade-check.sh and return its status.json.
 * Does not throw. Driver or agent failures are logged and reported through the
 * returned `error` field, with branch/commit taken from the DB-tracked values.
 */
async function checkForUpdatesRemote(instance: Instance): Promise<UpdateStatus> {
  try {
    const driver = await getRemoteDriverForInstance({
      id: instance.id,
      slug: instance.slug,
      isRemote: instance.isRemote,
      agentUrl: instance.agentUrl,
    });
    const status: AgentUpdateStatus = await driver.checkForUpdates();
    return {
      branch: status.branch,
      currentCommit: status.currentCommit,
      currentMessage: status.currentMessage,
      remoteCommit: status.remoteCommit,
      commitsBehind: status.commitsBehind,
      changelog: status.changelog,
      checkedAt: status.checkedAt,
      error: status.error,
    };
  } catch (err) {
    logger.warn(`[upgrade] remote check failed for ${instance.slug}: ${(err as Error).message}`);
    return {
      branch: instance.gitBranch,
      currentCommit: instance.gitCommit || 'unknown',
      remoteCommit: null,
      commitsBehind: 0,
      changelog: [],
      checkedAt: new Date().toISOString(),
      error: `Remote check failed: ${(err as Error).message}`,
    };
  }
}

/**
 * Local check: run scripts/upgrade-check.sh inside basePath, then read
 * data/upgrade/status.json. If that file is missing or unparsable, the check
 * falls back to `git rev-parse`. If git is unavailable as well, it falls back
 * to the DB-tracked branch/commit.
 *
 * Note: status.json is not deleted beforehand. If the script fails, a file
 * left over from an earlier run may be returned.
 *
 * @throws Error if basePath fails the SAFE_PATH guard. basePath is
 *   interpolated into a `bash` command line, so it must be validated first.
 */
async function checkForUpdatesLocal(instance: Instance): Promise<UpdateStatus> {
  const basePath = instance.basePath;
  // Same guard startUpgrade applies before upgrade.sh. Without it a crafted
  // basePath could break out of the quoted `bash "<script>"` argument below.
  assertSafePath(basePath, 'instance.basePath');

  const statusFile = path.join(basePath, 'data', 'upgrade', 'status.json');
  const scriptPath = path.join(basePath, 'scripts', 'upgrade-check.sh');

  // Try to run upgrade-check.sh
  try {
    await fs.access(scriptPath);
    await exec(`bash "${scriptPath}"`, {
      cwd: basePath,
      timeout: 30_000,
      env: { ...process.env, COMPOSE_ANSI: 'never' },
    });
  } catch (err) {
    logger.warn(`[upgrade] upgrade-check.sh failed for ${instance.slug}: ${(err as Error).message}`);
    // Script may have still written status.json before failing — try reading it
  }

  // Read status.json
  try {
    const raw = await fs.readFile(statusFile, 'utf-8');
    return JSON.parse(raw) as UpdateStatus;
  } catch {
    // If no status.json exists, try to gather basic git info
    try {
      const { stdout: branch } = await exec('git rev-parse --abbrev-ref HEAD', { cwd: basePath, timeout: 5_000 });
      const { stdout: commit } = await exec('git rev-parse --short HEAD', { cwd: basePath, timeout: 5_000 });
      return {
        branch: branch.trim(),
        currentCommit: commit.trim(),
        remoteCommit: null,
        commitsBehind: 0,
        changelog: [],
        checkedAt: new Date().toISOString(),
        error: 'Could not check for remote updates',
      };
    } catch {
      return {
        branch: instance.gitBranch,
        currentCommit: instance.gitCommit || 'unknown',
        remoteCommit: null,
        commitsBehind: 0,
        changelog: [],
        checkedAt: new Date().toISOString(),
        error: 'Could not determine version info (no .git directory?)',
      };
    }
  }
}

// ─── Upgrade Orchestration ────────────────────────────────────────

export interface StartUpgradeOptions {
  /** Local: adds `--skip-backup` to upgrade.sh. Remote: forwarded to the agent. */
  skipBackup?: boolean;
  /** Local: adds `--use-registry` to upgrade.sh. Remote: forwarded to the agent. */
  useRegistry?: boolean;
  /** Branch to upgrade to; must match SAFE_BRANCH (checked in startUpgrade). */
  branch?: string;
}

/**
 * Start an upgrade for an instance. Returns the created InstanceUpgrade record.
 * The actual upgrade runs asynchronously (fire-and-forget).
*
 * Rejects without creating anything in any of these cases:
 *   - the instance does not exist;
 *   - it is not RUNNING or STOPPED;
 *   - it already has a PENDING/IN_PROGRESS upgrade;
 *   - its branch or basePath fails the shell-injection guards.
 * Otherwise it writes the PENDING InstanceUpgrade row and an audit entry,
 * launches the local or remote runner without awaiting it, and returns the row.
 */
export async function startUpgrade(
  instanceId: string,
  userId: string,
  ipAddress?: string,
  options?: StartUpgradeOptions
) {
  const instance = await prisma.instance.findUnique({ where: { id: instanceId } });
  if (!instance) throw new Error('Instance not found');
  if (instance.status !== InstanceStatus.RUNNING && instance.status !== InstanceStatus.STOPPED) {
    throw new Error(`Cannot upgrade instance in ${instance.status} state`);
  }

  // Check for in-progress upgrades
  const active = await prisma.instanceUpgrade.findFirst({
    where: {
      instanceId,
      status: { in: [UpgradeStatus.PENDING, UpgradeStatus.IN_PROGRESS] },
    },
  });
  if (active) {
    throw new Error('An upgrade is already in progress for this instance');
  }

  // Guard against shell injection before any exec() call in this flow.
  // options.branch and basePath end up on upgrade.sh's command line in runUpgrade.
  assertSafeBranch(options?.branch, 'options.branch');
  assertSafeBranch(instance.gitBranch, 'instance.gitBranch');
  assertSafePath(instance.basePath, 'instance.basePath');

  // Get current commit for tracking. For local instances we can read it from
  // git directly; for remote instances we either trust the DB-tracked value
  // (set by previous upgrade-check) or leave it null and let upgrade.sh
  // report the previous commit in result.json.
  let currentCommit: string | null = instance.gitCommit;
  if (!instance.isRemote) {
    try {
      const { stdout } = await exec('git rev-parse --short HEAD', {
        cwd: instance.basePath,
        timeout: 5_000,
      });
      currentCommit = stdout.trim();
    } catch {
      // Non-critical — may be a release install without .git
    }
  }

  const branch = options?.branch || instance.gitBranch;

  // Create upgrade record
  const upgrade = await prisma.instanceUpgrade.create({
    data: {
      instanceId,
      status: UpgradeStatus.PENDING,
      previousCommit: currentCommit,
      branch,
      triggeredById: userId,
    },
  });

  // Audit log
  await prisma.auditLog.create({
    data: {
      userId,
      instanceId,
      action: AuditAction.INSTANCE_UPGRADE,
      details: {
        upgradeId: upgrade.id,
        previousCommit: currentCommit,
        branch,
        source: instance.isRemote ? 'remote' : 'local',
        options: options || {},
      } as unknown as Prisma.InputJsonValue,
      ipAddress,
    },
  });

  // Fire-and-forget: branch on isRemote. Each runner records its own failures
  // on the upgrade row; these handlers only log errors that escape the runner.
  if (instance.isRemote) {
    void runRemoteUpgrade(upgrade.id, instance, options).catch((err) => {
      logger.error(`[upgrade] Remote upgrade orchestration failed for ${instance.slug}: ${err}`);
    });
  } else {
    void runUpgrade(upgrade.id, instance.basePath, instance.slug, options).catch((err) => {
      logger.error(`[upgrade] Upgrade orchestration failed for ${instance.slug}: ${err}`);
    });
  }

  return upgrade;
}

/**
 * Async REMOTE upgrade runner.
 *
 * Flow:
 *   1. Get RemoteDriver
 *   2. Mark InstanceUpgrade IN_PROGRESS
 *   3. Tell agent to start upgrade.sh in --api-mode
 *   4. Poll agent /upgrade/progress every 2s, mirror to DB
 *   5. Try /upgrade/result every poll cycle; when present, finalize
 *   6. On timeout (15 min), mark FAILED and create error event
 *
 * Note: there is no shell or filesystem access on the CCP side — everything
 * goes through the mTLS agent. The agent's spawn of upgrade.sh is itself
 * fire-and-forget under a slug mutex.
 */
async function runRemoteUpgrade(
  upgradeId: string,
  instance: Instance,
  options?: StartUpgradeOptions
): Promise<void> {
  const slug = instance.slug;
  // Set only by our own deadline check. A driver/network error whose message
  // happens to contain "timed out" must not be reported as an upgrade timeout.
  let timedOut = false;

  try {
    const driver = await getRemoteDriverForInstance({
      id: instance.id,
      slug: instance.slug,
      isRemote: instance.isRemote,
      agentUrl: instance.agentUrl,
    });

    // Mark IN_PROGRESS
    await prisma.instanceUpgrade.update({
      where: { id: upgradeId },
      data: {
        status: UpgradeStatus.IN_PROGRESS,
        progressMessage: 'Starting remote upgrade...',
      },
    });

    // Tell the agent to start. The agent has its own mutex + stale-progress
    // check, so this can return 409 if a previous upgrade is still running.
    logger.info(`[upgrade] ${slug}: triggering remote upgrade.sh start`);
    await driver.startUpgrade({
      skipBackup: options?.skipBackup,
      useRegistry: options?.useRegistry,
      branch: options?.branch,
    });

    // Poll progress + result. We treat /result returning 200 as the signal
    // that upgrade.sh exited (successfully or with code != 0 — the script
    // writes result.json either way in --api-mode).
    const deadline = Date.now() + REMOTE_UPGRADE_TIMEOUT;
    let lastProgress: { phase?: number; phaseName?: string; percentage?: number; message?: string } = {};

    while (Date.now() < deadline) {
      await new Promise((r) => setTimeout(r, PROGRESS_POLL_INTERVAL));

      // Try to fetch the result first; if it exists, we're done
      let result = null;
      try {
        result = await driver.getUpgradeResult();
      } catch {
        // No result yet — keep polling progress
      }

      if (result) {
        // Final result available — write it and exit
        const upgradeRowBefore = await prisma.instanceUpgrade.findUnique({ where: { id: upgradeId } });
        await prisma.instanceUpgrade.update({
          where: { id: upgradeId },
          data: {
            status: result.success ? UpgradeStatus.COMPLETED : UpgradeStatus.FAILED,
            newCommit: result.newCommit || null,
            commitCount: result.commitCount || 0,
            percentage: 100,
            phaseName: 'Complete',
            progressMessage: result.message || 'Upgrade completed',
            durationSeconds: result.durationSeconds || null,
            warnings: result.warnings?.length ? (result.warnings as unknown as Prisma.InputJsonValue) : undefined,
            errorMessage: result.success ? null : (result.message || 'Upgrade failed'),
            completedAt: new Date(),
          },
        });

        // Update Instance.gitCommit if we have a new commit
        if (result.newCommit) {
          await prisma.instance.update({
            where: { id: instance.id },
            data: { gitCommit: result.newCommit },
          });
        }

        if (!result.success) {
          await createEvent(
            instance.id,
            'ERROR',
            'upgrade',
            'Remote upgrade failed',
            result.message || 'The remote upgrade process failed. Check the agent log for details.',
            { upgradeId, source: 'remote', warnings: result.warnings }
          );
        }

        await writeUpgradeAuditLog({
          upgradeId,
          instanceId: instance.id,
          triggeredById: upgradeRowBefore?.triggeredById ?? null,
          source: 'remote',
          outcome: result.success ? 'completed' : 'failed',
          previousCommit: upgradeRowBefore?.previousCommit ?? null,
          newCommit: result.newCommit || null,
          durationSeconds: result.durationSeconds || null,
          errorMessage: result.success ? null : (result.message || 'Upgrade failed'),
        });

        logger.info(`[upgrade] ${slug}: remote upgrade ${result.success ? 'COMPLETED' : 'FAILED'}`);
        return;
      }

      // No result yet — pull progress
      try {
        const progress = await driver.getUpgradeProgress();
        // Only update DB if something actually changed (avoid hot-loop writes)
        if (
          progress.phase !== lastProgress.phase ||
          progress.percentage !== lastProgress.percentage ||
          progress.message !== lastProgress.message
        ) {
          lastProgress = {
            phase: progress.phase,
            phaseName: progress.phaseName,
            percentage: progress.percentage,
            message: progress.message,
          };
          await prisma.instanceUpgrade.update({
            where: { id: upgradeId },
            data: {
              currentPhase: progress.phase || 0,
              phaseName: progress.phaseName || null,
              percentage: progress.percentage || 0,
              progressMessage: progress.message || null,
            },
          });
        }
      } catch (err) {
        // Transient network blip during a long upgrade — keep polling
        logger.debug(`[upgrade] ${slug}: progress poll error: ${(err as Error).message}`);
      }
    }

    // Timeout — mark FAILED
    timedOut = true;
    throw new Error(`Remote upgrade timed out after ${Math.round(REMOTE_UPGRADE_TIMEOUT / 60_000)} minutes`);
  } catch (err) {
    const errorMsg = err instanceof Error ? err.message : String(err);
    const isTimeout = timedOut;

    const upgradeRowBefore = await prisma.instanceUpgrade.findUnique({ where: { id: upgradeId } });
    await prisma.instanceUpgrade.update({
      where: { id: upgradeId },
      data: {
        status: UpgradeStatus.FAILED,
        errorMessage: isTimeout ? errorMsg : errorMsg.slice(0, 2000),
        progressMessage: 'Failed',
        completedAt: new Date(),
      },
    });

    await createEvent(
      instance.id,
      'ERROR',
      'upgrade',
      isTimeout ? 'Remote upgrade timed out' : 'Remote upgrade failed',
      errorMsg.slice(0, 500),
      { upgradeId, source: 'remote' }
    );

    await writeUpgradeAuditLog({
      upgradeId,
      instanceId: instance.id,
      triggeredById: upgradeRowBefore?.triggeredById ?? null,
      source: 'remote',
      outcome: 'orchestration_error',
      previousCommit: upgradeRowBefore?.previousCommit ?? null,
      newCommit: null,
      durationSeconds: null,
      errorMessage: errorMsg,
    });

    // Don't flip the instance to ERROR state for remote upgrades — the agent
    // health check will reflect the real state on the next poll, and we don't
    // want to mask a recovered instance with stale CCP-side ERROR.
    logger.error(`[upgrade] ${slug}: ${errorMsg}`);
  }
}

/**
 * Async LOCAL upgrade runner. It runs scripts/upgrade.sh in basePath and
 * mirrors data/upgrade/progress.json into the InstanceUpgrade row while the
 * script runs. It then finalizes the row from data/upgrade/result.json.
 *
 * Directory preparation and the IN_PROGRESS write happen inside the try. A
 * failure there therefore marks the row FAILED instead of leaving it PENDING,
 * which startUpgrade would treat as "an upgrade is already in progress".
 */
async function runUpgrade(
  upgradeId: string,
  basePath: string,
  slug: string,
  options?: StartUpgradeOptions
): Promise<void> {
  const upgradeDir = path.join(basePath, 'data', 'upgrade');
  const progressFile = path.join(upgradeDir, 'progress.json');
  const resultFile = path.join(upgradeDir, 'result.json');
  const scriptPath = path.join(basePath, 'scripts', 'upgrade.sh');
  const timeoutMinutes = Math.round(UPGRADE_TIMEOUT / 60_000);

  // Build command flags. Values are constants, plus options.branch, which
  // startUpgrade validated against SAFE_BRANCH.
  const flags: string[] = ['--api-mode', '--force'];
  if (options?.skipBackup) flags.push('--skip-backup');
  if (options?.useRegistry) flags.push('--use-registry');
  if (options?.branch) flags.push('--branch', options.branch);

  // Progress poller. Ticks never overlap. stopPolling() waits for an in-flight
  // tick, so a late tick cannot overwrite the terminal status written below.
  let pollTimer: NodeJS.Timeout | null = null;
  let pollInFlight: Promise<void> | null = null;
  let pollingStopped = false;

  const pollProgressOnce = async (): Promise<void> => {
    try {
      const raw = await fs.readFile(progressFile, 'utf-8');
      const progress = JSON.parse(raw);
      if (pollingStopped) return; // terminal state may be written next — don't race it
      await prisma.instanceUpgrade.update({
        where: { id: upgradeId },
        data: {
          currentPhase: progress.phase || 0,
          phaseName: progress.phaseName || null,
          percentage: progress.percentage || 0,
          progressMessage: progress.message || null,
        },
      });
    } catch {
      // progress.json may not exist yet or be mid-write
    }
  };

  const stopPolling = async (): Promise<void> => {
    pollingStopped = true;
    if (pollTimer) {
      clearInterval(pollTimer);
      pollTimer = null;
    }
    if (pollInFlight) await pollInFlight;
  };

  try {
    // Ensure data/upgrade exists and clear stale progress/result files from previous runs
    await fs.mkdir(upgradeDir, { recursive: true });
    await fs.rm(progressFile, { force: true });
    await fs.rm(resultFile, { force: true });

    // Mark as IN_PROGRESS
    await prisma.instanceUpgrade.update({
      where: { id: upgradeId },
      data: {
        status: UpgradeStatus.IN_PROGRESS,
        progressMessage: 'Starting upgrade...',
      },
    });

    pollTimer = setInterval(() => {
      // Skip this tick if the previous one is still writing (slow DB).
      if (pollingStopped || pollInFlight) return;
      pollInFlight = pollProgressOnce().finally(() => {
        pollInFlight = null;
      });
    }, PROGRESS_POLL_INTERVAL);

    try {
      // Run upgrade.sh
      await exec(`bash "${scriptPath}" ${flags.join(' ')}`, {
        cwd: basePath,
        timeout: UPGRADE_TIMEOUT,
        maxBuffer: 10 * 1024 * 1024,
        env: { ...process.env, COMPOSE_ANSI: 'never' },
      });
    } finally {
      // Stop mirroring progress before any terminal-state write.
      await stopPolling();
    }

    const result = await readResultFile(resultFile);
    const logTail = await readLatestLogTail(basePath);

    // Commit checked out after the upgrade (unavailable on installs without .git)
    let newCommit: string | null = null;
    try {
      const { stdout } = await exec('git rev-parse --short HEAD', { cwd: basePath, timeout: 5_000 });
      newCommit = stdout.trim();
    } catch {
      /* ignore */
    }

    // Update the upgrade record
    await prisma.instanceUpgrade.update({
      where: { id: upgradeId },
      data: {
        status: result.success ? UpgradeStatus.COMPLETED : UpgradeStatus.FAILED,
        newCommit: result.newCommit || newCommit,
        commitCount: result.commitCount || 0,
        percentage: 100,
        phaseName: 'Complete',
        progressMessage: result.message || 'Upgrade completed',
        durationSeconds: result.durationSeconds || null,
        warnings: result.warnings?.length ? result.warnings : undefined,
        errorMessage: result.success ? null : (result.message || 'Upgrade failed'),
        log: logTail,
        completedAt: new Date(),
      },
    });

    // One lookup serves the gitCommit update, the error event and the audit entry.
    const upgradeRow = await prisma.instanceUpgrade.findUnique({ where: { id: upgradeId } });

    // Update instance gitCommit from git HEAD
    if (newCommit && upgradeRow) {
      await prisma.instance.update({
        where: { id: upgradeRow.instanceId },
        data: { gitCommit: newCommit },
      });
    }

    if (!result.success && upgradeRow) {
      // Create error event
      await createEvent(
        upgradeRow.instanceId,
        'ERROR',
        'upgrade',
        'Upgrade failed',
        result.message || 'The upgrade process failed. Check logs for details.',
        { upgradeId, previousCommit: upgradeRow.previousCommit, warnings: result.warnings }
      );
    }

    if (upgradeRow) {
      await writeUpgradeAuditLog({
        upgradeId,
        instanceId: upgradeRow.instanceId,
        triggeredById: upgradeRow.triggeredById,
        source: 'local',
        outcome: result.success ? 'completed' : 'failed',
        previousCommit: upgradeRow.previousCommit,
        newCommit: result.newCommit || newCommit,
        durationSeconds: result.durationSeconds || null,
        errorMessage: result.success ? null : (result.message || 'Upgrade failed'),
      });
    }

    logger.info(`[upgrade] ${slug}: Upgrade ${result.success ? 'completed' : 'failed'}`);
  } catch (err) {
    const errorMsg = err instanceof Error ? err.message : String(err);
    const isTimeout = isExecTimeout(err);

    // Read whatever result/progress we have
    const result = await readResultFile(resultFile);
    const logTail = await readLatestLogTail(basePath);

    await prisma.instanceUpgrade.update({
      where: { id: upgradeId },
      data: {
        status: UpgradeStatus.FAILED,
        errorMessage: isTimeout ? `Upgrade timed out after ${timeoutMinutes} minutes` : errorMsg.slice(0, 2000),
        progressMessage: 'Failed',
        log: logTail,
        completedAt: new Date(),
        durationSeconds: result.durationSeconds || null,
      },
    });

    // Create error event
    const upgrade = await prisma.instanceUpgrade.findUnique({ where: { id: upgradeId } });
    if (upgrade) {
      await createEvent(
        upgrade.instanceId,
        'ERROR',
        'upgrade',
        isTimeout ? 'Upgrade timed out' : 'Upgrade failed',
        isTimeout ? `The upgrade process timed out after ${timeoutMinutes} minutes.` : errorMsg.slice(0, 500),
        { upgradeId }
      );

      // Set instance to ERROR state
      await prisma.instance.update({
        where: { id: upgrade.instanceId },
        data: {
          status: InstanceStatus.ERROR,
          statusMessage: `Upgrade failed: ${isTimeout ? 'timeout' : errorMsg.slice(0, 200)}`,
        },
      });

      await writeUpgradeAuditLog({
        upgradeId,
        instanceId: upgrade.instanceId,
        triggeredById: upgrade.triggeredById,
        source: 'local',
        outcome: 'orchestration_error',
        previousCommit: upgrade.previousCommit,
        newCommit: null,
        durationSeconds: result.durationSeconds || null,
        errorMessage: errorMsg,
      });
    }

    logger.error(`[upgrade] ${slug}: Upgrade failed: ${errorMsg}`);
  } finally {
    // Safety net; a no-op when the exec path already stopped the poller.
    await stopPolling();
  }
}

/**
 * True when `err` is the rejection produced by child_process.exec after it
 * killed the child for exceeding its `timeout` option. Node marks this with
 * `killed === true`. The message itself is only "Command failed: …" and does
 * not say "timed out". A maxBuffer overflow carries its own error code instead.
 */
function isExecTimeout(err: unknown): boolean {
  if (typeof err !== 'object' || err === null) return false;
  const execErr = err as { killed?: unknown; code?: unknown };
  return execErr.killed === true && execErr.code !== 'ERR_CHILD_PROCESS_STDIO_MAXBUFFER';
}

// ─── File Readers ─────────────────────────────────────────────────

interface UpgradeResult {
  success: boolean;
  message: string;
  previousCommit?: string;
  newCommit?: string;
  commitCount?: number;
  durationSeconds?: number;
  warnings?: string[];
}

/**
 * Read upgrade.sh's result.json. This never throws. A missing or unparsable
 * file, or a JSON value that is not an object, yields a `success: false`
 * placeholder. Callers, including runUpgrade's catch path, read fields off the
 * result directly, so `null` must never be returned.
 * Field types inside the object are not validated.
 */
async function readResultFile(resultFile: string): Promise<UpgradeResult> {
  try {
    const raw = await fs.readFile(resultFile, 'utf-8');
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      return { success: false, message: 'Malformed result file' };
    }
    return parsed as UpgradeResult;
  } catch {
    return { success: false, message: 'No result file found' };
  }
}

/**
 * Return the last 5000 characters of the lexicographically greatest
 * `logs/upgrade-*` file under basePath. Returns null when there is no such
 * file or it cannot be read.
 */
async function readLatestLogTail(basePath: string): Promise<string | null> {
  try {
    const logDir = path.join(basePath, 'logs');
    const files = await fs.readdir(logDir);
    const upgradeLog = files
      .filter((f) => f.startsWith('upgrade-'))
      .sort()
      .pop();
    if (!upgradeLog) return null;

    const content = await fs.readFile(path.join(logDir, upgradeLog), 'utf-8');
    // Return last 5000 chars to keep DB storage reasonable
    return content.slice(-5000);
  } catch {
    return null;
  }
}

// ─── Query Functions ──────────────────────────────────────────────

/**
 * Get the current/latest upgrade progress for an instance.
 */
export async function getUpgradeStatus(instanceId: string) {
  return prisma.instanceUpgrade.findFirst({
    where: { instanceId },
    orderBy: { startedAt: 'desc' },
    include: {
      triggeredBy: { select: { id: true, name: true, email: true } },
    },
  });
}

/**
 * Get paginated upgrade history for an instance.
*
 * `page` is 1-based (page 1 skips nothing) and `limit` is the page size.
 * Rows are newest `startedAt` first and include the triggering user's id,
 * name and email. `total` counts every upgrade of the instance, ignoring
 * pagination.
 */
export async function getUpgradeHistory(instanceId: string, page = 1, limit = 20) {
  const where = { instanceId };
  const offset = (page - 1) * limit;

  const rowsQuery = prisma.instanceUpgrade.findMany({
    where,
    orderBy: { startedAt: 'desc' },
    skip: offset,
    take: limit,
    include: {
      triggeredBy: { select: { id: true, name: true, email: true } },
    },
  });
  const countQuery = prisma.instanceUpgrade.count({ where });

  const [data, total] = await Promise.all([rowsQuery, countQuery]);
  return { data, total, page, limit };
}