How to Implement Exponential Backoff for Rate-Limited APIs in Python

What We Are Building

Step 1: Parse the Retry-After Header

Step 2: Build the Wait Calculation Function

Step 3: Write the Retry Wrapper

Step 4: Production-Grade Retry with Tenacity

Step 5: Add Proactive Rate Limiting with Token Bucket

Putting It Together

Error Categories to Handle Differently

Testing Your Retry Logic

Hitting an API rate limit and not knowing what to do with the HTTP 429 response is one of the most common causes of brittle data automation scripts. This is a step-by-step implementation guide: it goes from a minimal correct backoff function to a production-grade tenacity decorator, and covers retry logging, Retry-After headers, and the distinction between retriable and non-retriable errors.

What We Are Building

By the end of this guide, you will have:

- A calculate_wait() function that reads Retry-After headers when present and falls back to exponential backoff with jitter when not
- A fetch_with_backoff() wrapper function for single requests
- A tenacity-based decorator for production use with logging
- A proactive TokenBucket class to prevent most 429 responses before they occur

Prerequisites: Python 3.8+, with requests and tenacity installed:

```shell
pip install requests tenacity
```

Step 1: Parse the Retry-After Header

The most important piece of rate limit handling is honoring the API's own signal about when to retry. Many 429 responses include a Retry-After header that tells you how long to wait, either as a number of seconds or as an HTTP date.

```python
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(headers):
    """
    Parse the Retry-After header from a rate-limited response.
    Returns seconds to wait, or None if the header is absent or unparseable.
    """
    retry_after = headers.get("Retry-After")
    if not retry_after:
        return None

    # Some APIs return seconds as a plain integer string
    try:
        return max(0.0, float(retry_after))
    except ValueError:
        pass

    # Others return an HTTP date string: "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        reset_dt = parsedate_to_datetime(retry_after)
        now = datetime.now(timezone.utc)
        return max(0.0, (reset_dt - now).total_seconds())
    except Exception:
        return None
```

Step 2: Build the Wait Calculation Function

Combine Retry-After parsing with exponential backoff as the fallback:

```python
import random


def calculate_wait(response, attempt, base=1.0, max_delay=60.0):
    """
    Calculate how long to wait before retrying a failed request.
    Uses Retry-After header when available, exponential backoff otherwise.
    """
    api_specified = parse_retry_after(response.headers)
    if api_specified is not None:
        return api_specified

    # Exponential backoff with jitter
    delay = min(base * (2 ** attempt), max_delay)
    jitter = random.uniform(0, delay * 0.1)
    return delay + jitter
```

The jitter term prevents the thundering herd problem: if multiple workers all hit the limit at the same moment, jitter ensures they do not all retry at exactly the same moment.

Step 3: Write the Retry Wrapper

A minimal correct implementation with explicit handling for 429 vs. server errors vs. client errors:

```python
import requests


def fetch_with_backoff(url, headers=None, max_retries=6, base_delay=1.0):
    """
    Make a GET request with retry logic for 429 and 5xx responses.
    Does not retry on 4xx client errors (except 429).
    """
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers or {}, timeout=30)

        # Any non-error status (not only 200) counts as success
        if response.status_code < 400:
            return response

        if response.status_code == 429:
            if attempt == max_retries - 1:
                raise RuntimeError(f"Rate limit persists after {max_retries} retries: {url}")
            wait = calculate_wait(response, attempt, base=base_delay)
            print(f"Rate limited (429). Waiting {wait:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait)
            continue

        if response.status_code >= 500:
            if attempt == max_retries - 1:
                response.raise_for_status()
            wait = min(base_delay * (2 ** attempt), 60.0)
            print(f"Server error ({response.status_code}). Waiting {wait:.1f}s")
            time.sleep(wait)
            continue

        # Remaining 4xx client errors: do not retry
        response.raise_for_status()

    raise RuntimeError(f"Exhausted retries for {url}")
```

This handles the three distinct cases: rate limits (retry with API-specified or backoff delay), server errors (retry with backoff), and client errors (fail immediately without retrying).

Step 4: Production-Grade Retry with Tenacity

The tenacity library provides a decorator-based retry system that is cleaner to configure and includes built-in logging:

```python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
    before_sleep_log,
)
import logging
import requests

logger = logging.getLogger(__name__)


class RateLimitError(Exception):
    pass


class ServerError(Exception):
    pass


def raise_for_status_with_retry(response):
    """Convert HTTP errors to typed exceptions for tenacity."""
    if response.status_code == 429:
        raise RateLimitError(
            f"Rate limited. Retry-After: {response.headers.get('Retry-After', 'not specified')}"
        )
    if response.status_code >= 500:
        raise ServerError(f"Server error {response.status_code}")
    response.raise_for_status()
    return response


@retry(
    retry=retry_if_exception_type((RateLimitError, ServerError)),
    wait=wait_exponential_jitter(initial=1, max=60),
    stop=stop_after_attempt(6),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def fetch_api_resource(url, session):
    response = session.get(url, timeout=30)
    return raise_for_status_with_retry(response)
```

The before_sleep_log helper, passed as the before_sleep argument, writes a WARNING entry to your log system before each sleep interval, which makes retry behavior visible in logs without requiring custom logging code in the retry loop. Note that this configuration waits on tenacity's exponential schedule; the Retry-After value only appears in the exception message.

Step 5: Add Proactive Rate Limiting with Token Bucket

Exponential backoff is reactive: it handles failures after they occur. A token bucket implementation is proactive: it throttles your own request rate to stay below the API limit, reducing how often 429 responses occur.

```python
import threading
import time


class TokenBucket:
    """Thread-safe token bucket for rate limiting API requests."""

    def __init__(self, rate, capacity):
        self.rate = rate  # tokens added per second
        self.capacity = capacity  # maximum tokens
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self, tokens=1):
        """Reserve tokens and return how long the caller should sleep first."""
        with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now

            # Let the balance go negative so that waiting callers queue up
            # one after another instead of all waking at the same moment.
            self.tokens -= tokens
            if self.tokens >= 0:
                return 0.0
            return -self.tokens / self.rate


# For an API allowing 10 requests per second:
bucket = TokenBucket(rate=10, capacity=10)


def throttled_fetch(url, session):
    wait = bucket.acquire()
    if wait > 0:
        time.sleep(wait)
    return fetch_api_resource(url, session)
```

Putting It Together

A complete paginated API consumer that combines proactive throttling and reactive retry:

```python
from urllib.parse import urlencode

import requests

bucket = TokenBucket(rate=5, capacity=10)  # Stay under the API limit


def paginate_api(base_url, auth_headers, params=None):
    results = []
    page = 1
    with requests.Session() as session:
        session.headers.update(auth_headers)
        while True:
            # Merge caller-supplied filters with the paging parameters
            query = {**(params or {}), "page": page, "per_page": 100}
            url = f"{base_url}?{urlencode(query)}"
            response = throttled_fetch(url, session)
            data = response.json()
            items = data.get("items") or data.get("results") or []
            if not items:
                break
            results.extend(items)
            page += 1
    return results
```

The token bucket prevents most 429 responses. The tenacity decorator handles the ones that slip through. The session reuses the TCP connection across requests.

Error Categories to Handle Differently

Not all errors should be retried. A well-structured retry strategy distinguishes retriable errors (429 rate limits and 5xx server errors) from non-retriable ones (other 4xx client errors). Retrying a 400 or 401 with the same request will not succeed and wastes time. A correct implementation routes each error type to the appropriate response.

Testing Your Retry Logic

Retry logic is difficult to test without a server that deliberately returns 429 responses. Options:

- Mock the response object: Use unittest.mock.patch to replace requests.get with a function that returns a mock response with status_code=429 for the first N calls.
- Use a local proxy: Run a local reverse proxy (nginx, mitmproxy) in front of the API that injects 429 responses at a configured rate.
- Use httpbin: httpbin.org/status/429 returns a 429 response that you can use to test parsing and backoff behavior without hitting a real API.
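As a rough sketch of the mock-based testing option, the snippet below uses unittest.mock to feed a retry loop a fixed sequence of fake responses. The names fetch_with_retry, make_response, and run_retry_scenario are hypothetical and introduced only for illustration. fetch_with_retry is a simplified stand-in for fetch_with_backoff() that takes the HTTP getter as a parameter, so the example runs without a network connection or the requests package. The URL is a placeholder.

```python
import time
from unittest.mock import Mock, patch


def fetch_with_retry(get, url, max_retries=4, base_delay=1.0):
    """Simplified stand-in for fetch_with_backoff(); `get` is injected."""
    for attempt in range(max_retries):
        response = get(url)
        if response.status_code != 429:
            return response
        # Sleep only if another attempt will follow
        if attempt < max_retries - 1:
            time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"Rate limit persists after {max_retries} attempts: {url}")


def make_response(status_code):
    """Build a fake response exposing only what the retry loop reads."""
    response = Mock()
    response.status_code = status_code
    response.headers = {}
    return response


def run_retry_scenario(num_429):
    """Return (final status, call count, sleeps) after N initial 429 responses."""
    responses = [make_response(429) for _ in range(num_429)]
    responses.append(make_response(200))
    fake_get = Mock(side_effect=responses)

    # Patch time.sleep so the test finishes instantly and records each delay
    with patch("time.sleep") as fake_sleep:
        result = fetch_with_retry(fake_get, "https://api.example.com/items")

    sleeps = [call.args[0] for call in fake_sleep.call_args_list]
    return result.status_code, fake_get.call_count, sleeps
```

With two initial 429 responses, run_retry_scenario(2) should report a final status of 200 after three calls, with recorded sleeps of 1.0 and 2.0 seconds. In a real test suite you would more likely apply patch to requests.get, as described in the testing options, instead of injecting the getter.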