Building A Distributed Video Transcoding System with Node.js.

Building A Distributed Video Transcoding System with Node.js.

Source: Dev.to

A Distributed Video Transcoding Example ## server.js ## producer.js ## consumer.js Brokers are the hello world of distributed systems, for two reasons: So I thought: why not bring a pure JavaScript broker to Node.js? To be honest, low-level Node.js is super impressive. It took a few days, but it works, with a few optimizations: It can actually be way better, which (by the way) Node supports seamlessly: So here’s an example of an FFmpeg distributed system running purely in Node.js. But first, make sure you have FFmpeg installed and available in your PATH. Test it in the terminal: Start a Node project: Put your secret credentials here: username:password:privileges (see privileges in the repo) Simple, non-TLS setup (TLS is supported - see the GitHub repo): This is the server browsers and other clients talk to. It accepts requests and pushes jobs into the hive. Create the queue if it doesn’t exist: Usually videos come from the client. For the demo, we’ll just read from a local folder: These are the nodes, the workers that pull jobs, transcode videos, and report back. Consume the transcode queue: Open multiple terminals: The nodes can be as many terminals as you want, that’s the parallel, distributed part: This is the entire pattern. Simple, powerful, and it just scales, because the hive is responsible for that. You can take this exact pattern and translate it to RabbitMQ and it’ll work. I built bunnimq mostly as a joke after reading RabbitMQ’s source and how it works… and somehow it worked. But that’s the point: Brokers are the hello world of distributed systems. It’s actually really hard to fail at them. Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse COMMAND_BLOCK: node <--> node <--> hive / broker <--> client-facing server <--> client node <--> <--> client Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: node <--> node <--> hive / broker <--> client-facing server <--> client node <--> <--> client COMMAND_BLOCK: node <--> node <--> hive / broker <--> client-facing server <--> client node <--> <--> client CODE_BLOCK: // broker.js import Bunny from "bunnimq"; import path from "path"; import { fileURLToPath } from "url"; Bunny({ port: 3000, DEBUG: true, cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file queue: { Durable: true, MessageExpiry: 60 // 1 hour } }); Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: // broker.js import Bunny from "bunnimq"; import path from "path"; import { fileURLToPath } from "url"; Bunny({ port: 3000, DEBUG: true, cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file queue: { Durable: true, MessageExpiry: 60 // 1 hour } }); CODE_BLOCK: // broker.js import Bunny from "bunnimq"; import path from "path"; import { fileURLToPath } from "url"; Bunny({ port: 3000, DEBUG: true, cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file queue: { Durable: true, MessageExpiry: 60 // 1 hour } }); CODE_BLOCK: const buffer = new SharedArrayBuffer(); const worker = new Worker(); // <- Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: const buffer = new SharedArrayBuffer(); const worker = new Worker(); // <- CODE_BLOCK: const buffer = new SharedArrayBuffer(); const worker = new Worker(); // <- CODE_BLOCK: ffmpeg -i img.jpg img.png Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: ffmpeg -i img.jpg img.png CODE_BLOCK: ffmpeg -i img.jpg img.png COMMAND_BLOCK: npm init -y && npm i bunnimq bunnimq-driver Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: npm init -y && npm i bunnimq bunnimq-driver COMMAND_BLOCK: npm init -y && npm i bunnimq bunnimq-driver COMMAND_BLOCK: ffmpegserver/ server.js # <- the hive producer.js # client-facing server consumer.js # node servers / workers .auth # credentials for producer and consumer verification (like .env) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: ffmpegserver/ server.js # <- the hive producer.js # client-facing server consumer.js # node servers / workers .auth # credentials for producer and consumer verification (like .env) COMMAND_BLOCK: ffmpegserver/ server.js # <- the hive producer.js # client-facing server consumer.js # node servers / workers .auth # credentials for producer and consumer verification (like .env) CODE_BLOCK: sk:mypassword:4 jane:doeeee:1 john:doees:3 Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: sk:mypassword:4 jane:doeeee:1 john:doees:3 CODE_BLOCK: sk:mypassword:4 jane:doeeee:1 john:doees:3 CODE_BLOCK: import Bunny from "bunnimq"; import path from "path"; import { fileURLToPath } from "url"; Bunny({ port: 3000, DEBUG: true, cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file queue: { Durable: true, QueueExpiry: 0, MessageExpiry: 3600 } }); Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: import Bunny from "bunnimq"; import path from "path"; import { fileURLToPath } from "url"; Bunny({ port: 3000, DEBUG: true, cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file queue: { Durable: true, QueueExpiry: 0, MessageExpiry: 3600 } }); CODE_BLOCK: import Bunny from "bunnimq"; import path from "path"; import { fileURLToPath } from "url"; Bunny({ port: 3000, DEBUG: true, cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file queue: { Durable: true, QueueExpiry: 0, MessageExpiry: 3600 } }); CODE_BLOCK: import BunnyMQ from "bunnimq-driver"; import fs from "node:fs/promises"; const bunny = new BunnyMQ({ port: 3000, host: "localhost", username: "sk", password: "mypassword", }); Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: import BunnyMQ from "bunnimq-driver"; import fs from "node:fs/promises"; const bunny = new BunnyMQ({ port: 3000, host: "localhost", username: "sk", password: "mypassword", }); CODE_BLOCK: import BunnyMQ from "bunnimq-driver"; import fs from "node:fs/promises"; const bunny = new BunnyMQ({ port: 3000, host: "localhost", username: "sk", password: "mypassword", }); COMMAND_BLOCK: bunny.queueDeclare( { name: "transcode_queue", config: { QueueExpiry: 60, MessageExpiry: 20, AckExpiry: 10, Durable: true, noAck: false, }, }, (res) => { console.log("Queue creation:", res); } ); Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: bunny.queueDeclare( { name: "transcode_queue", config: { QueueExpiry: 60, MessageExpiry: 20, AckExpiry: 10, Durable: true, noAck: false, }, }, (res) => { console.log("Queue creation:", res); } ); COMMAND_BLOCK: bunny.queueDeclare( { name: "transcode_queue", config: { QueueExpiry: 60, MessageExpiry: 20, AckExpiry: 10, Durable: true, noAck: false, }, }, (res) => { console.log("Queue creation:", res); } ); COMMAND_BLOCK: async function processVideos() { const videos = await fs.readdir( "C:/Users/[path to a folder with videos]/Videos/Capcut/test" ); // usually a storage bucket link for (const video of videos) { const job = { id: Date.now() + Math.random().toString(36).substring(2), input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`, outputFormat: "webm", }; // put into the queue bunny.publish("transcode_queue", JSON.stringify(job), (res) => { console.log(`Job ${job.id} published:`, res ? "ok" : "400"); }); } } processVideos(); Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: async function processVideos() { const videos = await fs.readdir( "C:/Users/[path to a folder with videos]/Videos/Capcut/test" ); // usually a storage bucket link for (const video of videos) { const job = { id: Date.now() + Math.random().toString(36).substring(2), input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`, outputFormat: "webm", }; // put into the queue bunny.publish("transcode_queue", JSON.stringify(job), (res) => { console.log(`Job ${job.id} published:`, res ? "ok" : "400"); }); } } processVideos(); COMMAND_BLOCK: async function processVideos() { const videos = await fs.readdir( "C:/Users/[path to a folder with videos]/Videos/Capcut/test" ); // usually a storage bucket link for (const video of videos) { const job = { id: Date.now() + Math.random().toString(36).substring(2), input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`, outputFormat: "webm", }; // put into the queue bunny.publish("transcode_queue", JSON.stringify(job), (res) => { console.log(`Job ${job.id} published:`, res ? "ok" : "400"); }); } } processVideos(); CODE_BLOCK: import BunnyMQ from "bunnimq-driver"; import { spawn } from "child_process"; import path from "path"; const bunny = new BunnyMQ({ port: 3000, host: "localhost", username: "john", password: "doees", }); Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: import BunnyMQ from "bunnimq-driver"; import { spawn } from "child_process"; import path from "path"; const bunny = new BunnyMQ({ port: 3000, host: "localhost", username: "john", password: "doees", }); CODE_BLOCK: import BunnyMQ from "bunnimq-driver"; import { spawn } from "child_process"; import path from "path"; const bunny = new BunnyMQ({ port: 3000, host: "localhost", username: "john", password: "doees", }); COMMAND_BLOCK: bunny.consume("transcode_queue", async (msg) => { console.log("Received message:", msg); try { const { input, outputFormat } = JSON.parse(msg); // normalize paths const absInput = path.resolve(input); const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`); console.log( `Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y` ); await new Promise((resolve, reject) => { const ffmpeg = spawn( "ffmpeg", ["-i", absInput, "-f", outputFormat, output, "-y"], { shell: true } // helps Windows find ffmpeg.exe ); ffmpeg.on("error", reject); // FFmpeg logs to stderr ffmpeg.stderr.on("data", (chunk) => { process.stderr.write(chunk); }); ffmpeg.on("close", (code, signal) => { if (code === 0) { console.log(`Transcoding complete: ${output}`); return resolve( bunny.Ack((ok) => console.log("Ack sent:", ok)) ); } reject( new Error( signal ? `Signaled with ${signal}` : `Exited with code ${code}` ) ); }); }); } catch (error) { console.error("Error processing message:", error); if (bunny.Nack) bunny.Nack(); } }); Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: bunny.consume("transcode_queue", async (msg) => { console.log("Received message:", msg); try { const { input, outputFormat } = JSON.parse(msg); // normalize paths const absInput = path.resolve(input); const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`); console.log( `Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y` ); await new Promise((resolve, reject) => { const ffmpeg = spawn( "ffmpeg", ["-i", absInput, "-f", outputFormat, output, "-y"], { shell: true } // helps Windows find ffmpeg.exe ); ffmpeg.on("error", reject); // FFmpeg logs to stderr ffmpeg.stderr.on("data", (chunk) => { process.stderr.write(chunk); }); ffmpeg.on("close", (code, signal) => { if (code === 0) { console.log(`Transcoding complete: ${output}`); return resolve( bunny.Ack((ok) => console.log("Ack sent:", ok)) ); } reject( new Error( signal ? `Signaled with ${signal}` : `Exited with code ${code}` ) ); }); }); } catch (error) { console.error("Error processing message:", error); if (bunny.Nack) bunny.Nack(); } }); COMMAND_BLOCK: bunny.consume("transcode_queue", async (msg) => { console.log("Received message:", msg); try { const { input, outputFormat } = JSON.parse(msg); // normalize paths const absInput = path.resolve(input); const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`); console.log( `Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y` ); await new Promise((resolve, reject) => { const ffmpeg = spawn( "ffmpeg", ["-i", absInput, "-f", outputFormat, output, "-y"], { shell: true } // helps Windows find ffmpeg.exe ); ffmpeg.on("error", reject); // FFmpeg logs to stderr ffmpeg.stderr.on("data", (chunk) => { process.stderr.write(chunk); }); ffmpeg.on("close", (code, signal) => { if (code === 0) { console.log(`Transcoding complete: ${output}`); return resolve( bunny.Ack((ok) => console.log("Ack sent:", ok)) ); } reject( new Error( signal ? `Signaled with ${signal}` : `Exited with code ${code}` ) ); }); }); } catch (error) { console.error("Error processing message:", error); if (bunny.Nack) bunny.Nack(); } }); COMMAND_BLOCK: node .\server.js # terminal 1 node .\producer.js # terminal 2 Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: node .\server.js # terminal 1 node .\producer.js # terminal 2 COMMAND_BLOCK: node .\server.js # terminal 1 node .\producer.js # terminal 2 CODE_BLOCK: node .\consumer.js Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: node .\consumer.js CODE_BLOCK: node .\consumer.js - Easy to get up and running - They enforce the hive / master-node pattern, which scales naturally - Object → binary compiler - SharedArrayBuffers and threads