# Logging Strategies for Real-Time Applications: Session Tracking at Scale

Hey builders! 👋 Let's talk about something that sounds boring but becomes absolutely critical in production: logging. When you're running hundreds of concurrent sessions, bad logging is the difference between finding bugs in minutes and spending days debugging. Let me share how we built a logging system that actually helps instead of drowning you in noise.

## The Real-Time Logging Challenge

Traditional logging advice doesn't work for real-time apps. Here's why.
Traditional app logging looks like this:

```
2024-01-15 10:30:45 INFO Processing request
2024-01-15 10:30:46 ERROR Failed to connect to database
2024-01-15 10:30:47 INFO Processing request
```

Now here's a real-time app with 100 concurrent sessions:

```
2024-01-15 10:30:45.123 INFO Processing audio
2024-01-15 10:30:45.124 INFO Processing audio
2024-01-15 10:30:45.125 ERROR Connection failed
2024-01-15 10:30:45.126 INFO Processing audio
2024-01-15 10:30:45.127 INFO Processing audio
2024-01-15 10:30:45.128 INFO Processing audio
```

Which session failed? Good luck finding out.

## Strategy #1: Session-Based Logging

Every log entry MUST include session context:

```python
import logging
from contextvars import ContextVar
from typing import Optional

# Context variable for session tracking
session_context: ContextVar[Optional[str]] = ContextVar('session_context', default=None)


class SessionLoggerAdapter(logging.LoggerAdapter):
    """Logger that automatically includes session context"""

    def process(self, msg, kwargs):
        session_id = session_context.get()
        if session_id:
            return f'[{session_id}] {msg}', kwargs
        return msg, kwargs


def get_logger(name: str) -> SessionLoggerAdapter:
    """Get a session-aware logger"""
    base_logger = logging.getLogger(name)
    return SessionLoggerAdapter(base_logger, {})


# Usage in your endpoint
logger = get_logger(__name__)


@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Set session context for this async task
    session_context.set(session_id)
    logger.info("Session started")  # Logs: [abc-123] Session started
    try:
        await process_transcription(websocket)
    except Exception as e:
        logger.error(f"Transcription failed: {e}")  # Logs: [abc-123] Transcription failed: ...
    finally:
        logger.info("Session ended")  # Logs: [abc-123] Session ended
```

Now every log line is traceable to a specific session!
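The reason `ContextVar` works here is that each asyncio task gets its own copy of the context, so concurrent sessions can't clobber each other's IDs. Here's a minimal self-contained sketch (no FastAPI; `handle_session` is a hypothetical stand-in for the endpoint) that shows two interleaved sessions keeping separate IDs:

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

session_context: ContextVar[Optional[str]] = ContextVar("session_context", default=None)
records = []


def log(msg: str) -> None:
    # Prefix with whatever session ID is set in *this* task's context
    sid = session_context.get()
    records.append(f"[{sid}] {msg}" if sid else msg)


async def handle_session(session_id: str) -> None:
    session_context.set(session_id)  # visible only to this task
    log("started")
    await asyncio.sleep(0)           # yield so the tasks interleave
    log("ended")


async def main() -> None:
    await asyncio.gather(handle_session("abc-123"), handle_session("def-456"))


asyncio.run(main())
```

Even though both tasks run concurrently and call the same `log` function, each line carries the right session ID.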
## Strategy #2: Structured Logging

Stop logging strings. Log structured data:

```python
import json
import logging
from datetime import datetime


class StructuredLogger:
    """Logger that outputs structured JSON"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def _log(self, level: int, event: str, **kwargs):
        """Log structured data as JSON"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            **kwargs
        }
        self.logger.log(level, json.dumps(log_data))

    def info(self, event: str, **kwargs):
        self._log(logging.INFO, event, **kwargs)

    def error(self, event: str, error: Exception = None, **kwargs):
        error_data = kwargs
        if error:
            error_data.update({
                "error_type": type(error).__name__,
                "error_message": str(error)
            })
        self._log(logging.ERROR, event, **error_data)


# Usage
logger = StructuredLogger(__name__)
logger.info(
    "audio_received",
    audio_size=len(audio_data),
    sample_rate=16000,
    channels=1
)
# Outputs:
# {"timestamp": "2024-01-15T10:30:45.123Z", "event": "audio_received",
#  "session_id": "abc-123", "audio_size": 16000, "sample_rate": 16000, "channels": 1}
```

Now you can search logs by specific fields!
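The payoff of JSON logs is that you filter on fields instead of grepping free-form text. A quick illustration with a few hand-written log lines (the values are made up):

```python
import json

log_lines = [
    '{"event": "audio_received", "session_id": "abc-123", "audio_size": 16000}',
    '{"event": "transcription_failed", "session_id": "def-456", "error_type": "TimeoutError"}',
    '{"event": "audio_received", "session_id": "def-456", "audio_size": 8000}',
]

# Parse each line, then filter by any field you logged
events = [json.loads(line) for line in log_lines]
session_events = [e for e in events if e.get("session_id") == "def-456"]
errors = [e for e in events if e.get("event") == "transcription_failed"]
```

The same field-based queries work in jq, Loki, or Elasticsearch once the logs are shipped there.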
## Strategy #3: Correlation IDs

Track requests across multiple services:

```python
import uuid
from contextvars import ContextVar

# Correlation ID for tracking across services
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)


class CorrelatedLogger(StructuredLogger):
    """Logger with correlation ID support"""

    def _log(self, level: int, event: str, **kwargs):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            "correlation_id": correlation_id.get(),
            **kwargs
        }
        self.logger.log(level, json.dumps(log_data))


@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Generate correlation ID for this request
    corr_id = str(uuid.uuid4())
    correlation_id.set(corr_id)
    session_context.set(session_id)

    logger = CorrelatedLogger(__name__)
    logger.info("session_started")

    # When calling the Riva service, pass the correlation ID along
    await riva_client.transcribe(
        audio_data,
        metadata={"correlation_id": corr_id}
    )
```

Now you can trace a request from client → your service → Riva → back!
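On the receiving side, a downstream service should reuse the caller's correlation ID instead of minting a new one; that's what stitches the trace together. A minimal sketch (the `metadata` dict and `accept_request` name are stand-ins for whatever transport carries your headers):

```python
import uuid
from contextvars import ContextVar
from typing import Optional

correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)


def accept_request(metadata: dict) -> str:
    """Adopt the upstream correlation ID, or mint one for edge requests."""
    corr_id = metadata.get("correlation_id") or str(uuid.uuid4())
    correlation_id.set(corr_id)
    return corr_id


cid = accept_request({"correlation_id": "req-42"})  # reuses the caller's "req-42"
fresh = accept_request({})                          # no upstream ID: mints a new UUID
```

With this in place, every service in the chain logs the same correlation ID for one end-to-end request.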
## Strategy #4: Performance Logging

Log performance metrics for every operation:

```python
import time
from functools import wraps
from typing import Callable


def log_performance(operation: str):
    """Decorator to log operation performance"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = CorrelatedLogger(func.__module__)
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(
                    f"{operation}_completed",
                    duration_ms=round(duration * 1000, 2),
                    success=True
                )
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(
                    f"{operation}_failed",
                    duration_ms=round(duration * 1000, 2),
                    success=False,
                    error=e
                )
                raise
        return wrapper
    return decorator


# Usage
@log_performance("audio_transcription")
async def transcribe_audio(audio: bytes, session_id: str) -> str:
    # Transcription logic
    return await riva_client.transcribe(audio)

# Logs:
# {"event": "audio_transcription_completed", "duration_ms": 245.67, "success": true}
```

## Strategy #5: Log Levels by Component

Different components need different log levels:

```python
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(timestamp)s %(level)s %(name)s %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "json",
            "filename": "logs/app.log",
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5
        }
    },
    "loggers": {
        # Your app - verbose logging
        "app": {
            "level": "DEBUG",
            "handlers": ["console", "file"],
            "propagate": False
        },
        # Riva client - only warnings and errors
        "riva_client": {
            "level": "WARNING",
            "handlers": ["console", "file"],
            "propagate": False
        },
        # Third-party libraries - minimal logging
        "uvicorn": {
            "level": "INFO",
            "handlers": ["console"],
            "propagate": False
        },
        "grpc": {
            "level": "ERROR",
            "handlers": ["console"],
            "propagate": False
        }
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"]
    }
}

# Apply configuration
logging.config.dictConfig(LOGGING_CONFIG)
```

## Strategy #6: Sampling for High-Volume Events

Don't log EVERY audio chunk - sample intelligently:

```python
import random


class SampledLogger(CorrelatedLogger):
    """Logger with sampling support for high-frequency events"""

    def __init__(self, name: str, sample_rate: float = 0.01):
        super().__init__(name)
        self.sample_rate = sample_rate

    def sample(self, event: str, **kwargs):
        """Log with sampling"""
        if random.random() < self.sample_rate:
            self.info(event, sampled=True, **kwargs)


logger = SampledLogger(__name__, sample_rate=0.01)  # Log 1% of events

# Logs roughly one in every 100 audio chunks
logger.sample(
    "audio_chunk_processed",
    chunk_size=len(chunk),
    total_chunks=chunk_count
)
```
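Random sampling can miss a short burst entirely by chance; a deterministic every-Nth counter is a simple alternative that guarantees exactly one log per N events. A small sketch (the `NthSampler` name is mine, not from our codebase):

```python
import itertools


class NthSampler:
    """Log every Nth event deterministically instead of randomly."""

    def __init__(self, n: int):
        self.n = n
        self._counter = itertools.count(1)  # 1, 2, 3, ...

    def should_log(self) -> bool:
        # True on event N, 2N, 3N, ...
        return next(self._counter) % self.n == 0


sampler = NthSampler(100)
logged = sum(1 for _ in range(1000) if sampler.should_log())
# logged == 10: exactly one log per 100 events
```

Note that a per-instance counter like this is per-process; in a multi-worker deployment each worker samples independently, which is usually fine for debugging.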
## Strategy #7: Error Context Preservation

When errors happen, log EVERYTHING relevant:

```python
import traceback


class ErrorContextLogger(CorrelatedLogger):
    """Logger with rich error context"""

    def error_with_context(self, event: str, error: Exception, **kwargs):
        """Log error with full context"""
        error_context = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "error_code": getattr(error, 'code', None),
            "traceback": traceback.format_exc(),
            **kwargs
        }
        self.error(event, **error_context)


# Usage
logger = ErrorContextLogger(__name__)

try:
    await riva_client.transcribe(audio)
except Exception as e:
    logger.error_with_context(
        "transcription_failed",
        error=e,
        audio_size=len(audio),
        sample_rate=sample_rate,
        language=language,
        riva_endpoint=riva_client.endpoint
    )
    raise
```
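To see what `traceback.format_exc()` buys you over `str(error)`, here's a self-contained run that builds the context dict for a simulated failure (the `TimeoutError` and field values are made up for illustration):

```python
import traceback


def build_error_context(error: Exception, **kwargs) -> dict:
    """Collect everything needed to debug, as one structured payload."""
    return {
        "error_type": type(error).__name__,
        "error_message": str(error),
        "traceback": traceback.format_exc(),  # full stack, not just the message
        **kwargs,
    }


try:
    raise TimeoutError("Riva did not respond within 5s")
except TimeoutError as e:
    # Must be called inside the except block so format_exc() sees the exception
    ctx = build_error_context(e, audio_size=16000, language="en-US")
```

The extra fields (`audio_size`, `language`, endpoint, etc.) are what turn "it failed" into a reproducible bug report.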
## Strategy #8: Log Aggregation & Search

Use the ELK Stack or Loki for log aggregation:

```yaml
# Docker Compose for Loki + Grafana
version: '3'
services:
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./logs:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yaml
    command: -config.file=/etc/promtail/config.yaml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
```
Now you can query logs with LogQL:

```
# Find all errors for a specific session
{job="transcription-service"} |= "session_id=abc-123" | json | level="ERROR"

# Find slow transcriptions
{job="transcription-service"} | json | duration_ms > 1000

# Count errors by type
sum by (error_type) (count_over_time({job="transcription-service"} | json | level="ERROR" [1h]))
```

## Strategy #9: Monitoring Integration

Connect logs to metrics:

```python
from prometheus_client import Counter, Histogram

# Metrics
transcription_requests = Counter(
    'transcription_requests_total',
    'Total transcription requests',
    ['session_id', 'language', 'status']
)

transcription_duration = Histogram(
    'transcription_duration_seconds',
    'Transcription duration',
    ['language']
)


class MonitoredLogger(ErrorContextLogger):
    """Logger integrated with metrics"""

    async def log_transcription(self, session_id: str, language: str, audio: bytes):
        start_time = time.time()
        try:
            result = await transcribe(audio, language)

            # Log success
            self.info(
                "transcription_completed",
                audio_size=len(audio),
                language=language,
                result_length=len(result)
            )

            # Update metrics
            transcription_requests.labels(
                session_id=session_id,
                language=language,
                status="success"
            ).inc()
            transcription_duration.labels(
                language=language
            ).observe(time.time() - start_time)

            return result
        except Exception as e:
            # Log failure
            self.error_with_context(
                "transcription_failed",
                error=e,
                audio_size=len(audio),
                language=language
            )

            # Update metrics
            transcription_requests.labels(
                session_id=session_id,
                language=language,
                status="error"
            ).inc()
            raise
```
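If you want to prototype the logs-to-metrics idea without the `prometheus_client` dependency, the same counters-by-label pattern can be sketched with plain dictionaries (the names here are illustrative, not a real metrics API):

```python
from collections import Counter, defaultdict

request_counts = Counter()      # (language, status) -> count
durations = defaultdict(list)   # language -> observed seconds


def record_request(language: str, status: str, seconds: float) -> None:
    """Record one request outcome, keyed by label values."""
    request_counts[(language, status)] += 1
    durations[language].append(seconds)


record_request("en-US", "success", 0.245)
record_request("en-US", "error", 5.0)
record_request("en-US", "success", 0.310)

total = sum(v for (lang, _), v in request_counts.items() if lang == "en-US")
error_rate = request_counts[("en-US", "error")] / total  # 1/3
```

The real win of Prometheus is that these counts are scraped and graphed continuously, so an error-rate spike shows up before anyone reads a single log line.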
## Best Practices

- Always include session/correlation IDs - Makes debugging possible
- Use structured logging - JSON is searchable and parseable
- Sample high-frequency events - Don't fill disk with audio chunk logs
- Log performance metrics - Know what's slow before users complain
- Preserve error context - Log everything needed to debug
- Set appropriate log levels - Debug in dev, Info in production
- Rotate log files - Don't fill up disk
- Centralize logs - Use log aggregation for multiple instances
- Alert on log patterns - Error rate spikes should trigger alerts
- Test your logging - Verify logs are useful during incidents

## The Results

After implementing these logging strategies:

- Mean Time to Resolution (MTTR) dropped from hours to minutes
- Debug sessions became productive instead of frustrating
- Production incidents were traceable across services
- Performance bottlenecks became immediately visible
- Customer support could look up exact session issues

## Final Thoughts

Good logging is invisible when everything works, but invaluable when things break. The goal isn't to log everything - it's to log the right things at the right level with the right context.

Think of logs as breadcrumbs for future you. When you're debugging at 3 AM, you'll thank past you for logging that session ID.

What's your logging setup? Any horror stories about debugging without proper logs? Share below! 🚀
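That "test your logging" point is easy to automate: attach a handler that captures records in memory and assert the session ID actually shows up. A minimal sketch using only the standard library:

```python
import logging


class ListHandler(logging.Handler):
    """Capture formatted log records in memory for assertions."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(self.format(record))


logger = logging.getLogger("app.logging_test")
logger.setLevel(logging.INFO)
handler = ListHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger.addHandler(handler)

logger.info("[abc-123] Session started")
logger.error("[abc-123] Transcription failed: timeout")

# The check you actually care about during an incident:
assert any("abc-123" in r for r in handler.records), "session ID missing from logs"
```

Run a check like this in CI and you'll never again discover during an outage that a refactor silently dropped the session prefix.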