// Server-only video transcoding pipeline.
//
// Why asynchronous (single FIFO worker):
// - HEVC iPhone clips can take 30–120 s to transcode → would exceed reverse-proxy
//   timeouts if done inline in the upload response.
// - State is persisted in data/transcode-jobs.json so we can recover orphan jobs
//   after a server restart.
//
// Public API:
// - enqueueTranscode({ inputPath, outputName, originalUploadName }) → registers a
//   job and kicks the worker if idle. The caller knows the final URL up-front.
// - getJob(id), listJobs(), cancelJob(id)
// - isFfmpegAvailable(), probeCodecs(path), needsTranscode(codecs)
//
// Concurrency model:
// - Every read-modify-write of the jobs file runs under one in-process lock, so
//   concurrent updates (progress ticks, cancellation, new enqueues) cannot
//   overwrite each other. Writes go to a temp file + rename, so lock-free readers
//   (getJob/listJobs) always see a complete file.
// - The lock lives in module memory: it does not coordinate several server
//   processes sharing the same data/ directory.
// - Only one ffmpeg process runs at a time.

import { spawn } from 'node:child_process';
import { mkdir, readFile, writeFile, rename, unlink, stat } from 'node:fs/promises';
import path from 'node:path';
import crypto from 'node:crypto';

const UPLOAD_DIR = path.join(process.cwd(), 'data', 'uploads');
const TMP_DIR = path.join(UPLOAD_DIR, '.tmp');
const JOBS_PATH = path.join(process.cwd(), 'data', 'transcode-jobs.json');

/** Progress must advance by at least this much before it is persisted (limits jobs-file rewrites). */
const PROGRESS_WRITE_STEP = 0.01;
/** How much of ffmpeg's stderr (in characters) is kept for the failure message. */
const STDERR_TAIL_CHARS = 8000;
/** Transcoded outputs smaller than this are treated as failures. */
const MIN_OUTPUT_BYTES = 1024;

export type TranscodeStatus = 'queued' | 'running' | 'done' | 'failed' | 'cancelled';

export type TranscodeJob = {
  id: string;
  inputPath: string; // uploaded source file (expected under data/uploads/.tmp/); deleted when the job ends
  outputName: string; // final basename in data/uploads/
  originalUploadName: string;
  status: TranscodeStatus;
  progress: number; // 0..1
  durationSec?: number;
  error?: string;
  startedAt?: number; // Date.now() ms
  finishedAt?: number; // Date.now() ms
  pid?: number; // ffmpeg process id while running
};

/** Codec names ffprobe reports for the first video and the first audio stream. */
export type Codecs = { video?: string; audio?: string };

// Tail of the jobs-file lock chain. It never rejects (see withJobsLock).
let jobsLock: Promise<unknown> = Promise.resolve();
let workerRunning = false;
// Set by every kickWorker() call. It lets a draining worker notice jobs enqueued
// while it was reading the file, instead of exiting and stranding them.
let wakeRequested = false;
// Memoized ffmpeg availability probe, shared by concurrent callers.
let ffmpegCheck: Promise<boolean> | null = null;

async function ensureDirs(): Promise<void> {
  await mkdir(UPLOAD_DIR, { recursive: true });
  await mkdir(TMP_DIR, { recursive: true });
}

function isTerminal(status: TranscodeStatus): boolean {
  return status === 'done' || status === 'failed' || status === 'cancelled';
}

/** Path of the in-progress ffmpeg output for a job. */
function tmpOutputPath(id: string): string {
  return path.join(TMP_DIR, `${id}.mp4`);
}

/** Best-effort delete; a missing or locked file is not an error here. */
async function unlinkQuietly(file: string): Promise<void> {
  try {
    await unlink(file);
  } catch {
    // Already gone (or not removable): nothing useful to do.
  }
}

/** Minimal shape check for entries read back from the jobs file. */
function isJobRecord(value: unknown): value is TranscodeJob {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as { id?: unknown; status?: unknown };
  return typeof v.id === 'string' && typeof v.status === 'string';
}

/**
 * Reads the persisted job list. A missing file means "no jobs".
 * @throws on unreadable files or invalid JSON.
 */
async function loadJobs(): Promise<TranscodeJob[]> {
  let raw: string;
  try {
    raw = await readFile(JOBS_PATH, 'utf-8');
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === 'ENOENT') return [];
    throw err;
  }
  const parsed: unknown = JSON.parse(raw);
  return Array.isArray(parsed) ? parsed.filter(isJobRecord) : [];
}

/**
 * Runs `task` after every previously queued task has settled (FIFO mutual
 * exclusion). A failing task rejects its own promise but never blocks the chain.
 */
function withJobsLock<T>(task: () => Promise<T>): Promise<T> {
  const run = jobsLock.then(task);
  jobsLock = run.catch(() => undefined);
  return run;
}

