$ -weight: 500;">pip -weight: 500;">install requests tenacity
-weight: 500;">pip -weight: 500;">install requests tenacity
-weight: 500;">pip -weight: 500;">install requests tenacity
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime def parse_retry_after(headers): """ Parse the Retry-After header from a rate-limited response. Returns seconds to wait, or None if the header is absent. """ retry_after = headers.get("Retry-After") if not retry_after: return None # Some APIs return seconds as a plain integer string try: return max(0.0, float(retry_after)) except ValueError: pass # Others return an HTTP date string: "Wed, 21 Oct 2015 07:28:00 GMT" try: reset_dt = parsedate_to_datetime(retry_after) now = datetime.now(timezone.utc) return max(0.0, (reset_dt - now).total_seconds()) except Exception: return None
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime def parse_retry_after(headers): """ Parse the Retry-After header from a rate-limited response. Returns seconds to wait, or None if the header is absent. """ retry_after = headers.get("Retry-After") if not retry_after: return None # Some APIs return seconds as a plain integer string try: return max(0.0, float(retry_after)) except ValueError: pass # Others return an HTTP date string: "Wed, 21 Oct 2015 07:28:00 GMT" try: reset_dt = parsedate_to_datetime(retry_after) now = datetime.now(timezone.utc) return max(0.0, (reset_dt - now).total_seconds()) except Exception: return None
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime def parse_retry_after(headers): """ Parse the Retry-After header from a rate-limited response. Returns seconds to wait, or None if the header is absent. """ retry_after = headers.get("Retry-After") if not retry_after: return None # Some APIs return seconds as a plain integer string try: return max(0.0, float(retry_after)) except ValueError: pass # Others return an HTTP date string: "Wed, 21 Oct 2015 07:28:00 GMT" try: reset_dt = parsedate_to_datetime(retry_after) now = datetime.now(timezone.utc) return max(0.0, (reset_dt - now).total_seconds()) except Exception: return None
import random def calculate_wait(response, attempt, base=1.0, max_delay=60.0): """ Calculate how long to wait before retrying a failed request. Uses Retry-After header when available, exponential backoff otherwise. """ api_specified = parse_retry_after(response.headers) if api_specified is not None: return api_specified # Exponential backoff with jitter delay = min(base * (2 ** attempt), max_delay) jitter = random.uniform(0, delay * 0.1) return delay + jitter
import random def calculate_wait(response, attempt, base=1.0, max_delay=60.0): """ Calculate how long to wait before retrying a failed request. Uses Retry-After header when available, exponential backoff otherwise. """ api_specified = parse_retry_after(response.headers) if api_specified is not None: return api_specified # Exponential backoff with jitter delay = min(base * (2 ** attempt), max_delay) jitter = random.uniform(0, delay * 0.1) return delay + jitter
import random def calculate_wait(response, attempt, base=1.0, max_delay=60.0): """ Calculate how long to wait before retrying a failed request. Uses Retry-After header when available, exponential backoff otherwise. """ api_specified = parse_retry_after(response.headers) if api_specified is not None: return api_specified # Exponential backoff with jitter delay = min(base * (2 ** attempt), max_delay) jitter = random.uniform(0, delay * 0.1) return delay + jitter
import requests def fetch_with_backoff(url, headers=None, max_retries=6, base_delay=1.0): """ Make a GET request with retry logic for 429 and 5xx responses. Does not retry on 4xx client errors (except 429). """ for attempt in range(max_retries): response = requests.get(url, headers=headers or {}, timeout=30) if response.status_code == 200: return response if response.status_code == 429: if attempt == max_retries - 1: raise RuntimeError(f"Rate limit persists after {max_retries} retries: {url}") wait = calculate_wait(response, attempt, base=base_delay) print(f"Rate limited (429). Waiting {wait:.1f}s (attempt {attempt + 1}/{max_retries})") time.sleep(wait) continue if response.status_code >= 500: if attempt == max_retries - 1: response.raise_for_status() wait = min(base_delay * (2 ** attempt), 60.0) print(f"Server error ({response.status_code}). Waiting {wait:.1f}s") time.sleep(wait) continue # 4xx client errors: do not retry response.raise_for_status() raise RuntimeError(f"Exhausted retries for {url}")
import requests def fetch_with_backoff(url, headers=None, max_retries=6, base_delay=1.0): """ Make a GET request with retry logic for 429 and 5xx responses. Does not retry on 4xx client errors (except 429). """ for attempt in range(max_retries): response = requests.get(url, headers=headers or {}, timeout=30) if response.status_code == 200: return response if response.status_code == 429: if attempt == max_retries - 1: raise RuntimeError(f"Rate limit persists after {max_retries} retries: {url}") wait = calculate_wait(response, attempt, base=base_delay) print(f"Rate limited (429). Waiting {wait:.1f}s (attempt {attempt + 1}/{max_retries})") time.sleep(wait) continue if response.status_code >= 500: if attempt == max_retries - 1: response.raise_for_status() wait = min(base_delay * (2 ** attempt), 60.0) print(f"Server error ({response.status_code}). Waiting {wait:.1f}s") time.sleep(wait) continue # 4xx client errors: do not retry response.raise_for_status() raise RuntimeError(f"Exhausted retries for {url}")
import requests def fetch_with_backoff(url, headers=None, max_retries=6, base_delay=1.0): """ Make a GET request with retry logic for 429 and 5xx responses. Does not retry on 4xx client errors (except 429). """ for attempt in range(max_retries): response = requests.get(url, headers=headers or {}, timeout=30) if response.status_code == 200: return response if response.status_code == 429: if attempt == max_retries - 1: raise RuntimeError(f"Rate limit persists after {max_retries} retries: {url}") wait = calculate_wait(response, attempt, base=base_delay) print(f"Rate limited (429). Waiting {wait:.1f}s (attempt {attempt + 1}/{max_retries})") time.sleep(wait) continue if response.status_code >= 500: if attempt == max_retries - 1: response.raise_for_status() wait = min(base_delay * (2 ** attempt), 60.0) print(f"Server error ({response.status_code}). Waiting {wait:.1f}s") time.sleep(wait) continue # 4xx client errors: do not retry response.raise_for_status() raise RuntimeError(f"Exhausted retries for {url}")
from tenacity import ( retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type, before_sleep_log,
)
import logging
import requests logger = logging.getLogger(__name__) class RateLimitError(Exception): pass class ServerError(Exception): pass def raise_for_status_with_retry(response): """Convert HTTP errors to typed exceptions for tenacity.""" if response.status_code == 429: raise RateLimitError( f"Rate limited. Retry-After: {response.headers.get('Retry-After', 'not specified')}" ) if response.status_code >= 500: raise ServerError(f"Server error {response.status_code}") response.raise_for_status() return response @retry( retry=retry_if_exception_type((RateLimitError, ServerError)), wait=wait_exponential_jitter(initial=1, max=60), -weight: 500;">stop=stop_after_attempt(6), before_sleep=before_sleep_log(logger, logging.WARNING), reraise=True,
)
def fetch_api_resource(url, session): response = session.get(url, timeout=30) return raise_for_status_with_retry(response)
from tenacity import ( retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type, before_sleep_log,
)
import logging
import requests logger = logging.getLogger(__name__) class RateLimitError(Exception): pass class ServerError(Exception): pass def raise_for_status_with_retry(response): """Convert HTTP errors to typed exceptions for tenacity.""" if response.status_code == 429: raise RateLimitError( f"Rate limited. Retry-After: {response.headers.get('Retry-After', 'not specified')}" ) if response.status_code >= 500: raise ServerError(f"Server error {response.status_code}") response.raise_for_status() return response @retry( retry=retry_if_exception_type((RateLimitError, ServerError)), wait=wait_exponential_jitter(initial=1, max=60), -weight: 500;">stop=stop_after_attempt(6), before_sleep=before_sleep_log(logger, logging.WARNING), reraise=True,
)
def fetch_api_resource(url, session): response = session.get(url, timeout=30) return raise_for_status_with_retry(response)
from tenacity import ( retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type, before_sleep_log,
)
import logging
import requests logger = logging.getLogger(__name__) class RateLimitError(Exception): pass class ServerError(Exception): pass def raise_for_status_with_retry(response): """Convert HTTP errors to typed exceptions for tenacity.""" if response.status_code == 429: raise RateLimitError( f"Rate limited. Retry-After: {response.headers.get('Retry-After', 'not specified')}" ) if response.status_code >= 500: raise ServerError(f"Server error {response.status_code}") response.raise_for_status() return response @retry( retry=retry_if_exception_type((RateLimitError, ServerError)), wait=wait_exponential_jitter(initial=1, max=60), -weight: 500;">stop=stop_after_attempt(6), before_sleep=before_sleep_log(logger, logging.WARNING), reraise=True,
)
def fetch_api_resource(url, session): response = session.get(url, timeout=30) return raise_for_status_with_retry(response)
import threading class TokenBucket: """Thread-safe token bucket for rate limiting API requests.""" def __init__(self, rate, capacity): self.rate = rate # tokens added per second self.capacity = capacity # maximum tokens self.tokens = float(capacity) self.last_refill = time.monotonic() self._lock = threading.Lock() def acquire(self, tokens=1): """Wait until tokens are available. Returns the actual wait time.""" with self._lock: now = time.monotonic() elapsed = now - self.last_refill self.tokens = min(self.capacity, self.tokens + elapsed * self.rate) self.last_refill = now if self.tokens >= tokens: self.tokens -= tokens return 0.0 wait = (tokens - self.tokens) / self.rate self.tokens = 0 return wait # For an API allowing 10 requests per second:
bucket = TokenBucket(rate=10, capacity=10) def throttled_fetch(url, session): wait = bucket.acquire() if wait > 0: time.sleep(wait) return fetch_api_resource(url, session)
import threading class TokenBucket: """Thread-safe token bucket for rate limiting API requests.""" def __init__(self, rate, capacity): self.rate = rate # tokens added per second self.capacity = capacity # maximum tokens self.tokens = float(capacity) self.last_refill = time.monotonic() self._lock = threading.Lock() def acquire(self, tokens=1): """Wait until tokens are available. Returns the actual wait time.""" with self._lock: now = time.monotonic() elapsed = now - self.last_refill self.tokens = min(self.capacity, self.tokens + elapsed * self.rate) self.last_refill = now if self.tokens >= tokens: self.tokens -= tokens return 0.0 wait = (tokens - self.tokens) / self.rate self.tokens = 0 return wait # For an API allowing 10 requests per second:
bucket = TokenBucket(rate=10, capacity=10) def throttled_fetch(url, session): wait = bucket.acquire() if wait > 0: time.sleep(wait) return fetch_api_resource(url, session)
import threading class TokenBucket: """Thread-safe token bucket for rate limiting API requests.""" def __init__(self, rate, capacity): self.rate = rate # tokens added per second self.capacity = capacity # maximum tokens self.tokens = float(capacity) self.last_refill = time.monotonic() self._lock = threading.Lock() def acquire(self, tokens=1): """Wait until tokens are available. Returns the actual wait time.""" with self._lock: now = time.monotonic() elapsed = now - self.last_refill self.tokens = min(self.capacity, self.tokens + elapsed * self.rate) self.last_refill = now if self.tokens >= tokens: self.tokens -= tokens return 0.0 wait = (tokens - self.tokens) / self.rate self.tokens = 0 return wait # For an API allowing 10 requests per second:
bucket = TokenBucket(rate=10, capacity=10) def throttled_fetch(url, session): wait = bucket.acquire() if wait > 0: time.sleep(wait) return fetch_api_resource(url, session)
import requests bucket = TokenBucket(rate=5, capacity=10) # Stay under the API limit def paginate_api(base_url, auth_headers, params=None): results = [] page = 1 with requests.Session() as session: session.headers.-weight: 500;">update(auth_headers) while True: url = f"{base_url}?page={page}&per_page=100" response = throttled_fetch(url, session) data = response.json() items = data.get("items") or data.get("results") or [] if not items: break results.extend(items) page += 1 return results
import requests bucket = TokenBucket(rate=5, capacity=10) # Stay under the API limit def paginate_api(base_url, auth_headers, params=None): results = [] page = 1 with requests.Session() as session: session.headers.-weight: 500;">update(auth_headers) while True: url = f"{base_url}?page={page}&per_page=100" response = throttled_fetch(url, session) data = response.json() items = data.get("items") or data.get("results") or [] if not items: break results.extend(items) page += 1 return results
import requests bucket = TokenBucket(rate=5, capacity=10) # Stay under the API limit def paginate_api(base_url, auth_headers, params=None): results = [] page = 1 with requests.Session() as session: session.headers.-weight: 500;">update(auth_headers) while True: url = f"{base_url}?page={page}&per_page=100" response = throttled_fetch(url, session) data = response.json() items = data.get("items") or data.get("results") or [] if not items: break results.extend(items) page += 1 return results - A calculate_wait() function that reads Retry-After headers when present and falls back to exponential backoff with jitter when not
- A fetch_with_backoff() wrapper function for single requests
- A tenacity-based decorator for production use with logging
- A proactive TokenBucket class to prevent most 429 responses before they occur