From Chaos to Clarity: Building a Robust Log Aggregation Pipeline with Kafka (KRaft) and ELK

Source: Dev.to

As systems grow into microservices, logs become fragmented. Debugging an issue often means SSH-ing into multiple containers or grepping through scattered files. This is where centralized log aggregation becomes non-negotiable.

In this guide, we are going to build a production-grade logging pipeline from scratch. We will decouple log shipping from log indexing using Apache Kafka (running in the modern, ZooKeeper-less KRaft mode), process the data with Logstash, and visualize it all in Kibana. Whether you are a junior developer looking to understand the full flow or a principal engineer architecting a decoupled observability layer, this setup has something for you.

## 🏗️ High-Level Architecture

- Source: Node.js microservices (simulating traffic).
- Shipper: Filebeat tails the log files and pushes them to Kafka.
- Buffer: Kafka cluster (3 brokers + 1 controller) queues the logs.
- Processor: Logstash consumes from Kafka, transforms the JSON, and pushes it to Elasticsearch.
- Storage & View: Elasticsearch indexes the data; Kibana visualizes it.

## Phase 1: The Backbone — Setting up the Kafka Cluster (KRaft Mode)

We are skipping ZooKeeper and using Kafka's KRaft mode for metadata management. We will set up a cluster with 1 controller and 3 brokers.

### 1. Configuration

Download Kafka and navigate to your `kafka/config` directory. You will need to create/modify four properties files.

**A. The Controller (`controller.properties`)**

This node manages the cluster state. (Note: the controller gets its own `node.id`, and its `process.roles` is set to `controller` rather than `broker`.)
```properties
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093
listeners=CONTROLLER://localhost:9093
log.dirs=/tmp/kraft-controller-logs
```

**B. Broker 1 (`broker-1.properties`)**

```properties
node.id=2
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-1
```

**C. Broker 2 (`broker-2.properties`)**

Note the different port (9095) so that all brokers can run on the same machine.

```properties
node.id=3
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
listeners=PLAINTEXT://localhost:9095
advertised.listeners=PLAINTEXT://localhost:9095
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-2
```

**D. Broker 3 (`broker-3.properties`)**

Running on port 9096.

```properties
node.id=4
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
listeners=PLAINTEXT://localhost:9096
advertised.listeners=PLAINTEXT://localhost:9096
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-3
```
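Before formatting storage, it can help to confirm that the broker files differ only where they should: `node.id`, the listener ports, and `log.dirs`. A minimal sketch, assuming the four properties files live under `config/` as above:

```bash
# Show the per-broker fields that must be unique (node IDs, ports, log directories)
grep -E 'node\.id|listeners|log\.dirs' config/broker-*.properties
```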
### 2. Initialization & Startup

First, generate a unique cluster UUID and format the storage directories.

```bash
# Generate UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format log directories
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/controller.properties
bin/kafka-storage.sh format --cluster-id $KAFKA_CLUSTER_ID --config config/broker-1.properties --no-initial-controllers
bin/kafka-storage.sh format --cluster-id $KAFKA_CLUSTER_ID --config config/broker-2.properties --no-initial-controllers
bin/kafka-storage.sh format --cluster-id $KAFKA_CLUSTER_ID --config config/broker-3.properties --no-initial-controllers
```

Now, start them up in separate terminals:

```bash
bin/kafka-server-start.sh config/controller.properties
bin/kafka-server-start.sh config/broker-1.properties
bin/kafka-server-start.sh config/broker-2.properties
bin/kafka-server-start.sh config/broker-3.properties
```

To verify our cluster is healthy, we can use Kafbat UI (formerly Kafka UI) running on port 8080.

*Above: Our local cluster with 3 brokers fully operational.*
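Kafbat UI is convenient, but the same checks work from the CLI. A minimal sketch, assuming the commands are run from the Kafka root directory; pre-creating the `logs` topic is optional if broker auto-creation is enabled, but doing it explicitly lets us choose partition and replication counts:

```bash
# Confirm the KRaft quorum is formed and the brokers have registered
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# Pre-create the topic Filebeat will write to (3 partitions, replicated across all 3 brokers)
bin/kafka-topics.sh --create --topic logs \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 3

# Inspect partition leadership and replica placement
bin/kafka-topics.sh --describe --topic logs --bootstrap-server localhost:9092
```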
## Phase 2: The Source — Generating "Fake" Microservice Logs

To test a pipeline, we need data. I created a Node.js simulator using winston that generates structured JSON logs for fictitious services (Auth, Payments, Orders, etc.). The script randomizes HTTP status codes, latency, and log levels to mimic real-world traffic.

### The Simulator (microservice.js)

Key code snippet (structured JSON). The header comment still calls the script `fake-logs.js`, but in this setup it is saved as `microservice.js`, which is the name the package.json and Dockerfile reference.

```javascript
/**
 * fake-logs.js
 *
 * Usage:
 *   npm init -y
 *   npm i winston
 *   node fake-logs.js
 *
 * Environment variables:
 *   MICROSERVICES (default: 6)  -> number of services to simulate
 *   SERVICE_NAMES (optional)    -> comma separated names, overrides MICROSERVICES
 *   LOG_FREQ_MS (default: 800)  -> avg ms between logs per service (randomized)
 *   LOG_LEVEL (default: info)   -> winston log level
 *   LOG_DIR (default: ./logs)   -> directory for per-service files
 *   ERROR_RATE (default: 0.08)  -> fraction of logs that are errors (0..1)
 *   TRACE_RATE (default: 0.02)  -> fraction of logs that include stack traces
 *   RANDOM_SEED (optional)      -> for deterministic randomness if desired
 *
 * Example:
 *   MICROSERVICES=6 LOG_FREQ_MS=500 LOG_LEVEL=debug node fake-logs.js
 */

const fs = require('fs');
const path = require('path');
const { createLogger, format, transports } = require('winston');

const env = process.env;

const DEFAULTS = {
  MICROSERVICES: 6,
  LOG_FREQ_MS: 800,
  LOG_LEVEL: 'info',
  LOG_DIR: './logs',
  ERROR_RATE: 0.08,
  TRACE_RATE: 0.02
};

const MICRO = parseInt(env.MICROSERVICES || DEFAULTS.MICROSERVICES, 10);
const LOG_FREQ_MS = parseInt(env.LOG_FREQ_MS || DEFAULTS.LOG_FREQ_MS, 10);
const LOG_LEVEL = env.LOG_LEVEL || DEFAULTS.LOG_LEVEL;
const LOG_DIR = env.LOG_DIR || DEFAULTS.LOG_DIR;
const ERROR_RATE = parseFloat(env.ERROR_RATE ?? DEFAULTS.ERROR_RATE);
const TRACE_RATE = parseFloat(env.TRACE_RATE ?? DEFAULTS.TRACE_RATE);

if (!fs.existsSync(LOG_DIR)) fs.mkdirSync(LOG_DIR, { recursive: true });

const providedNames = env.SERVICE_NAMES
  ? env.SERVICE_NAMES.split(',').map(s => s.trim()).filter(Boolean)
  : [];
const serviceNames = providedNames.length
  ? providedNames
  : Array.from({ length: MICRO }).map((_, i) => `service-${i + 1}`);

// helper deterministic-ish RNG if seed provided
let rnd = Math.random;
if (env.RANDOM_SEED) {
  let seed = parseInt(env.RANDOM_SEED, 10) || 1;
  rnd = () => {
    // xorshift32
    seed ^= seed << 13;
    seed ^= seed >>> 17;
    seed ^= seed << 5;
    // normalize
    return (seed >>> 0) / 0xffffffff;
  };
}

function randomInt(min, max) {
  return Math.floor(rnd() * (max - min + 1)) + min;
}

function pick(arr) {
  return arr[Math.floor(rnd() * arr.length)];
}

// sample random generators
const userIds = Array.from({ length: 50 }).map((_, i) => `user-${1000 + i}`);
const endpoints = ['/api/v1/login', '/api/v1/orders', '/api/v1/products', '/api/v1/cart', '/api/v1/search', '/health'];
const httpMethods = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'];
const messages = [
  'cache miss', 'cache hit', 'db query success', 'db connection timeout',
  'order processed', 'payment failed', 'auth success', 'token expired',
  'validation error', 'background job completed'
];

function createServiceLogger(name) {
  const filename = path.join(LOG_DIR, `service-${name}.log`);
  const logger = createLogger({
    level: LOG_LEVEL,
    format: format.combine(
      format.timestamp(),
      // structured JSON — easy to ship to Kafka/Logstash
      format.printf(info => {
        // ensure message is JSON-friendly
        const base = {
          timestamp: info.timestamp,
          service: name,
          level: info.level,
          message: info.message,
          meta: info.meta || {}
        };
        return JSON.stringify(base);
      })
    ),
    transports: [
      new transports.Console({ stderrLevels: ['error', 'warn'] }),
      new transports.File({ filename, maxsize: 10 * 1024 * 1024 }) // 10MB (no rotation lib required)
    ]
  });
  return logger;
}

function generateLogForService(logger, name) {
  // Simulate either HTTP access log, app event, or error
  const isHttp = rnd() < 0.7;
  const levelRoll = rnd();
  const isError = levelRoll < ERROR_RATE;
  const includeTrace = rnd() < TRACE_RATE;

  if (isHttp) {
    const method = pick(httpMethods);
    const endpoint = pick(endpoints);
    const latency = randomInt(5, 1200); // ms
    const status = isError
      ? pick([500, 502, 503, 504, 400])
      : (latency > 1000 ? 504 : (rnd() < 0.9 ? 200 : 201));
    const user = pick(userIds);
    const msg = `${method} ${endpoint} ${status} ${latency}ms`;
    logger.log({
      level: isError ? 'error' : 'info',
      message: msg,
      meta: {
        type: 'http',
        method,
        endpoint,
        status,
        latency,
        userId: user,
        requestId: `r-${Date.now().toString(36)}-${randomInt(1000, 9999)}`
      }
    });
  } else {
    // app event
    const msg = pick(messages);
    const severity = isError ? 'error' : (rnd() < 0.2 ? 'warn' : 'info');
    const meta = {
      type: 'app',
      event: msg,
      userId: rnd() < 0.5 ? pick(userIds) : undefined,
      correlationId: `c-${randomInt(100000, 999999)}`
    };
    if (includeTrace) {
      meta.stack = `Error: ${msg} at /app/module.js:${randomInt(10, 200)}:${randomInt(2, 80)}\n    at processTicksAndRejections (internal/process/task_queues.js:93:5)`;
    }
    logger.log({ level: severity, message: msg, meta });
  }
}

// create all service loggers
const services = serviceNames.map(name => {
  const logger = createServiceLogger(name);
  return { name, logger };
});

// main loop: each service emits logs at random-ish intervals
services.forEach(({ name, logger }) => {
  // stagger start
  setTimeout(() => {
    (function emitLoop() {
      try {
        generateLogForService(logger, name);
      } catch (err) {
        // Logging errors shouldn't crash the whole simulator
        console.error(`[sim-error] ${name} logger crashed:`, err && err.stack ? err.stack : err);
      }
      // random jitter around configured frequency
      const jitter = Math.floor((rnd() - 0.5) * 0.6 * LOG_FREQ_MS);
      const next = Math.max(100, LOG_FREQ_MS + jitter);
      setTimeout(emitLoop, next);
    })();
  }, randomInt(0, LOG_FREQ_MS));
});

// simple status output so user knows it's running
console.log(JSON.stringify({
  event: 'fake-logs-started',
  services: serviceNames,
  logDir: LOG_DIR,
  now: new Date().toISOString(),
  notes: 'Structured JSON logs written to console and files per service. Tail logs and push to Kafka/Logstash for ELK.'
}));
```
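Before containerizing, you can run the simulator directly and inspect what it writes. A minimal sketch, assuming the script is saved as `microservice.js` and simulating a single auth-service (the service name is just an example):

```bash
# Install the only dependency and start one simulated service (per the usage notes in the script header)
npm i winston
SERVICE_NAMES=auth-service LOG_FREQ_MS=500 node microservice.js

# In another terminal, tail the structured output for that service.
# Each line is one JSON document, roughly (illustrative):
# {"timestamp":"...","service":"auth-service","level":"info","message":"GET /api/v1/login 200 143ms","meta":{"type":"http",...}}
tail -f logs/service-auth-service.log
```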
Our package.json looks like this:

```json
{
  "name": "microservices",
  "version": "1.0.0",
  "description": "",
  "main": "microservice.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "winston": "^3.18.3"
  }
}
```

Our Dockerfile looks like this; we will use it in the Docker Compose setup below:

```dockerfile
FROM node:22-alpine
WORKDIR /app
COPY package*.json .
RUN npm install
COPY microservice.js .
CMD [ "node", "microservice.js"]
```
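Since every service container reuses the same image, you can build it once and smoke-test it before wiring up Compose. A minimal sketch, assuming the Dockerfile and script live in `./microservices` (the build context used in the Compose file below); `smoke-test-service` is just a throwaway name:

```bash
# Build the shared image referenced by the Compose file
docker build -t base-microservice ./microservices

# Optional smoke test: run one container and confirm it writes into the shared logs directory
docker run --rm \
  -e SERVICE_NAMES=smoke-test-service \
  -v /workspaces/codespaces-blank/logs:/app/logs \
  base-microservice
```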
### Orchestration with Docker Compose

We spin up multiple containers sharing a common volume for logs.

```yaml
services:
  ms-generic-service:
    image: base-microservice
    build: ./microservices
    container_name: ms-generic-service
    environment:
      - SERVICE_NAMES=generic-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-auth-service:
    image: base-microservice
    container_name: ms-auth-service
    environment:
      - SERVICE_NAMES=auth-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-payments-service:
    image: base-microservice
    container_name: ms-payments-service
    environment:
      - SERVICE_NAMES=payments-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-orders-service:
    image: base-microservice
    container_name: ms-orders-service
    environment:
      - SERVICE_NAMES=orders-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-catalog-service:
    image: base-microservice
    container_name: ms-catalog-service
    environment:
      - SERVICE_NAMES=catalog-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs
```

Bring everything up:

```bash
docker compose up
```

## Phase 3: The Shipper — Filebeat to Kafka

Instead of writing directly to Logstash, we use Filebeat. It's lightweight and handles backpressure well.

**Configuration (`filebeat.yml`)**

We configure it to read the logs generated by our Docker containers and output directly to our Kafka broker.

```yaml
filebeat.inputs:
  - type: filestream
    id: my-filestream-id
    paths:
      - /workspaces/codespaces-blank/logs/*.log

output.kafka:
  hosts: ["localhost:9092"]
  topic: 'logs'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
```

Then start Filebeat:

```bash
sudo ./filebeat -e
```

Checking Kafbat UI again, we can now see messages flooding into the `logs` topic!

*Above: JSON logs arriving in the Kafka topic.*
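If you would rather verify from the terminal than from Kafbat UI, a console consumer can read a few records straight off the topic. A minimal sketch, assuming it is run from the Kafka root directory:

```bash
# Read five records from the logs topic to confirm Filebeat is shipping
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic logs \
  --from-beginning \
  --max-messages 5
```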
## Phase 4: The Processor — Logstash & ELK

Now for the heavy lifting. We need to extract the data from Kafka and push it into Elasticsearch.

### 1. Start Elasticsearch & Kibana

Using Docker Compose, spin up the standard ELK stack. (Note: Kibana runs on port 5601.) At this stage, Index Management in Kibana will show "No Indices" because we haven't processed anything yet.

### 2. Configure Logstash

We create a pipeline (`logstash-sample.conf`) that acts as a consumer group for Kafka.

```conf
# Logstash configuration for a simple
# Kafka -> Logstash -> Elasticsearch pipeline.

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
    codec => "json"
    consumer_threads => 10
    group_id => "logstashmy"          # new group id to avoid old offsets
    auto_offset_reset => "earliest"   # read from beginning if no offsets
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => ["@metadata", "input", "ecs", "host", "agent", "log", "stream"]
  }
  date {
    match => ["@timestamp", "ISO8601"]
  }
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "5LaAci5P"
  }
}
```

Start Logstash with this pipeline:

```bash
bin/logstash -f config/logstash-sample.conf
```
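Before switching to Kibana, you can confirm the end-to-end flow from the command line. A minimal sketch, using the same elastic credentials as the Logstash output block and the `logstashmy` consumer group defined above:

```bash
# Confirm the daily index exists and is receiving documents
# (use the same elastic password as in the Logstash output block)
curl -s -u elastic:<password> "http://localhost:9200/_cat/indices/logs-*?v"

# Check that the Logstash consumer group is keeping up with the topic (run from the Kafka directory)
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group logstashmy
```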
Once Logstash starts running, head back to Kibana.

## Phase 5: Visualization & Analysis

- Index Management: You will now see the logs-2026.01.16 index created and populated.
- Discover: The raw data is now searchable. You can filter by service, level, or latency.
- Dashboards: Finally, we can build a dashboard to monitor our ecosystem:
  - Pie chart: distribution of logs by service.
  - Bar chart: HTTP methods over time.
  - Heatmap: error rates per service.

*Above: A complete view of our microservices' health.*

## Conclusion

We just built a fully decoupled logging architecture. By placing Kafka in the middle, we ensured that if Elasticsearch goes down for maintenance, our logs are safe in the Kafka queue, waiting to be replayed. This is the standard pattern for high-scale observability.