/** Atomically replaces the jobs file (write temp file, then rename). Call only under the lock. */
async function writeJobsFile(jobs: TranscodeJob[]): Promise<void> {
  await mkdir(path.dirname(JOBS_PATH), { recursive: true });
  const tmp = `${JOBS_PATH}.tmp`;
  await writeFile(tmp, JSON.stringify(jobs, null, 2), 'utf-8');
  await rename(tmp, JOBS_PATH);
}

/**
 * Loads the job list, lets `mutate` edit it in place, and persists it. The whole
 * read-modify-write happens under the lock.
 * `mutate` returns `undefined` to signal "nothing changed" (the file is not
 * rewritten); any other value is persisted and returned to the caller.
 */
function mutateJobs<T>(mutate: (jobs: TranscodeJob[]) => T | undefined): Promise<T | undefined> {
  return withJobsLock(async () => {
    const jobs = await loadJobs();
    const result = mutate(jobs);
    if (result !== undefined) await writeJobsFile(jobs);
    return result;
  });
}

/**
 * Merges `patch` into job `id`.
 * When `expectedStatus` is given, the patch is applied only if the job is
 * currently in that status (compare-and-set).
 * @returns whether the patch was applied.
 */
async function updateJob(
  id: string,
  patch: Partial<TranscodeJob>,
  expectedStatus?: TranscodeStatus,
): Promise<boolean> {
  const applied = await mutateJobs(jobs => {
    const job = jobs.find(j => j.id === id);
    if (!job || (expectedStatus !== undefined && job.status !== expectedStatus)) return undefined;
    Object.assign(job, patch);
    return true;
  });
  return applied === true;
}

export async function getJob(id: string): Promise<TranscodeJob | null> {
  const jobs = await loadJobs();
  return jobs.find(j => j.id === id) ?? null;
}

export async function listJobs(): Promise<TranscodeJob[]> {
  return loadJobs();
}

/** Runs `ffmpeg -version` once per process and caches the answer (warning once if missing). */
function checkFfmpeg(): Promise<boolean> {
  if (!ffmpegCheck) {
    ffmpegCheck = new Promise<boolean>(resolve => {
      const p = spawn('ffmpeg', ['-version'], { stdio: 'ignore' });
      p.on('error', () => resolve(false));
      p.on('exit', code => resolve(code === 0));
    }).then(ok => {
      if (!ok) {
        console.warn('[transcode] ffmpeg not found on PATH — video uploads requiring transcode will fail');
      }
      return ok;
    });
  }
  return ffmpegCheck;
}

export async function isFfmpegAvailable(): Promise<boolean> {
  return checkFfmpeg();
}

/**
 * Runs a command and resolves with its complete stdout (exit status ignored).
 * Rejects only if the process cannot be spawned.
 */
function captureStdout(command: string, args: string[]): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const child = spawn(command, args);
    let out = '';
    child.stdout.setEncoding('utf8'); // decodes multi-byte chars split across chunks
    child.stdout.on('data', (chunk: string) => {
      out += chunk;
    });
    child.stderr.resume(); // drain so a chatty child can never block on a full pipe
    child.on('error', reject);
    // 'close' (unlike 'exit') fires only after stdout has been fully read.
    child.on('close', () => resolve(out));
  });
}

/**
 * Extracts the first video and first audio codec from `ffprobe -of json` output.
 * Empty or garbled output (e.g. ffprobe failed) yields `{}`.
 */
function parseCodecOutput(json: string): Codecs {
  const result: Codecs = {};
  let parsed: unknown;
  try {
    parsed = JSON.parse(json);
  } catch {
    return result;
  }
  const streams =
    typeof parsed === 'object' && parsed !== null ? (parsed as { streams?: unknown }).streams : undefined;
  if (!Array.isArray(streams)) return result;
  for (const stream of streams) {
    if (typeof stream !== 'object' || stream === null) continue;
    const { codec_name: name, codec_type: type } = stream as { codec_name?: unknown; codec_type?: unknown };
    if (typeof name !== 'string') continue;
    if (type === 'video' && !result.video) result.video = name;
    else if (type === 'audio' && !result.audio) result.audio = name;
  }
  return result;
}

/**
 * Reports the codecs of the first video/audio stream of `inputPath`.
 * @throws if ffprobe cannot be spawned.
 */
export async function probeCodecs(inputPath: string): Promise<Codecs> {
  const out = await captureStdout('ffprobe', [
    '-v', 'error',
    '-show_entries', 'stream=codec_type,codec_name',
    '-of', 'json',
    inputPath,
  ]);
  return parseCodecOutput(out);
}

/**
 * True unless the file is already H.264 video with AAC/MP3 audio (or no audio).
 * Only codecs are checked, not the container.
 */
