# Tools: Logging Strategies for Real-Time Applications: Session Tracking at Scale

Source: Dev.to

Hey builders! 👋 Let's talk about something that sounds boring but becomes absolutely critical in production: logging. When you're running hundreds of concurrent sessions, bad logging is the difference between finding bugs in minutes and spending days debugging. Let me share how we built a logging system that actually helps instead of drowning you in noise.

## The Real-Time Logging Challenge

Traditional logging advice doesn't work for real-time apps. Here's why.

Traditional app logging:
```
2024-01-15 10:30:45 INFO Processing request
2024-01-15 10:30:46 ERROR Failed to connect to database
2024-01-15 10:30:47 INFO Processing request
```

Real-time app with 100 concurrent sessions:

```
2024-01-15 10:30:45.123 INFO Processing audio
2024-01-15 10:30:45.124 INFO Processing audio
2024-01-15 10:30:45.125 ERROR Connection failed
2024-01-15 10:30:45.126 INFO Processing audio
2024-01-15 10:30:45.127 INFO Processing audio
2024-01-15 10:30:45.128 INFO Processing audio
```

Which session failed? Good luck finding out.

## Strategy #1: Session-Based Logging

Every log entry MUST include session context:
```python
import logging
import uuid
from contextvars import ContextVar
from typing import Optional

# Context variable for session tracking
session_context: ContextVar[Optional[str]] = ContextVar('session_context', default=None)

class SessionLoggerAdapter(logging.LoggerAdapter):
    """Logger that automatically includes session context"""

    def process(self, msg, kwargs):
        session_id = session_context.get()
        if session_id:
            return f'[{session_id}] {msg}', kwargs
        return msg, kwargs

def get_logger(name: str) -> SessionLoggerAdapter:
    """Get a session-aware logger"""
    base_logger = logging.getLogger(name)
    return SessionLoggerAdapter(base_logger, {})

# Usage in your endpoint
logger = get_logger(__name__)

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Set session context for this async task
    session_context.set(session_id)
    logger.info("Session started")  # Logs: [abc-123] Session started

    try:
        await process_transcription(websocket)
    except Exception as e:
        logger.error(f"Transcription failed: {e}")  # Logs: [abc-123] Transcription failed: ...
    finally:
        logger.info("Session ended")  # Logs: [abc-123] Session ended
```

Now every log line is traceable to a specific session!

## Strategy #2: Structured Logging

Stop logging strings. Log structured data:
```python
import logging
import json
from datetime import datetime

class StructuredLogger:
    """Logger that outputs structured JSON"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def _log(self, level: int, event: str, **kwargs):
        """Log structured data as JSON"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            **kwargs
        }
        self.logger.log(level, json.dumps(log_data))

    def info(self, event: str, **kwargs):
        self._log(logging.INFO, event, **kwargs)

    def error(self, event: str, error: Exception = None, **kwargs):
        error_data = kwargs
        if error:
            error_data.update({
                "error_type": type(error).__name__,
                "error_message": str(error)
            })
        self._log(logging.ERROR, event, **error_data)

# Usage
logger = StructuredLogger(__name__)

logger.info(
    "audio_received",
    audio_size=len(audio_data),
    sample_rate=16000,
    channels=1
)

# Outputs:
# {"timestamp": "2024-01-15T10:30:45.123Z", "event": "audio_received",
#  "session_id": "abc-123", "audio_size": 16000, "sample_rate": 16000, "channels": 1}
```

Now you can search logs by specific fields!
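Want proof? If your file handler writes one JSON object per line (like the output above), something like jq can slice the logs straight from disk. Adjust the path to wherever your handler actually writes:

```bash
# Every audio_received event for one session, with timestamp and size
jq -c 'select(.event == "audio_received" and .session_id == "abc-123") | {timestamp, audio_size}' logs/app.log
```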
## Strategy #3: Correlation IDs

Track requests across multiple services:

```python
import uuid
from contextvars import ContextVar
from typing import Optional

# Correlation ID for tracking across services
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)

class CorrelatedLogger(StructuredLogger):
    """Logger with correlation ID support"""

    def _log(self, level: int, event: str, **kwargs):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            "correlation_id": correlation_id.get(),
            **kwargs
        }
        self.logger.log(level, json.dumps(log_data))

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Generate correlation ID for this request
    corr_id = str(uuid.uuid4())
    correlation_id.set(corr_id)
    session_context.set(session_id)

    logger = CorrelatedLogger(__name__)
    logger.info("session_started")

    # When calling Riva service, pass correlation ID
    await riva_client.transcribe(
        audio_data,
        metadata={"correlation_id": corr_id}
    )
```

Now you can trace a request from client → your service → Riva → back!
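The same idea works in the other direction: if other HTTP services call into yours, accept their correlation ID instead of always minting a new one. Here's a rough sketch using FastAPI middleware — I'm assuming an `X-Correlation-ID` header, so use whatever header name your services agree on:

```python
import uuid

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
    # Reuse the caller's correlation ID if they sent one, otherwise mint our own
    corr_id = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    correlation_id.set(corr_id)

    response = await call_next(request)
    # Echo it back so the caller can stitch the traces together
    response.headers["X-Correlation-ID"] = corr_id
    return response
```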
## Strategy #4: Performance Logging

Log performance metrics for every operation:

```python
import time
from functools import wraps
from typing import Callable

def log_performance(operation: str):
    """Decorator to log operation performance"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = CorrelatedLogger(func.__module__)
            start_time = time.time()

            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(
                    f"{operation}_completed",
                    duration_ms=round(duration * 1000, 2),
                    success=True
                )
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(
                    f"{operation}_failed",
                    duration_ms=round(duration * 1000, 2),
                    success=False,
                    error=e
                )
                raise
        return wrapper
    return decorator

# Usage
@log_performance("audio_transcription")
async def transcribe_audio(audio: bytes, session_id: str) -> str:
    # Transcription logic
    return await riva_client.transcribe(audio)

# Logs:
# {"event": "audio_transcription_completed", "duration_ms": 245.67, "success": true}
```
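The decorator times whole operations; when I need to know which stage inside a handler is slow, a tiny context manager on top of the same logger does the trick. Rough sketch — the `log_timing` name and the decode call are just placeholders:

```python
import time
from contextlib import contextmanager

@contextmanager
def log_timing(logger: CorrelatedLogger, stage: str, **fields):
    """Log how long a single stage took, plus any extra context fields."""
    start = time.perf_counter()
    try:
        yield
    finally:
        logger.info(
            f"{stage}_timing",
            duration_ms=round((time.perf_counter() - start) * 1000, 2),
            **fields
        )

# Usage inside a handler
with log_timing(logger, "audio_decode", chunk_size=len(chunk)):
    pcm = decode_audio(chunk)
```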
"pythonjsonlogger.jsonlogger.JsonFormatter", "format": "%(timestamp)s %(level)s %(name)s %(message)s" } }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "json", "stream": "ext://sys.stdout" }, "file": { "class": "logging.handlers.RotatingFileHandler", "formatter": "json", "filename": "logs/app.log", "maxBytes": 10485760, # 10MB "backupCount": 5 } }, "loggers": { # Your app - verbose logging "app": { "level": "DEBUG", "handlers": ["console", "file"], "propagate": False }, # Riva client - only warnings and errors "riva_client": { "level": "WARNING", "handlers": ["console", "file"], "propagate": False }, # Third-party libraries - minimal logging "uvicorn": { "level": "INFO", "handlers": ["console"], "propagate": False }, "grpc": { "level": "ERROR", "handlers": ["console"], "propagate": False } }, "root": { "level": "INFO", "handlers": ["console", "file"] } } # Apply configuration logging.config.dictConfig(LOGGING_CONFIG) COMMAND_BLOCK: import logging.config LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": False, "formatters": { "json": { "()": "pythonjsonlogger.jsonlogger.JsonFormatter", "format": "%(timestamp)s %(level)s %(name)s %(message)s" } }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "json", "stream": "ext://sys.stdout" }, "file": { "class": "logging.handlers.RotatingFileHandler", "formatter": "json", "filename": "logs/app.log", "maxBytes": 10485760, # 10MB "backupCount": 5 } }, "loggers": { # Your app - verbose logging "app": { "level": "DEBUG", "handlers": ["console", "file"], "propagate": False }, # Riva client - only warnings and errors "riva_client": { "level": "WARNING", "handlers": ["console", "file"], "propagate": False }, # Third-party libraries - minimal logging "uvicorn": { "level": "INFO", "handlers": ["console"], "propagate": False }, "grpc": { "level": "ERROR", "handlers": ["console"], "propagate": False } }, "root": { "level": "INFO", "handlers": ["console", "file"] } } # Apply configuration logging.config.dictConfig(LOGGING_CONFIG) COMMAND_BLOCK: import random class SampledLogger(CorrelatedLogger): """Logger with sampling support for high-frequency events""" def __init__(self, name: str, sample_rate: float = 0.01): super().__init__(name) self.sample_rate = sample_rate def sample(self, event: str, **kwargs): """Log with sampling""" if random.random() < self.sample_rate: self.info(event, sampled=True, **kwargs) logger = SampledLogger(__name__, sample_rate=0.01) # Log 1% of events # Log every 100th audio chunk logger.sample( "audio_chunk_processed", chunk_size=len(chunk), total_chunks=chunk_count ) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: import random class SampledLogger(CorrelatedLogger): """Logger with sampling support for high-frequency events""" def __init__(self, name: str, sample_rate: float = 0.01): super().__init__(name) self.sample_rate = sample_rate def sample(self, event: str, **kwargs): """Log with sampling""" if random.random() < self.sample_rate: self.info(event, sampled=True, **kwargs) logger = SampledLogger(__name__, sample_rate=0.01) # Log 1% of events # Log every 100th audio chunk logger.sample( "audio_chunk_processed", chunk_size=len(chunk), total_chunks=chunk_count ) COMMAND_BLOCK: import random class SampledLogger(CorrelatedLogger): """Logger with sampling support for high-frequency events""" def __init__(self, name: str, sample_rate: float = 0.01): super().__init__(name) self.sample_rate = sample_rate def sample(self, event: str, **kwargs): """Log with sampling""" 
## Strategy #6: Sampling for High-Volume Events

Don't log EVERY audio chunk - sample intelligently:

```python
import random

class SampledLogger(CorrelatedLogger):
    """Logger with sampling support for high-frequency events"""

    def __init__(self, name: str, sample_rate: float = 0.01):
        super().__init__(name)
        self.sample_rate = sample_rate

    def sample(self, event: str, **kwargs):
        """Log with sampling"""
        if random.random() < self.sample_rate:
            self.info(event, sampled=True, **kwargs)

logger = SampledLogger(__name__, sample_rate=0.01)  # Log 1% of events

# Log roughly 1% of audio chunks
logger.sample(
    "audio_chunk_processed",
    chunk_size=len(chunk),
    total_chunks=chunk_count
)
```
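If you'd rather see exactly every Nth chunk instead of a random 1%, a counter-based variant works too. Something like this — keep in mind the counter is per-process, not per-session:

```python
from collections import defaultdict

class EveryNthLogger(CorrelatedLogger):
    """Log every Nth occurrence of an event instead of a random sample."""

    def __init__(self, name: str, every: int = 100):
        super().__init__(name)
        self.every = every
        self._counts = defaultdict(int)

    def sample(self, event: str, **kwargs):
        self._counts[event] += 1
        # Log the 1st, 101st, 201st, ... occurrence of each event
        if (self._counts[event] - 1) % self.every == 0:
            self.info(event, occurrence=self._counts[event], **kwargs)

logger = EveryNthLogger(__name__, every=100)
logger.sample("audio_chunk_processed", chunk_size=len(chunk))
```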
## Strategy #7: Error Context Preservation

When errors happen, log EVERYTHING relevant:

```python
import traceback

class ErrorContextLogger(CorrelatedLogger):
    """Logger with rich error context"""

    def error_with_context(self, event: str, error: Exception, **kwargs):
        """Log error with full context"""
        # Build error context
        error_context = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "error_code": getattr(error, 'code', None),
            "traceback": traceback.format_exc(),
            **kwargs
        }
        self.error(event, **error_context)

# Usage
logger = ErrorContextLogger(__name__)

try:
    await riva_client.transcribe(audio)
except Exception as e:
    logger.error_with_context(
        "transcription_failed",
        error=e,
        audio_size=len(audio),
        sample_rate=sample_rate,
        language=language,
        riva_endpoint=riva_client.endpoint
    )
    raise
```

## Strategy #8: Log Aggregation & Search

Use ELK Stack or Loki for log aggregation:

```yaml
# Docker Compose for Loki + Grafana
version: '3'

services:
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./logs:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yaml
    command: -config.file=/etc/promtail/config.yaml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
```

Now you can query logs with LogQL:

```
# Find all errors for a specific session
{job="transcription-service"} | json | session_id="abc-123" | level="ERROR"

# Find slow transcriptions
{job="transcription-service"} | json | duration_ms > 1000

# Count errors by type
sum by (error_type) (count_over_time({job="transcription-service"} | json | level="ERROR" [1h]))
```
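The compose file mounts a `promtail-config.yaml` that I haven't shown; a minimal version that ships the `./logs` directory into Loki looks roughly like this (ports, paths, and labels are the defaults I'd start with — adjust to your setup). The `job` label is what the LogQL queries above filter on:

```yaml
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: transcription-service
    static_configs:
      - targets:
          - localhost
        labels:
          job: transcription-service
          __path__: /var/log/*.log
```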
## Strategy #9: Monitoring Integration

Connect logs to metrics:

```python
from prometheus_client import Counter, Histogram

# Metrics
transcription_requests = Counter(
    'transcription_requests_total',
    'Total transcription requests',
    ['session_id', 'language', 'status']
)

transcription_duration = Histogram(
    'transcription_duration_seconds',
    'Transcription duration',
    ['language']
)

class MonitoredLogger(ErrorContextLogger):
    """Logger integrated with metrics"""

    @log_performance("transcription")
    async def log_transcription(self, session_id: str, language: str, audio_data: bytes):
        audio_size = len(audio_data)
        start_time = time.time()

        try:
            result = await transcribe(audio_data, language)

            # Log success
            self.info(
                "transcription_completed",
                audio_size=audio_size,
                language=language,
                result_length=len(result)
            )

            # Update metrics
            transcription_requests.labels(
                session_id=session_id,
                language=language,
                status="success"
            ).inc()
            transcription_duration.labels(
                language=language
            ).observe(time.time() - start_time)

            return result

        except Exception as e:
            # Log failure
            self.error_with_context(
                "transcription_failed",
                error=e,
                audio_size=audio_size,
                language=language
            )

            # Update metrics
            transcription_requests.labels(
                session_id=session_id,
                language=language,
                status="error"
            ).inc()
            raise
```

## Best Practices

- **Always include session/correlation IDs** - Makes debugging possible
- **Use structured logging** - JSON is searchable and parseable
- **Sample high-frequency events** - Don't fill disk with audio chunk logs
- **Log performance metrics** - Know what's slow before users complain
- **Preserve error context** - Log everything needed to debug
- **Set appropriate log levels** - Debug in dev, Info in production
- **Rotate log files** - Don't fill up disk
- **Centralize logs** - Use log aggregation for multiple instances
- **Alert on log patterns** - Error rate spikes should trigger alerts
- **Test your logging** - Verify logs are useful during incidents

## The Results

After implementing these logging strategies:

- Mean Time to Resolution (MTTR) dropped from hours to minutes
- Debug sessions became productive instead of frustrating
- Production incidents were traceable across services
- Performance bottlenecks became immediately visible
- Customer support could look up exact session issues

## Final Thoughts

Good logging is invisible when everything works, but invaluable when things break. The goal isn't to log everything - it's to log the right things at the right level with the right context.

Think of logs as breadcrumbs for future you. When you're debugging at 3 AM, you'll thank past you for logging that session ID.

What's your logging setup? Any horror stories about debugging without proper logs? Share below! 🚀