Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 

339 linhas
10 KiB

  1. // Server-only video transcoding pipeline.
  2. //
  3. // Why asynchronous (single FIFO worker):
  4. // - HEVC iPhone clips can take 30–120 s to transcode → would exceed reverse-proxy
  5. // timeouts if done inline in the upload response.
  6. // - State is persisted in data/transcode-jobs.json so we can recover orphan jobs
  7. // after a server restart.
  8. //
  9. // Public API:
  10. // - enqueueTranscode({ inputPath, outputName, originalUploadName }) → registers a job
  11. // and kicks the worker if idle. The caller chooses outputName, so it knows the final URL up-front.
  12. // - getJob(id), listJobs(), cancelJob(id)
  13. //
  14. // Writes to the jobs file are serialized through a promise chain (no concurrent writers).
  15. // Only one ffmpeg process runs at a time.
  16. import { spawn } from 'node:child_process';
  17. import { mkdir, readFile, writeFile, rename, unlink, stat } from 'node:fs/promises';
  18. import path from 'node:path';
  19. import crypto from 'node:crypto';
  20. const UPLOAD_DIR = path.join(process.cwd(), 'data', 'uploads');
  21. const TMP_DIR = path.join(UPLOAD_DIR, '.tmp');
  22. const JOBS_PATH = path.join(process.cwd(), 'data', 'transcode-jobs.json');
  23. export type TranscodeStatus = 'queued' | 'running' | 'done' | 'failed' | 'cancelled';
  24. export type TranscodeJob = {
  25. id: string;
  26. inputPath: string; // .tmp/<id>.<origExt>
  27. outputName: string; // final basename in data/uploads/
  28. originalUploadName: string;
  29. status: TranscodeStatus;
  30. progress: number; // 0..1
  31. durationSec?: number;
  32. error?: string;
  33. startedAt?: number;
  34. finishedAt?: number;
  35. pid?: number;
  36. };
  37. let saving: Promise<void> = Promise.resolve();
  38. let workerRunning = false;
  39. let ffmpegAvailable: boolean | null = null;
  40. async function ensureDirs() {
  41. await mkdir(UPLOAD_DIR, { recursive: true });
  42. await mkdir(TMP_DIR, { recursive: true });
  43. }
  44. async function loadJobs(): Promise<TranscodeJob[]> {
  45. try {
  46. const raw = await readFile(JOBS_PATH, 'utf-8');
  47. const parsed = JSON.parse(raw);
  48. return Array.isArray(parsed) ? parsed : [];
  49. } catch (err) {
  50. const e = err as NodeJS.ErrnoException;
  51. if (e.code === 'ENOENT') return [];
  52. throw err;
  53. }
  54. }
  55. async function saveJobs(jobs: TranscodeJob[]): Promise<void> {
  56. // Serialize writes; readers always see a fully-written file.
  57. const next = saving.then(async () => {
  58. await mkdir(path.dirname(JOBS_PATH), { recursive: true });
  59. const tmp = `${JOBS_PATH}.tmp`;
  60. await writeFile(tmp, JSON.stringify(jobs, null, 2), 'utf-8');
  61. await rename(tmp, JOBS_PATH);
  62. });
  63. saving = next.catch(() => {}); // never let the chain reject
  64. return next;
  65. }
  66. async function updateJob(id: string, patch: Partial<TranscodeJob>): Promise<void> {
  67. const jobs = await loadJobs();
  68. const idx = jobs.findIndex(j => j.id === id);
  69. if (idx < 0) return;
  70. jobs[idx] = { ...jobs[idx], ...patch };
  71. await saveJobs(jobs);
  72. }
  73. export async function getJob(id: string): Promise<TranscodeJob | null> {
  74. const jobs = await loadJobs();
  75. return jobs.find(j => j.id === id) ?? null;
  76. }
  77. export async function listJobs(): Promise<TranscodeJob[]> {
  78. return loadJobs();
  79. }
  80. async function checkFfmpeg(): Promise<boolean> {
  81. if (ffmpegAvailable !== null) return ffmpegAvailable;
  82. ffmpegAvailable = await new Promise<boolean>(resolve => {
  83. const p = spawn('ffmpeg', ['-version']);
  84. p.on('error', () => resolve(false));
  85. p.on('exit', code => resolve(code === 0));
  86. });
  87. if (!ffmpegAvailable) {
  88. console.warn('[transcode] ffmpeg not found on PATH — video uploads requiring transcode will fail');
  89. }
  90. return ffmpegAvailable;
  91. }
  92. export async function isFfmpegAvailable(): Promise<boolean> {
  93. return checkFfmpeg();
  94. }
  95. type Codecs = { video?: string; audio?: string };
  96. export async function probeCodecs(inputPath: string): Promise<Codecs> {
  97. const out = await new Promise<string>((resolve, reject) => {
  98. const p = spawn('ffprobe', [
  99. '-v', 'error',
  100. '-show_entries', 'stream=codec_type,codec_name',
  101. '-of', 'default=noprint_wrappers=1',
  102. inputPath,
  103. ]);
  104. let buf = '';
  105. p.stdout.on('data', d => { buf += d.toString(); });
  106. p.on('error', reject);
  107. p.on('exit', () => resolve(buf));
  108. });
  109. const result: Codecs = {};
  110. // Lines come in pairs: codec_name=..., codec_type=...
  111. const lines = out.split(/\r?\n/);
  112. let pendingName: string | undefined;
  113. for (const line of lines) {
  114. const m = /^(codec_(?:name|type))=(.+)$/.exec(line.trim());
  115. if (!m) continue;
  116. if (m[1] === 'codec_name') pendingName = m[2];
  117. else if (m[1] === 'codec_type') {
  118. if (pendingName) {
  119. if (m[2] === 'video' && !result.video) result.video = pendingName;
  120. else if (m[2] === 'audio' && !result.audio) result.audio = pendingName;
  121. }
  122. pendingName = undefined;
  123. }
  124. }
  125. return result;
  126. }
  127. export function needsTranscode(codecs: Codecs): boolean {
  128. const videoOk = codecs.video === 'h264';
  129. const audioOk = !codecs.audio || codecs.audio === 'aac' || codecs.audio === 'mp3';
  130. return !(videoOk && audioOk);
  131. }
  132. async function probeDuration(inputPath: string): Promise<number | undefined> {
  133. const out = await new Promise<string>((resolve, reject) => {
  134. const p = spawn('ffprobe', [
  135. '-v', 'error',
  136. '-show_entries', 'format=duration',
  137. '-of', 'default=noprint_wrappers=1:nokey=1',
  138. inputPath,
  139. ]);
  140. let buf = '';
  141. p.stdout.on('data', d => { buf += d.toString(); });
  142. p.on('error', reject);
  143. p.on('exit', () => resolve(buf));
  144. });
  145. const n = parseFloat(out.trim());
  146. return Number.isFinite(n) && n > 0 ? n : undefined;
  147. }
  148. export async function enqueueTranscode(args: {
  149. inputPath: string;
  150. outputName: string;
  151. originalUploadName: string;
  152. }): Promise<TranscodeJob> {
  153. await ensureDirs();
  154. const available = await checkFfmpeg();
  155. if (!available) {
  156. const err = new Error('ffmpeg non disponibile sul server');
  157. (err as Error & { code?: number }).code = 503;
  158. throw err;
  159. }
  160. const id = crypto.randomUUID();
  161. const duration = await probeDuration(args.inputPath).catch(() => undefined);
  162. const job: TranscodeJob = {
  163. id,
  164. inputPath: args.inputPath,
  165. outputName: args.outputName,
  166. originalUploadName: args.originalUploadName,
  167. status: 'queued',
  168. progress: 0,
  169. durationSec: duration,
  170. };
  171. const jobs = await loadJobs();
  172. jobs.push(job);
  173. await saveJobs(jobs);
  174. void kickWorker();
  175. return job;
  176. }
  177. export async function cancelJob(id: string): Promise<boolean> {
  178. const job = await getJob(id);
  179. if (!job) return false;
  180. if (job.status === 'done' || job.status === 'failed' || job.status === 'cancelled') return false;
  181. if (job.status === 'running' && job.pid) {
  182. try { process.kill(job.pid, 'SIGTERM'); } catch {}
  183. }
  184. await updateJob(id, { status: 'cancelled', finishedAt: Date.now() });
  185. await cleanupTmpForJob(job);
  186. return true;
  187. }
  188. async function cleanupTmpForJob(job: TranscodeJob) {
  189. // Best-effort cleanup of input + tmp output
  190. try { await unlink(job.inputPath); } catch {}
  191. try { await unlink(path.join(TMP_DIR, `${job.id}.mp4`)); } catch {}
  192. }
  193. // ────────────────────────────────────────────────────────────
  194. // Worker
  195. async function kickWorker() {
  196. if (workerRunning) return;
  197. workerRunning = true;
  198. try {
  199. // Recover orphan "running" jobs from a prior crash.
  200. const jobs = await loadJobs();
  201. let dirty = false;
  202. for (const j of jobs) {
  203. if (j.status === 'running') {
  204. j.status = 'queued';
  205. j.pid = undefined;
  206. dirty = true;
  207. }
  208. }
  209. if (dirty) await saveJobs(jobs);
  210. // Drain the queue.
  211. while (true) {
  212. const next = (await loadJobs()).find(j => j.status === 'queued');
  213. if (!next) break;
  214. await runJob(next);
  215. }
  216. } finally {
  217. workerRunning = false;
  218. }
  219. }
  220. async function runJob(job: TranscodeJob): Promise<void> {
  221. const outTmp = path.join(TMP_DIR, `${job.id}.mp4`);
  222. const outFinal = path.join(UPLOAD_DIR, job.outputName);
  223. await updateJob(job.id, { status: 'running', startedAt: Date.now(), progress: 0 });
  224. const totalUs = job.durationSec ? job.durationSec * 1_000_000 : undefined;
  225. const args = [
  226. '-y',
  227. '-i', job.inputPath,
  228. '-c:v', 'libx264', '-preset', 'fast', '-crf', '23', '-pix_fmt', 'yuv420p',
  229. '-vf', "scale='min(1280,iw)':'min(720,ih)':force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2",
  230. '-c:a', 'aac', '-b:a', '128k', '-ac', '2',
  231. '-movflags', '+faststart',
  232. '-progress', 'pipe:1',
  233. outTmp,
  234. ];
  235. const child = spawn('ffmpeg', args);
  236. await updateJob(job.id, { pid: child.pid });
  237. // Parse stdout for `-progress` key=value pairs
  238. let buf = '';
  239. child.stdout?.on('data', async chunk => {
  240. buf += chunk.toString();
  241. const lines = buf.split(/\r?\n/);
  242. buf = lines.pop() ?? '';
  243. for (const line of lines) {
  244. const m = /^out_time_ms=(\d+)/.exec(line);
  245. if (m && totalUs) {
  246. const cur = parseInt(m[1], 10);
  247. const p = Math.max(0, Math.min(1, cur / totalUs));
  248. await updateJob(job.id, { progress: p });
  249. }
  250. }
  251. });
  252. let stderr = '';
  253. child.stderr?.on('data', chunk => {
  254. stderr += chunk.toString();
  255. if (stderr.length > 8000) stderr = stderr.slice(-8000); // bound memory
  256. });
  257. const exitCode: number = await new Promise(resolve => {
  258. child.on('error', () => resolve(-1));
  259. child.on('exit', code => resolve(code ?? -1));
  260. });
  261. // Check whether the job was cancelled while running (SIGTERM removed the pid)
  262. const latest = await getJob(job.id);
  263. if (latest?.status === 'cancelled') {
  264. try { await unlink(outTmp); } catch {}
  265. try { await unlink(job.inputPath); } catch {}
  266. return;
  267. }
  268. if (exitCode !== 0) {
  269. await updateJob(job.id, {
  270. status: 'failed',
  271. finishedAt: Date.now(),
  272. error: stderr.trim().split(/\r?\n/).slice(-5).join('\n'),
  273. pid: undefined,
  274. });
  275. await cleanupTmpForJob(job);
  276. return;
  277. }
  278. // Sanity check: the output must exist and be non-trivial.
  279. try {
  280. const s = await stat(outTmp);
  281. if (s.size < 1024) throw new Error('output too small');
  282. } catch (e) {
  283. await updateJob(job.id, {
  284. status: 'failed',
  285. finishedAt: Date.now(),
  286. error: `Transcoding produced an invalid output: ${(e as Error).message}. The video may be corrupted; try re-exporting from your source tool.`,
  287. pid: undefined,
  288. });
  289. await cleanupTmpForJob(job);
  290. return;
  291. }
  292. await rename(outTmp, outFinal);
  293. try { await unlink(job.inputPath); } catch {}
  294. await updateJob(job.id, {
  295. status: 'done',
  296. finishedAt: Date.now(),
  297. progress: 1,
  298. pid: undefined,
  299. });
  300. }
  301. // Auto-recover orphans on module load (lazy via first import).
  302. void kickWorker();