export function needsTranscode(codecs: Codecs): boolean {
  const videoOk = codecs.video === 'h264';
  const audioOk = !codecs.audio || codecs.audio === 'aac' || codecs.audio === 'mp3';
  return !(videoOk && audioOk);
}

/** Parses ffprobe's bare `format=duration` value; non-numeric or non-positive → undefined. */
function parseDuration(out: string): number | undefined {
  const n = parseFloat(out.trim());
  return Number.isFinite(n) && n > 0 ? n : undefined;
}

async function probeDuration(inputPath: string): Promise<number | undefined> {
  const out = await captureStdout('ffprobe', [
    '-v', 'error',
    '-show_entries', 'format=duration',
    '-of', 'default=noprint_wrappers=1:nokey=1',
    inputPath,
  ]);
  return parseDuration(out);
}

/**
 * Registers a transcode job and wakes the worker.
 * @throws Error with `code = 503` when ffmpeg is not installed.
 */
export async function enqueueTranscode(args: {
  inputPath: string;
  outputName: string;
  originalUploadName: string;
}): Promise<TranscodeJob> {
  await ensureDirs();
  if (!(await checkFfmpeg())) {
    throw Object.assign(new Error('ffmpeg non disponibile sul server'), { code: 503 });
  }

  const durationSec = await probeDuration(args.inputPath).catch(() => undefined);
  const job: TranscodeJob = {
    id: crypto.randomUUID(),
    inputPath: args.inputPath,
    outputName: args.outputName,
    originalUploadName: args.originalUploadName,
    status: 'queued',
    progress: 0,
    durationSec,
  };

  await mutateJobs(jobs => {
    jobs.push(job);
    return true;
  });
  startWorker();
  return job;
}

/**
 * Cancels a queued or running job.
 * @returns false if the job does not exist or has already finished.
 */
export async function cancelJob(id: string): Promise<boolean> {
  // Check and transition in one locked step, so a concurrent worker update
  // cannot slip in between.
  const before = await mutateJobs(jobs => {
    const job = jobs.find(j => j.id === id);
    if (!job || isTerminal(job.status)) return undefined;
    const snapshot = { ...job };
    job.status = 'cancelled';
    job.finishedAt = Date.now();
    return snapshot;
  });
  if (!before) return false;

  // 'cancelled' is persisted before signalling, so runJob sees it once ffmpeg exits.
  if (before.status === 'running' && before.pid) {
    try {
      process.kill(before.pid, 'SIGTERM');
    } catch {
      // Process already exited.
    }
  }
  await cleanupTmpForJob(before);
  return true;
}

/** Best-effort cleanup of the job's input file and partial ffmpeg output. */
async function cleanupTmpForJob(job: TranscodeJob): Promise<void> {
  await unlinkQuietly(job.inputPath);
  await unlinkQuietly(tmpOutputPath(job.id));
}

// ────────────────────────────────────────────────────────────
// Worker

/** Starts the worker in the background, logging instead of leaking an unhandled rejection. */
function startWorker(): void {
  kickWorker().catch(err => {
    console.error('[transcode] worker stopped after an unexpected error:', err);
  });
}

/**
 * Drains queued jobs one at a time. A no-op (apart from requesting another
 * pass) if a drain is already in progress.
 */
async function kickWorker(): Promise<void> {
  wakeRequested = true;
  if (workerRunning) return;
  workerRunning = true;
  try {
    // While the worker is idle no job runs in this process, so any 'running'
    // entry is an orphan from a previous process: requeue it.
    await mutateJobs(jobs => {
      let dirty = false;
      for (const j of jobs) {
        if (j.status === 'running') {
          j.status = 'queued';
          j.pid = undefined;
          dirty = true;
        }
      }
      return dirty || undefined;
    });

    for (;;) {
      wakeRequested = false;
      const next = (await loadJobs()).find(j => j.status === 'queued');
      if (next) {
        await runJob(next);
        continue;
      }
      // Someone enqueued while we were reading: look again instead of exiting.
      if (!wakeRequested) break;
    }
  } finally {
    workerRunning = false;
  }
}

/** ffmpeg arguments: H.264/AAC MP4, at most 1280x720, even dimensions, progress on stdout. */
function buildFfmpegArgs(inputPath: string, outputPath: string): string[] {
  return [
    '-y',
    '-i', inputPath,
    '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
    '-pix_fmt', 'yuv420p',
    '-vf', "scale='min(1280,iw)':'min(720,ih)':force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2",
    '-c:a', 'aac', '-b:a', '128k', '-ac', '2',
    '-movflags', '+faststart',
    '-progress', 'pipe:1',
    outputPath,
  ];
}

