|
- // Server-only video transcoding pipeline.
- //
- // Why asynchronous (single FIFO worker):
- // - HEVC iPhone clips can take 30–120 s to transcode → would exceed reverse-proxy
- // timeouts if done inline in the upload response.
- // - State is persisted in data/transcode-jobs.json so we can recover orphan jobs
- // after a server restart.
- //
- // Public API:
- // - enqueueTranscode(inputPath, finalName, originalUploadName) → registers a job
- // and kicks the worker if idle. The caller knows the final URL up-front.
- // - getJob(id), listJobs(), cancelJob(id)
- //
- // All filesystem writes go through a per-file lock (no concurrent writers).
- // Only one ffmpeg process runs at a time.
-
- import { spawn } from 'node:child_process';
- import { mkdir, readFile, writeFile, rename, unlink, stat } from 'node:fs/promises';
- import path from 'node:path';
- import crypto from 'node:crypto';
-
- const UPLOAD_DIR = path.join(process.cwd(), 'data', 'uploads');
- const TMP_DIR = path.join(UPLOAD_DIR, '.tmp');
- const JOBS_PATH = path.join(process.cwd(), 'data', 'transcode-jobs.json');
-
- export type TranscodeStatus = 'queued' | 'running' | 'done' | 'failed' | 'cancelled';
-
- export type TranscodeJob = {
- id: string;
- inputPath: string; // .tmp/<id>.<origExt>
- outputName: string; // final basename in data/uploads/
- originalUploadName: string;
- status: TranscodeStatus;
- progress: number; // 0..1
- durationSec?: number;
- error?: string;
- startedAt?: number;
- finishedAt?: number;
- pid?: number;
- };
-
- let saving: Promise<void> = Promise.resolve();
- let workerRunning = false;
- let ffmpegAvailable: boolean | null = null;
-
- async function ensureDirs() {
- await mkdir(UPLOAD_DIR, { recursive: true });
- await mkdir(TMP_DIR, { recursive: true });
- }
-
- async function loadJobs(): Promise<TranscodeJob[]> {
- try {
- const raw = await readFile(JOBS_PATH, 'utf-8');
- const parsed = JSON.parse(raw);
- return Array.isArray(parsed) ? parsed : [];
- } catch (err) {
- const e = err as NodeJS.ErrnoException;
- if (e.code === 'ENOENT') return [];
- throw err;
- }
- }
-
- async function saveJobs(jobs: TranscodeJob[]): Promise<void> {
- // Serialize writes; readers always see a fully-written file.
- const next = saving.then(async () => {
- await mkdir(path.dirname(JOBS_PATH), { recursive: true });
- const tmp = `${JOBS_PATH}.tmp`;
- await writeFile(tmp, JSON.stringify(jobs, null, 2), 'utf-8');
- await rename(tmp, JOBS_PATH);
- });
- saving = next.catch(() => {}); // never let the chain reject
- return next;
- }
-
- async function updateJob(id: string, patch: Partial<TranscodeJob>): Promise<void> {
- const jobs = await loadJobs();
- const idx = jobs.findIndex(j => j.id === id);
- if (idx < 0) return;
- jobs[idx] = { ...jobs[idx], ...patch };
- await saveJobs(jobs);
- }
-
- export async function getJob(id: string): Promise<TranscodeJob | null> {
- const jobs = await loadJobs();
- return jobs.find(j => j.id === id) ?? null;
- }
-
- export async function listJobs(): Promise<TranscodeJob[]> {
- return loadJobs();
- }
-
- async function checkFfmpeg(): Promise<boolean> {
- if (ffmpegAvailable !== null) return ffmpegAvailable;
- ffmpegAvailable = await new Promise<boolean>(resolve => {
- const p = spawn('ffmpeg', ['-version']);
- p.on('error', () => resolve(false));
- p.on('exit', code => resolve(code === 0));
- });
- if (!ffmpegAvailable) {
- console.warn('[transcode] ffmpeg not found on PATH — video uploads requiring transcode will fail');
- }
- return ffmpegAvailable;
- }
-
- export async function isFfmpegAvailable(): Promise<boolean> {
- return checkFfmpeg();
- }
-
- type Codecs = { video?: string; audio?: string };
- export async function probeCodecs(inputPath: string): Promise<Codecs> {
- const out = await new Promise<string>((resolve, reject) => {
- const p = spawn('ffprobe', [
- '-v', 'error',
- '-show_entries', 'stream=codec_type,codec_name',
- '-of', 'default=noprint_wrappers=1',
- inputPath,
- ]);
- let buf = '';
- p.stdout.on('data', d => { buf += d.toString(); });
- p.on('error', reject);
- p.on('exit', () => resolve(buf));
- });
- const result: Codecs = {};
- // Lines come in pairs: codec_name=..., codec_type=...
- const lines = out.split(/\r?\n/);
- let pendingName: string | undefined;
- for (const line of lines) {
- const m = /^(codec_(?:name|type))=(.+)$/.exec(line.trim());
- if (!m) continue;
- if (m[1] === 'codec_name') pendingName = m[2];
- else if (m[1] === 'codec_type') {
- if (pendingName) {
- if (m[2] === 'video' && !result.video) result.video = pendingName;
- else if (m[2] === 'audio' && !result.audio) result.audio = pendingName;
- }
- pendingName = undefined;
- }
- }
- return result;
- }
-
- export function needsTranscode(codecs: Codecs): boolean {
- const videoOk = codecs.video === 'h264';
- const audioOk = !codecs.audio || codecs.audio === 'aac' || codecs.audio === 'mp3';
- return !(videoOk && audioOk);
- }
-
- async function probeDuration(inputPath: string): Promise<number | undefined> {
- const out = await new Promise<string>((resolve, reject) => {
- const p = spawn('ffprobe', [
- '-v', 'error',
- '-show_entries', 'format=duration',
- '-of', 'default=noprint_wrappers=1:nokey=1',
- inputPath,
- ]);
- let buf = '';
- p.stdout.on('data', d => { buf += d.toString(); });
- p.on('error', reject);
- p.on('exit', () => resolve(buf));
- });
- const n = parseFloat(out.trim());
- return Number.isFinite(n) && n > 0 ? n : undefined;
- }
-
- export async function enqueueTranscode(args: {
- inputPath: string;
- outputName: string;
- originalUploadName: string;
- }): Promise<TranscodeJob> {
- await ensureDirs();
- const available = await checkFfmpeg();
- if (!available) {
- const err = new Error('ffmpeg non disponibile sul server');
- (err as Error & { code?: number }).code = 503;
- throw err;
- }
-
- const id = crypto.randomUUID();
- const duration = await probeDuration(args.inputPath).catch(() => undefined);
- const job: TranscodeJob = {
- id,
- inputPath: args.inputPath,
- outputName: args.outputName,
- originalUploadName: args.originalUploadName,
- status: 'queued',
- progress: 0,
- durationSec: duration,
- };
- const jobs = await loadJobs();
- jobs.push(job);
- await saveJobs(jobs);
- void kickWorker();
- return job;
- }
-
- export async function cancelJob(id: string): Promise<boolean> {
- const job = await getJob(id);
- if (!job) return false;
- if (job.status === 'done' || job.status === 'failed' || job.status === 'cancelled') return false;
-
- if (job.status === 'running' && job.pid) {
- try { process.kill(job.pid, 'SIGTERM'); } catch {}
- }
- await updateJob(id, { status: 'cancelled', finishedAt: Date.now() });
- await cleanupTmpForJob(job);
- return true;
- }
-
- async function cleanupTmpForJob(job: TranscodeJob) {
- // Best-effort cleanup of input + tmp output
- try { await unlink(job.inputPath); } catch {}
- try { await unlink(path.join(TMP_DIR, `${job.id}.mp4`)); } catch {}
- }
-
- // ────────────────────────────────────────────────────────────
- // Worker
-
- async function kickWorker() {
- if (workerRunning) return;
- workerRunning = true;
- try {
- // Recover orphan "running" jobs from a prior crash.
- const jobs = await loadJobs();
- let dirty = false;
- for (const j of jobs) {
- if (j.status === 'running') {
- j.status = 'queued';
- j.pid = undefined;
- dirty = true;
- }
- }
- if (dirty) await saveJobs(jobs);
-
- // Drain the queue.
- while (true) {
- const next = (await loadJobs()).find(j => j.status === 'queued');
- if (!next) break;
- await runJob(next);
- }
- } finally {
- workerRunning = false;
- }
- }
-
- async function runJob(job: TranscodeJob): Promise<void> {
- const outTmp = path.join(TMP_DIR, `${job.id}.mp4`);
- const outFinal = path.join(UPLOAD_DIR, job.outputName);
- await updateJob(job.id, { status: 'running', startedAt: Date.now(), progress: 0 });
-
- const totalUs = job.durationSec ? job.durationSec * 1_000_000 : undefined;
-
- const args = [
- '-y',
- '-i', job.inputPath,
- '-c:v', 'libx264', '-preset', 'fast', '-crf', '23', '-pix_fmt', 'yuv420p',
- '-vf', "scale='min(1280,iw)':'min(720,ih)':force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2",
- '-c:a', 'aac', '-b:a', '128k', '-ac', '2',
- '-movflags', '+faststart',
- '-progress', 'pipe:1',
- outTmp,
- ];
-
- const child = spawn('ffmpeg', args);
- await updateJob(job.id, { pid: child.pid });
-
- // Parse stdout for `-progress` key=value pairs
- let buf = '';
- child.stdout?.on('data', async chunk => {
- buf += chunk.toString();
- const lines = buf.split(/\r?\n/);
- buf = lines.pop() ?? '';
- for (const line of lines) {
- const m = /^out_time_ms=(\d+)/.exec(line);
- if (m && totalUs) {
- const cur = parseInt(m[1], 10);
- const p = Math.max(0, Math.min(1, cur / totalUs));
- await updateJob(job.id, { progress: p });
- }
- }
- });
-
- let stderr = '';
- child.stderr?.on('data', chunk => {
- stderr += chunk.toString();
- if (stderr.length > 8000) stderr = stderr.slice(-8000); // bound memory
- });
-
- const exitCode: number = await new Promise(resolve => {
- child.on('error', () => resolve(-1));
- child.on('exit', code => resolve(code ?? -1));
- });
-
- // Check whether the job was cancelled while running (SIGTERM removed the pid)
- const latest = await getJob(job.id);
- if (latest?.status === 'cancelled') {
- try { await unlink(outTmp); } catch {}
- try { await unlink(job.inputPath); } catch {}
- return;
- }
-
- if (exitCode !== 0) {
- await updateJob(job.id, {
- status: 'failed',
- finishedAt: Date.now(),
- error: stderr.trim().split(/\r?\n/).slice(-5).join('\n'),
- pid: undefined,
- });
- await cleanupTmpForJob(job);
- return;
- }
-
- // Sanity check: the output must exist and be non-trivial.
- try {
- const s = await stat(outTmp);
- if (s.size < 1024) throw new Error('output too small');
- } catch (e) {
- await updateJob(job.id, {
- status: 'failed',
- finishedAt: Date.now(),
- error: `Transcoding produced an invalid output: ${(e as Error).message}. The video may be corrupted; try re-exporting from your source tool.`,
- pid: undefined,
- });
- await cleanupTmpForJob(job);
- return;
- }
-
- await rename(outTmp, outFinal);
- try { await unlink(job.inputPath); } catch {}
- await updateJob(job.id, {
- status: 'done',
- finishedAt: Date.now(),
- progress: 1,
- pid: undefined,
- });
- }
-
- // Auto-recover orphans on module load (lazy via first import).
- void kickWorker();
|