# 4 Patterns From Space Life Support That Will Save Your Backend


Mars Pathfinder landed on July 4, 1997. Within days, it started rebooting itself, over and over. The watchdog timer kept firing because a priority inversion bug blocked the high-priority bus management task from completing. Engineers diagnosed it from 191 million kilometers away and fixed it by flipping a single boolean: enabling priority inheritance on a mutex.

That bug was found during pre-flight testing. It was deprioritized because landing software was more urgent. Your production backend has the same class of problem: something works in dev, breaks under load, and the team says "we'll fix it later." Space systems can't afford that. Neither can you.

Here are 4 patterns from space life support software, drawn from the Environmental Control and Life Support System (ECLSS) on the ISS and deep-space mission architectures, that translate directly to Python backend code.

## Pattern 1: Watchdog Supervisors

The Mars Pathfinder watchdog was a hardware timer. If the flight software didn't reset it within a fixed window, the timer assumed the system was stuck and forced a reboot. The problem wasn't the watchdog; it worked exactly as designed. The problem was that the software couldn't "feed" the watchdog in time, because a medium-priority task was blocking a high-priority one.

The lesson: watchdogs catch real failures, but only if the tasks they monitor can actually complete. In Python, the supervisor pattern does the same thing. Each subsystem must check in by a deadline. If any subsystem misses its window, the supervisor takes corrective action: restart the task, switch to a backup, or log an alert.

## Pattern 2: Triple Modular Redundancy (Majority Voting)

The Space Shuttle used five flight computers: four running identical software in sync, with hardware voting on every output.
If one computer produced a different result, the other three outvoted it. A fifth computer ran independently written backup software in case all four primary machines failed together. The ISS uses a triple-redundant network backbone for its control systems.

The principle: don't trust any single result. Run the computation multiple times and let the majority decide.

For backend systems, this pattern applies anywhere you call an external service that might return wrong data: not just timeouts, but incorrect responses. LLM hallucinations, flaky API responses, sensor readings that drift.

The cost is 3x the compute. Space systems accept this tradeoff because the alternative, trusting a single source, is more expensive when it fails. For critical backend operations (payment calculations, access control decisions), the same tradeoff applies.

## Pattern 3: Graceful Degradation With Priority Tiers

The ISS ECLSS doesn't shut down everything when one component fails. It sheds non-essential functions to preserve life-critical ones. Oxygen generation continues even if water recycling efficiency drops from 90% to 70%. Lighting dims before life support power gets reduced.

This is graceful degradation with priority tiers. Your backend should do the same thing under load or partial failure. The pattern is explicit: every task declares its priority upfront. When the system degrades, it doesn't guess what to drop; the priority tiers make the decision deterministic. ISS flight controllers don't improvise load shedding. Neither should your code.

## Pattern 4: Closed-Loop Health Monitoring

The ISS ECLSS runs continuous atmospheric monitoring: O2 partial pressure, CO2 levels, trace contaminants, humidity. Not as dashboards for humans to watch, but as closed-loop control inputs that automatically adjust system behavior. The closed loop means: measure → compare to setpoint → act → measure again. No human in the middle.

The closed-loop principle: your monitoring should trigger actions, not just emit logs.
If p99 latency crosses 500ms, the system scales up; it doesn't wait for a human to notice a dashboard and click a button. On the ISS, the Carbon Dioxide Removal Assembly keeps CO2 below 3 mmHg automatically; it doesn't wait for a crew member to read a gauge and flip a switch.

## What Space Engineers Know That Backend Engineers Don't

These 4 patterns share a common philosophy: assume failure, design for it, and automate the response.

Space systems can't afford a human in the loop for every failure. The ISS is 408 km above Earth. Mars is 3 to 22 light-minutes away. By the time a human notices a problem and types a command, the system needs to have already handled it. Your production backend operates under the same constraint: not because of distance, but because of scale. You can't manually respond to every timeout, every flaky API response, every memory spike across a fleet of services.

The Ariane 5 explosion in 1996, with 370 million USD in damage, happened because software inherited from Ariane 4 couldn't handle the new rocket's higher horizontal velocity. An integer overflow in the inertial reference system caused both redundant computers to shut down simultaneously. The engineers reused code without re-validating its assumptions. Every time you copy a function from one service to another without checking its boundary conditions, you're doing the same thing.

Build watchdogs that catch real failures. Use voting when you can't trust a single source. Shed non-critical work before critical work drowns. Close the loop between measurement and action.

Space systems get one chance to work. Your backend gets retries, but your users don't have infinite patience.

Follow @klement_gunndu for more architecture and Python content.
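The Ariane failure mode can be made concrete: the reused code converted a 64-bit float to a 16-bit signed integer without a range check. A minimal sketch of the re-validation step the reuse skipped; the function name and the check itself are illustrative, not taken from any flight code:

```python
INT16_MIN, INT16_MAX = -32768, 32767

def to_int16_checked(value: float) -> int:
    # Ariane 4's trajectory kept this value small enough to fit in 16 bits.
    # Ariane 5's didn't, so the assumption had to be re-validated, not inherited.
    if not INT16_MIN <= value <= INT16_MAX:
        raise OverflowError(f"{value} does not fit in a signed 16-bit integer")
    return int(value)

print(to_int16_checked(30_000.0))  # → 30000

try:
    to_int16_checked(40_000.0)  # out of range for the inherited assumption
except OverflowError as exc:
    print(f"rejected: {exc}")
```

Failing loudly at the boundary is the point: an explicit `OverflowError` is diagnosable; silent wraparound shut down both redundant computers.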

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SubsystemHealth:
    name: str
    last_heartbeat: float = 0.0
    timeout_seconds: float = 5.0
    failures: int = 0
    max_failures: int = 3


class WatchdogSupervisor:
    """Monitors subsystem heartbeats. Cancels tasks that miss deadlines."""

    def __init__(self) -> None:
        self.subsystems: dict[str, SubsystemHealth] = {}
        self._tasks: dict[str, asyncio.Task] = {}

    def register(
        self,
        name: str,
        coro_factory,
        timeout: float = 5.0,
    ) -> None:
        health = SubsystemHealth(name=name, timeout_seconds=timeout)
        # Start the clock now, so a fresh subsystem isn't flagged
        # before it has had a chance to send its first heartbeat.
        health.last_heartbeat = asyncio.get_running_loop().time()
        self.subsystems[name] = health
        self._tasks[name] = asyncio.create_task(
            coro_factory(), name=f"subsystem-{name}"
        )

    def heartbeat(self, name: str) -> None:
        """Called by each subsystem to signal it's alive."""
        self.subsystems[name].last_heartbeat = asyncio.get_running_loop().time()
        self.subsystems[name].failures = 0

    async def monitor(self, check_interval: float = 2.0) -> None:
        loop = asyncio.get_running_loop()
        while True:
            await asyncio.sleep(check_interval)
            now = loop.time()
            for name, health in self.subsystems.items():
                elapsed = now - health.last_heartbeat
                if elapsed > health.timeout_seconds:
                    health.failures += 1
                    print(
                        f"[WATCHDOG] {name}: missed heartbeat "
                        f"({health.failures}/{health.max_failures})"
                    )
                    if health.failures >= health.max_failures:
                        task = self._tasks.get(name)
                        if task and not task.done():
                            task.cancel()
                            print(f"[WATCHDOG] {name}: max failures — task canceled")
```
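For a single call rather than a fleet of subsystems, the same deadline idea is available with `asyncio.wait_for` alone. A minimal per-call sketch; the subsystem coroutine here is a stand-in, not part of the supervisor:

```python
import asyncio

async def slow_subsystem() -> str:
    await asyncio.sleep(10)  # stands in for a hung task
    return "done"

async def guarded() -> str:
    # Per-call watchdog: if the task can't finish inside its window,
    # assume it is stuck and fall back to corrective action.
    try:
        return await asyncio.wait_for(slow_subsystem(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out, restarting subsystem"

print(asyncio.run(guarded()))  # → timed out, restarting subsystem
```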
```python
import asyncio
from collections import Counter
from typing import Any, Awaitable, Callable


async def triple_modular_redundancy(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    timeout_per_call: float = 10.0,
    **kwargs: Any,
) -> Any:
    """Run a function 3 times concurrently. Return the majority result.

    If all 3 results differ, raise ValueError — no consensus.
    If any call times out or fails, use the remaining results.
    """
    results: list[Any] = []
    errors: list[Exception] = []

    async def _guarded_call() -> Any:
        async with asyncio.timeout(timeout_per_call):  # Python 3.11+
            return await fn(*args, **kwargs)

    tasks = [asyncio.create_task(_guarded_call()) for _ in range(3)]
    done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

    for task in done:
        if task.exception():
            errors.append(task.exception())
        else:
            results.append(task.result())

    if len(results) < 2:
        raise RuntimeError(
            f"TMR failed: only {len(results)} result(s), "
            f"{len(errors)} error(s)"
        )

    # Majority vote: most common result wins
    counter = Counter(str(r) for r in results)
    majority_key, count = counter.most_common(1)[0]
    if count < 2:
        raise ValueError(
            f"TMR: no consensus — all 3 results differ: {results}"
        )

    # Return the actual result object (not the string key)
    for r in results:
        if str(r) == majority_key:
            return r
    raise RuntimeError("TMR: unreachable state")
```
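One practical caveat when voting on text such as LLM output: exact string comparison treats "Yes." and "yes" as disagreement. A normalization pass before counting, sketched here as an add-on rather than part of the helper above, keeps trivial formatting from breaking consensus:

```python
from collections import Counter

def normalize(answer: str) -> str:
    # Collapse trivial formatting differences before voting.
    return answer.strip().lower().rstrip(".")

def majority_vote(answers: list[str]) -> str:
    counts = Counter(normalize(a) for a in answers)
    value, count = counts.most_common(1)[0]
    if count < 2:
        raise ValueError(f"no consensus: {answers}")
    return value

print(majority_vote(["Yes.", "yes", "No"]))  # → yes
```

What counts as "the same answer" is domain-specific; for structured results, comparing parsed values rather than strings is usually safer still.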
```python
import asyncio
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0  # Must run. Equivalent to O2 generation.
    HIGH = 1      # Should run. Equivalent to CO2 scrubbing.
    MEDIUM = 2    # Nice to have. Equivalent to humidity control.
    LOW = 3       # Deferrable. Equivalent to science experiments.


class DegradationManager:
    """Sheds low-priority work when system resources are constrained."""

    def __init__(self, max_concurrent: int = 10) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._current_level = Priority.LOW  # All systems go
        self._active: dict[Priority, int] = {p: 0 for p in Priority}

    @property
    def degradation_level(self) -> Priority:
        return self._current_level

    def degrade(self, level: Priority) -> None:
        """Shed all tasks less important than this level."""
        self._current_level = level
        print(
            f"[DEGRADE] Level set to {level.name} — "
            f"shedding priority > {level.value}"
        )

    def restore(self) -> None:
        self._current_level = Priority.LOW
        print("[RESTORE] All priority tiers active")

    async def run_if_allowed(
        self,
        priority: Priority,
        coro,
        fallback=None,
    ):
        """Run the coroutine only if its priority tier is active."""
        if priority > self._current_level:
            print(
                f"[SHED] Skipping {priority.name} task "
                f"(current level: {self._current_level.name})"
            )
            coro.close()  # discard cleanly, avoids "never awaited" warnings
            return fallback
        async with self._semaphore:
            self._active[priority] += 1
            try:
                return await coro
            finally:
                self._active[priority] -= 1


# Usage: shed low-priority work during high load
async def main():
    dm = DegradationManager(max_concurrent=5)

    # Normal operation — everything runs
    result = await dm.run_if_allowed(
        Priority.LOW, some_analytics_task()
    )

    # System under pressure — shed LOW and MEDIUM tasks
    dm.degrade(Priority.HIGH)

    # This returns the fallback immediately — not queued
    result = await dm.run_if_allowed(
        Priority.LOW,
        some_analytics_task(),
        fallback={"status": "deferred"},
    )

    # CRITICAL tasks always run
    result = await dm.run_if_allowed(
        Priority.CRITICAL,
        process_payment(),
    )
```
```python
import asyncio
from dataclasses import dataclass


@dataclass
class HealthMetric:
    name: str
    value: float = 0.0
    min_threshold: float = 0.0
    max_threshold: float = 100.0
    unit: str = ""


class ClosedLoopMonitor:
    """Continuously measures system health and takes corrective action."""

    def __init__(self) -> None:
        self.metrics: dict[str, HealthMetric] = {}
        self._actions: dict[str, list] = {}

    def register_metric(
        self,
        name: str,
        min_val: float,
        max_val: float,
        unit: str = "",
    ) -> None:
        self.metrics[name] = HealthMetric(
            name=name,
            min_threshold=min_val,
            max_threshold=max_val,
            unit=unit,
        )
        self._actions[name] = []

    def on_breach(self, metric_name: str, action) -> None:
        """Register a corrective action for when a metric leaves bounds."""
        self._actions[metric_name].append(action)

    def update(self, name: str, value: float) -> None:
        self.metrics[name].value = value

    async def run(self, interval: float = 1.0) -> None:
        while True:
            for name, metric in self.metrics.items():
                if (
                    metric.value < metric.min_threshold
                    or metric.value > metric.max_threshold
                ):
                    print(
                        f"[BREACH] {name}: {metric.value}{metric.unit} "
                        f"(bounds: {metric.min_threshold}–"
                        f"{metric.max_threshold})"
                    )
                    for action in self._actions.get(name, []):
                        await action(metric)
            await asyncio.sleep(interval)


# Usage: monitor response latency and error rate
async def scale_up(metric: HealthMetric):
    print(f"[ACTION] Scaling up — {metric.name} at {metric.value}{metric.unit}")


async def alert_oncall(metric: HealthMetric):
    print(f"[ALERT] Paging on-call — {metric.name} breach: {metric.value}")


async def main():
    monitor = ClosedLoopMonitor()
    monitor.register_metric("response_latency_p99", min_val=0, max_val=500, unit="ms")
    monitor.register_metric("error_rate_5xx", min_val=0, max_val=5, unit="%")
    monitor.on_breach("response_latency_p99", scale_up)
    monitor.on_breach("error_rate_5xx", alert_oncall)

    # Start monitoring loop
    monitor_task = asyncio.create_task(monitor.run(interval=2.0))

    # Simulate metrics arriving from your observability stack
    await asyncio.sleep(1)
    monitor.update("response_latency_p99", 620)  # Triggers scale_up
    monitor.update("error_rate_5xx", 7.2)        # Triggers alert

    await asyncio.sleep(3)
    monitor_task.cancel()
```
Three design choices in the watchdog supervisor are worth noting:

- Heartbeat, not polling. Each subsystem pushes a signal. The supervisor doesn't ask "are you alive?"; it checks "did you signal within your window?" This is exactly how ISS ECLSS telemetry works: sensors push data, and the control system flags when data stops arriving.
- Failure counting, not instant kill. One missed heartbeat could be a transient delay. Three consecutive misses indicate a real problem. The Mars Pathfinder watchdog didn't have this nuance — it rebooted on the first miss. Modern systems are more forgiving.
- Bounded restarts. After max_failures, the supervisor cancels the task. It doesn't retry forever. Unbounded retries are how a single failing subsystem takes down an entire spacecraft, or an entire microservice cluster.

Where triple modular redundancy pays off in a backend:

- LLM output validation. Call the model 3 times with the same prompt. If 2 of 3 agree, the answer is likely correct. If all 3 differ, the prompt is ambiguous.
- External API responses. If a pricing API returns $0 once, that's probably a bug. If it returns $0 three times, that's the actual price.
- Data pipeline transforms. Run the same ETL step with 3 different implementations. Majority vote catches implementation-specific bugs.

Practical applications of priority-tier degradation:

- API rate limits hit. Degrade to CRITICAL: only payment processing and auth continue. Analytics, recommendations, and logging queue up for later.
- Database connection pool exhausted. Shed MEDIUM and LOW queries. Reads and writes for the core transaction path keep running.
- Third-party service outage. If your recommendation engine is down, return cached results (fallback) instead of blocking the page load.