/**
 * Converts one `-progress` line into a 0..1 fraction, or undefined if the line
 * carries no usable time or the total duration is unknown.
 * ffmpeg reports `out_time_us` and the legacy `out_time_ms`; despite its name,
 * the latter is also in microseconds.
 */
function parseProgress(line: string, totalUs: number | undefined): number | undefined {
  if (!totalUs) return undefined;
  const m = /^out_time_(?:us|ms)=(\d+)/.exec(line.trim());
  if (!m) return undefined;
  return Math.max(0, Math.min(1, Number(m[1]) / totalUs));
}

/**
 * Spawns ffmpeg for `job`, persisting throttled progress and the pid.
 * Resolves (never rejects once spawned) after the process has closed, with its
 * exit code and the tail of its stderr. The exit code is -1 if the process
 * failed to start or was killed by a signal.
 */
async function runFfmpeg(job: TranscodeJob, outTmp: string): Promise<{ exitCode: number; stderr: string }> {
  const child = spawn('ffmpeg', buildFfmpegArgs(job.inputPath, outTmp));
  const totalUs = job.durationSec ? job.durationSec * 1_000_000 : undefined;

  // All listeners are attached synchronously, before any await. An early
  // 'error' (e.g. ENOENT) or a very fast exit therefore cannot be missed.
  const exited = new Promise<number>(resolve => {
    child.on('error', () => resolve(-1));
    child.on('close', code => resolve(code ?? -1));
  });

  let pending = '';
  let reported = 0;
  child.stdout.setEncoding('utf8');
  child.stdout.on('data', (chunk: string) => {
    pending += chunk;
    const lines = pending.split(/\r?\n/);
    pending = lines.pop() ?? '';
    for (const line of lines) {
      const progress = parseProgress(line, totalUs);
      if (progress === undefined || progress - reported < PROGRESS_WRITE_STEP) continue;
      reported = progress;
      // Fire-and-forget. The FIFO jobs lock orders these writes before the
      // final status update, which is queued only after 'close'.
      updateJob(job.id, { progress }, 'running').catch(err => {
        console.warn('[transcode] progress update failed:', err);
      });
    }
  });

  let stderr = '';
  child.stderr.setEncoding('utf8');
  child.stderr.on('data', (chunk: string) => {
    stderr = (stderr + chunk).slice(-STDERR_TAIL_CHARS); // bound memory
  });

  const pidSaved =
    child.pid === undefined
      ? Promise.resolve(false)
      : updateJob(job.id, { pid: child.pid }, 'running').catch(() => false);

  const exitCode = await exited;
  await pidSaved;
  return { exitCode, stderr };
}

/** Marks a running job failed (unless it was cancelled meanwhile) and removes its files. */
async function failJob(job: TranscodeJob, error: string): Promise<void> {
  await updateJob(job.id, { status: 'failed', finishedAt: Date.now(), error, pid: undefined }, 'running');
  await cleanupTmpForJob(job);
}

/** Transcodes one queued job end-to-end; failures are recorded on the job. */
async function runJob(job: TranscodeJob): Promise<void> {
  // queued → running as compare-and-set: a job cancelled after the worker
  // picked it up must not be resurrected.
  const claimed = await updateJob(job.id, { status: 'running', startedAt: Date.now(), progress: 0 }, 'queued');
  if (!claimed) return;

  const outTmp = tmpOutputPath(job.id);
  const outFinal = path.join(UPLOAD_DIR, job.outputName);
  try {
    const { exitCode, stderr } = await runFfmpeg(job, outTmp);

    // Cancelled while ffmpeg was running: cancelJob already set the status.
    const latest = await getJob(job.id);
    if (latest?.status !== 'running') {
      await cleanupTmpForJob(job);
      return;
    }

    if (exitCode !== 0) {
      await failJob(job, stderr.trim().split(/\r?\n/).slice(-5).join('\n'));
      return;
    }

    // Sanity check: the output must exist and be non-trivial.
    try {
      const s = await stat(outTmp);
      if (s.size < MIN_OUTPUT_BYTES) throw new Error('output too small');
    } catch (e) {
      await failJob(job, `Output non valido: ${(e as Error).message}`);
      return;
    }

    await rename(outTmp, outFinal);
    await unlinkQuietly(job.inputPath);
    const finished = await updateJob(
      job.id,
      { status: 'done', finishedAt: Date.now(), progress: 1, pid: undefined },
      'running',
    );
    // A cancel landed between the check above and now: honour it.
    if (!finished) await unlinkQuietly(outFinal);
  } catch (err) {
    // Unexpected failure (spawn threw, rename failed, ...): never leave the job 'running'.
    await failJob(job, err instanceof Error ? err.message : String(err));
  }
}

// Recover orphaned jobs from a previous process and resume the queue as soon as
// this module is first imported.
startWorker();