# Logging Strategies for Real-Time Applications: Session Tracking at Scale
2026-01-31
Hey builders! 👋 Let's talk about something that sounds boring but becomes absolutely critical in production: logging. When you're running hundreds of concurrent sessions, bad logging is the difference between finding bugs in minutes and spending days debugging. Let me share how we built a logging system that actually helps instead of drowning you in noise.

## The Real-Time Logging Challenge

Traditional logging advice doesn't work for real-time apps. Here's why. Traditional app logging looks like this:
```
2024-01-15 10:30:45 INFO Processing request
2024-01-15 10:30:46 ERROR Failed to connect to database
2024-01-15 10:30:47 INFO Processing request
```

Now look at a real-time app with 100 concurrent sessions:
```
2024-01-15 10:30:45.123 INFO Processing audio
2024-01-15 10:30:45.124 INFO Processing audio
2024-01-15 10:30:45.125 ERROR Connection failed
2024-01-15 10:30:45.126 INFO Processing audio
2024-01-15 10:30:45.127 INFO Processing audio
2024-01-15 10:30:45.128 INFO Processing audio
```

Which session failed? Good luck finding out.

## Strategy #1: Session-Based Logging

Every log entry MUST include session context:
```python
import logging
from contextvars import ContextVar
from typing import Optional

# Context variable for session tracking
session_context: ContextVar[Optional[str]] = ContextVar('session_context', default=None)

class SessionLoggerAdapter(logging.LoggerAdapter):
    """Logger that automatically includes session context"""

    def process(self, msg, kwargs):
        session_id = session_context.get()
        if session_id:
            return f'[{session_id}] {msg}', kwargs
        return msg, kwargs

def get_logger(name: str) -> SessionLoggerAdapter:
    """Get a session-aware logger"""
    base_logger = logging.getLogger(name)
    return SessionLoggerAdapter(base_logger, {})

# Usage in your endpoint
logger = get_logger(__name__)

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Set session context for this async task
    session_context.set(session_id)
    logger.info("Session started")  # Logs: [abc-123] Session started
    try:
        await process_transcription(websocket)
    except Exception as e:
        logger.error(f"Transcription failed: {e}")  # Logs: [abc-123] Transcription failed: ...
    finally:
        logger.info("Session ended")  # Logs: [abc-123] Session ended
```

Now every log line is traceable to a specific session!
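Why a `ContextVar` and not a plain global? asyncio gives every task its own copy of the context, so concurrent sessions can't clobber each other's IDs. Here's a minimal standalone sketch (independent of the endpoint code above) you can run to verify the isolation:

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

session_context: ContextVar[Optional[str]] = ContextVar("session_context", default=None)

async def handle_session(session_id: str, results: list) -> None:
    # Each asyncio task runs in its own context copy, so this set()
    # is invisible to the other concurrent sessions
    session_context.set(session_id)
    await asyncio.sleep(0.01)  # yield while other sessions run
    results.append((session_id, session_context.get()))

async def main() -> list:
    results: list = []
    await asyncio.gather(*(handle_session(f"sess-{i}", results) for i in range(5)))
    return results

pairs = asyncio.run(main())
# Every task reads back exactly the ID it set, despite interleaving:
print(all(set_id == read_id for set_id, read_id in pairs))  # True
```

This is also why the adapter works inside one shared `logger` object: the lookup happens per log call, in whatever task is currently running.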
## Strategy #2: Structured Logging

Stop logging strings. Log structured data:

```python
import json
import logging
from datetime import datetime

class StructuredLogger:
    """Logger that outputs structured JSON"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def _log(self, level: int, event: str, **kwargs):
        """Log structured data as JSON"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            **kwargs
        }
        self.logger.log(level, json.dumps(log_data))

    def info(self, event: str, **kwargs):
        self._log(logging.INFO, event, **kwargs)

    def error(self, event: str, error: Exception = None, **kwargs):
        error_data = kwargs
        if error:
            error_data.update({
                "error_type": type(error).__name__,
                "error_message": str(error)
            })
        self._log(logging.ERROR, event, **error_data)

# Usage
logger = StructuredLogger(__name__)
logger.info(
    "audio_received",
    audio_size=len(audio_data),
    sample_rate=16000,
    channels=1
)
# Outputs:
# {"timestamp": "2024-01-15T10:30:45.123Z", "event": "audio_received",
#  "session_id": "abc-123", "audio_size": 16000, "sample_rate": 16000, "channels": 1}
```

Now you can search logs by specific fields!
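Once logs are JSON, "searching by field" is just parsing and filtering. A small sketch using hypothetical captured log lines (the `query` helper is illustrative, not part of the logger above):

```python
import json

# Hypothetical captured log output, one JSON object per line, in the shape
# the StructuredLogger above emits
raw_logs = """\
{"event": "audio_received", "session_id": "abc-123", "audio_size": 16000}
{"event": "audio_received", "session_id": "def-456", "audio_size": 8000}
{"event": "session_ended", "session_id": "abc-123"}
"""

def query(lines: str, **filters):
    """Return parsed log entries matching every key=value filter."""
    entries = (json.loads(line) for line in lines.splitlines())
    return [e for e in entries if all(e.get(k) == v for k, v in filters.items())]

abc_events = query(raw_logs, session_id="abc-123")
print([e["event"] for e in abc_events])  # ['audio_received', 'session_ended']
```

In production you'd run the same kind of field filter inside your log aggregator rather than in Python, but the principle is identical.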
## Strategy #3: Correlation IDs

Track requests across multiple services:

```python
import uuid
from contextvars import ContextVar
from typing import Optional

# Correlation ID for tracking across services
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)

class CorrelatedLogger(StructuredLogger):
    """Logger with correlation ID support"""

    def _log(self, level: int, event: str, **kwargs):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            "correlation_id": correlation_id.get(),
            **kwargs
        }
        self.logger.log(level, json.dumps(log_data))

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Generate correlation ID for this request
    corr_id = str(uuid.uuid4())
    correlation_id.set(corr_id)
    session_context.set(session_id)

    logger = CorrelatedLogger(__name__)
    logger.info("session_started")

    # When calling the Riva service, pass the correlation ID along
    await riva_client.transcribe(
        audio_data,
        metadata={"correlation_id": corr_id}
    )
```

Now you can trace a request from the client → your service → Riva → and back!
## Strategy #4: Performance Logging

Log performance metrics for every operation:

```python
import time
from functools import wraps
from typing import Callable

def log_performance(operation: str):
    """Decorator to log operation performance"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = CorrelatedLogger(func.__module__)
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(
                    f"{operation}_completed",
                    duration_ms=round(duration * 1000, 2),
                    success=True
                )
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(
                    f"{operation}_failed",
                    duration_ms=round(duration * 1000, 2),
                    success=False,
                    error=e
                )
                raise
        return wrapper
    return decorator

# Usage
@log_performance("audio_transcription")
async def transcribe_audio(audio: bytes, session_id: str) -> str:
    # Transcription logic
    return await riva_client.transcribe(audio)

# Logs:
# {"event": "audio_transcription_completed", "duration_ms": 245.67, "success": true}
```
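To see the decorator pattern in isolation, here's a runnable sketch with the logger swapped for a plain list so the recorded event is inspectable. It also uses `time.perf_counter()`, a monotonic clock that is better suited to measuring durations than `time.time()` (which can jump on clock adjustments):

```python
import asyncio
import time
from functools import wraps

recorded = []  # stand-in for the structured logger so output can be inspected

def log_performance(operation: str):
    """Same shape as the decorator above, but records events into a list."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()  # monotonic: safe for durations
            try:
                result = await func(*args, **kwargs)
                recorded.append({
                    "event": f"{operation}_completed",
                    "duration_ms": round((time.perf_counter() - start) * 1000, 2),
                    "success": True,
                })
                return result
            except Exception as e:
                recorded.append({
                    "event": f"{operation}_failed",
                    "success": False,
                    "error_type": type(e).__name__,
                })
                raise
        return wrapper
    return decorator

@log_performance("demo_op")
async def work():
    await asyncio.sleep(0.02)  # simulate ~20ms of transcription work
    return "done"

print(asyncio.run(work()))   # done
print(recorded[0]["event"])  # demo_op_completed
```

The `demo_op` coroutine is purely illustrative; in the real service the decorated function is the transcription call itself.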
## Strategy #5: Log Levels by Component

Different components need different log levels:

```python
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(timestamp)s %(level)s %(name)s %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "json",
            "filename": "logs/app.log",
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5
        }
    },
    "loggers": {
        # Your app - verbose logging
        "app": {
            "level": "DEBUG",
            "handlers": ["console", "file"],
            "propagate": False
        },
        # Riva client - only warnings and errors
        "riva_client": {
            "level": "WARNING",
            "handlers": ["console", "file"],
            "propagate": False
        },
        # Third-party libraries - minimal logging
        "uvicorn": {
            "level": "INFO",
            "handlers": ["console"],
            "propagate": False
        },
        "grpc": {
            "level": "ERROR",
            "handlers": ["console"],
            "propagate": False
        }
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"]
    }
}

# Apply configuration
logging.config.dictConfig(LOGGING_CONFIG)
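Note that the `json` formatter above relies on the third-party `python-json-logger` package. To sanity-check the per-component levels themselves, here's a stripped-down variant of the same config using only stdlib pieces:

```python
import logging
import logging.config

# Same layout as the config above, but with a stdlib formatter so it runs
# without python-json-logger installed; the per-logger levels are the point
CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s %(name)s %(message)s"}
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"}
    },
    "loggers": {
        "app": {"level": "DEBUG", "handlers": ["console"], "propagate": False},
        "grpc": {"level": "ERROR", "handlers": ["console"], "propagate": False},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}
logging.config.dictConfig(CONFIG)

# grpc DEBUG/INFO noise is now suppressed; your app keeps full detail
print(logging.getLogger("grpc").getEffectiveLevel() == logging.ERROR)  # True
print(logging.getLogger("app").isEnabledFor(logging.DEBUG))            # True
```

It's worth running a check like this in a test: a typo in a logger name silently falls through to the root level, and you won't notice until an incident.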
## Strategy #6: Sampling for High-Volume Events

Don't log EVERY audio chunk - sample intelligently:

```python
import random

class SampledLogger(CorrelatedLogger):
    """Logger with sampling support for high-frequency events"""

    def __init__(self, name: str, sample_rate: float = 0.01):
        super().__init__(name)
        self.sample_rate = sample_rate

    def sample(self, event: str, **kwargs):
        """Log with sampling"""
        if random.random() < self.sample_rate:
            self.info(event, sampled=True, **kwargs)

logger = SampledLogger(__name__, sample_rate=0.01)  # Log ~1% of events

# Logs roughly 1 in 100 audio chunks
logger.sample(
    "audio_chunk_processed",
    chunk_size=len(chunk),
    total_chunks=chunk_count
)
```
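Random sampling can, in principle, miss a short burst entirely. If you'd rather have deterministic coverage, an every-Nth variant (a hypothetical alternative, not part of the logger above) is just as easy:

```python
import itertools

class EveryNthSampler:
    """Keep exactly one of every n events: a deterministic alternative
    to the random sampling above."""

    def __init__(self, n: int):
        self.n = n
        self._counter = itertools.count()  # 0, 1, 2, ...

    def should_log(self) -> bool:
        # Event 0, n, 2n, ... pass the filter
        return next(self._counter) % self.n == 0

sampler = EveryNthSampler(100)
decisions = [sampler.should_log() for _ in range(1000)]
print(sum(decisions))  # 10
```

One counter per session keeps the guarantee per-stream; a shared counter across sessions would bias sampling toward the chattiest stream.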
## Strategy #7: Error Context Preservation

When errors happen, log EVERYTHING relevant:

```python
import traceback

class ErrorContextLogger(CorrelatedLogger):
    """Logger with rich error context"""

    def error_with_context(self, event: str, error: Exception, **kwargs):
        """Log error with full context"""
        error_context = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "error_code": getattr(error, 'code', None),
            "traceback": traceback.format_exc(),
            **kwargs
        }
        self.error(event, **error_context)

# Usage
logger = ErrorContextLogger(__name__)

try:
    await riva_client.transcribe(audio)
except Exception as e:
    logger.error_with_context(
        "transcription_failed",
        error=e,
        audio_size=len(audio),
        sample_rate=sample_rate,
        language=language,
        riva_endpoint=riva_client.endpoint
    )
    raise
```
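To see what that context capture buys you, here's a self-contained sketch with a hypothetical `RivaConnectionError` carrying a gRPC-style status code; `build_error_context` mirrors the fields `ErrorContextLogger` collects:

```python
import traceback

class RivaConnectionError(Exception):
    """Hypothetical service error carrying a gRPC-style status code."""

    def __init__(self, message: str, code: int):
        super().__init__(message)
        self.code = code

def build_error_context(error: Exception, **extra) -> dict:
    # The same fields ErrorContextLogger collects, as a plain function
    return {
        "error_type": type(error).__name__,
        "error_message": str(error),
        "error_code": getattr(error, "code", None),
        "traceback": traceback.format_exc(),
        **extra,
    }

try:
    raise RivaConnectionError("backend unreachable", code=14)  # 14 = UNAVAILABLE
except RivaConnectionError as e:
    # Must be called inside the except block so format_exc() sees the traceback
    ctx = build_error_context(e, audio_size=16000)

print(ctx["error_type"], ctx["error_code"])  # RivaConnectionError 14
```

The `getattr(error, 'code', None)` trick means plain exceptions without a `code` attribute log `null` instead of raising a second error inside your error handler.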
## Strategy #8: Log Aggregation & Search

Use the ELK Stack or Loki for log aggregation:

```yaml
# Docker Compose for Loki + Grafana
version: '3'
services:
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./logs:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yaml
    command: -config.file=/etc/promtail/config.yaml
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
```

Now you can query logs with LogQL:
```logql
# Find all errors for a specific session
{job="transcription-service"} | json | session_id="abc-123" | level="ERROR"

# Find slow transcriptions
{job="transcription-service"} | json | duration_ms > 1000

# Count errors by type
sum by (error_type) (count_over_time({job="transcription-service"} | json | level="ERROR" [1h]))
```
## Strategy #9: Monitoring Integration

Connect logs to metrics:

```python
import time
from prometheus_client import Counter, Histogram

# Metrics
transcription_requests = Counter(
    'transcription_requests_total',
    'Total transcription requests',
    ['session_id', 'language', 'status']
)
transcription_duration = Histogram(
    'transcription_duration_seconds',
    'Transcription duration',
    ['language']
)

class MonitoredLogger(ErrorContextLogger):
    """Logger integrated with metrics"""

    async def log_transcription(
        self,
        session_id: str,
        language: str,
        audio_data: bytes
    ):
        start_time = time.time()
        try:
            result = await transcribe(audio_data, language)

            # Log success
            self.info(
                "transcription_completed",
                audio_size=len(audio_data),
                language=language,
                result_length=len(result)
            )

            # Update metrics
            transcription_requests.labels(
                session_id=session_id,
                language=language,
                status="success"
            ).inc()
            transcription_duration.labels(
                language=language
            ).observe(time.time() - start_time)

            return result
        except Exception as e:
            # Log failure
            self.error_with_context(
                "transcription_failed",
                error=e,
                audio_size=len(audio_data),
                language=language
            )
            # Update metrics
            transcription_requests.labels(
                session_id=session_id,
                language=language,
                status="error"
            ).inc()
            raise
```

## Best Practices

- Always include session/correlation IDs - Makes debugging possible
- Use structured logging - JSON is searchable and parseable
- Sample high-frequency events - Don't fill disk with audio chunk logs
- Log performance metrics - Know what's slow before users complain
- Preserve error context - Log everything needed to debug
- Set appropriate log levels - Debug in dev, Info in production
- Rotate log files - Don't fill up disk
- Centralize logs - Use log aggregation for multiple instances
- Alert on log patterns - Error rate spikes should trigger alerts
- Test your logging - Verify logs are useful during incidents

## The Results

After implementing these logging strategies:

- Mean Time to Resolution (MTTR) dropped from hours to minutes
- Debug sessions became productive instead of frustrating
- Production incidents were traceable across services
- Performance bottlenecks became immediately visible
- Customer support could look up exact session issues

## Final Thoughts

Good logging is invisible when everything works, but invaluable when things break. The goal isn't to log everything - it's to log the right things at the right level with the right context.

Think of logs as breadcrumbs for future you. When you're debugging at 3 AM, you'll thank past you for logging that session ID.

What's your logging setup? Any horror stories about debugging without proper logs? Share below! 🚀