Practical MCP-Style Authorization: An Experimental PoC and Guide

Practical MCP-Style Authorization: An Experimental PoC and Guide

Source: Dev.to

Practical MCP-Style Authorization: An Experimental PoC and Guide ## Introduction ## What's This Article About? ## Tech Stack ## Why Read It? ## Threat Model and Goals ## Let's Design ## Let's Get Cooking ## Step 1: Data model and token shape ## Step 2: Token verification (PoC) ## Step 3: Policy evaluator (simple, extensible) ## Step 4: Middleware and request handling ## Testing — unit and integration ## Operational hardening notes ## Auditing and observability ## Extending policies (when you need more expressivity) ## Deployment and CI ## Security checklist for production readiness ## Step-by-step Setup (repro): ## Let's Run (what to expect) ## Closing Thoughts — deeper reflections and operational playbook ## Key management example and pattern ## Audit events and schema (practical) ## Policy store and evolution ## Token introspection and opaque tokens ## Metrics and monitoring ## Progressive rollout checklist ## Common troubleshooting steps I used ## Closing notes and personal perspective ## Appendix A — Full Example: JWKS verification, caching, and verification helper ## Appendix B — Policy JSON sample and recommended schema ## Appendix C — Expanded tests and edge cases ## Appendix D — Migration notes to OPA/Rego (if needed) ## Appendix E — Troubleshooting checklist ## Case Study: integrating with an agent pipeline ## Performance and scaling notes ## Cost vs. complexity trade-offs ## Checklist before public or production use ## Where I would invest next if this were moving to production ## Appendix F — Sample README snippet for the PoC repository ## Appendix G — Further experiments (ideas) A pragmatic, hands-on exploration of building a minimal authorization layer for agent-mediated APIs — written as my experiments and PoCs. From my experience experimenting with agent-based APIs, the hardest part isn't the model calls — it is designing a compact, auditable, and testable authorization layer that integrates cleanly with agent workflows. In this article I walk through a lightweight Proof-of-Concept (PoC): a small FastAPI service that enforces token-based, scope- and attribute-aware authorization, accompanied by tests, deployment notes, and practical hardening steps. The goal is not to present production-ready software but to document a replicable experimental path so you can validate integration assumptions quickly. From my observation, authorization in agent-driven systems is less about cryptography and more about clear boundaries and auditable decisions. I wrote this because I wanted to know: how small can an authorization service be while still supporting meaningful policies and providing useful signals for operators? I found that keeping each component narrow and well-tested made it much easier to evolve the PoC. I approached the problem by asking pragmatic questions and building intentionally minimal modules that I could replace later if needed. This is a step-by-step exploration of a minimal authorization proxy. It documents: Throughout, I describe what I tried, what worked, and where I encountered friction. From my perspective, this article is useful if you want a practical, low-friction way to validate authorization assumptions for agent integrations. You will learn: If you're experimenting and want quick, testable feedback, this PoC will save you time compared to starting with a large policy engine or vendor-specific tools. Before writing code I listed a compact threat model and goals. Being explicit here guided design decisions and helped me keep the PoC scoped. Threat model (short): The architecture is intentionally small and modular: Design notes and rationale: I'll break the PoC into code blocks and explain each step. Each block includes: what it does, why I structured it that way, and what I learned when implementing it. Why I structured it this way: Why I structured it this way: Why I structured it this way: Why I structured it this way: Tests are where experiments pay off. I recommend two tiers: Example unit test (policy): Example integration test (FastAPI TestClient): From my experience, a PoC becomes useful when it guides the minimal hardening roadmap. For this project the natural hardening steps are: Key rotation example (idea): What I included in the PoC: What to add for production: If you find yourself adding many special-case checks, I ended up evolving the PoC along two directions in my experiments: Starting small lets you defer the cost of a policy engine until the complexity justifies it. The PoC is designed to be deployable on any container platform. A minimal CI checklist I used when iterating: Example GitHub Actions snippet (minimal): From my experience running this PoC, a few practical lessons stood out and informed how I would approach the next iteration. These are concrete checkpoints I found repeatedly useful while experimenting. Start small, but plan for growth Make token semantics explicit Audit everything that matters Key management is not optional Operational playbook (concrete steps) In my PoC I kept the implementation simple at first, then added a JWKS fetch and caching layer as the next logical hardening step. Here's a conceptual snippet that I used to reason about key rotation: My rules of thumb that emerged: A minimal audit event I used in experiments looked like this: I found it useful to index path, sub, decision, and req so I could quickly find the root cause of all related failures. I started with an in-code map and then moved to a JSON policy file that could be reloaded without a restart during development. The JSON format was intentionally simple and readable so non-developers could review it when needed. When to consider a policy engine (e.g., OPA/Rego) Until those conditions applied, the JSON store and good tests were sufficient for rapid iteration. In one experiment I replaced JWTs with opaque tokens and introduced a lightweight introspection endpoint. The endpoint checked a small in-memory store (PoC) and returned standard fields (active, scopes, sub). Even though JWTs are convenient, introspection is useful if you need revocation semantics or centralized token state. Metrics I found most useful: Sample Grafana queries I used in early experiments: I wrote this PoC to capture how small, focused experiments help me answer practical questions more quickly than big upfront designs. From my testing, incremental improvements (key rotation, audit, metrics) deliver the most value and reduce surprise. If you're experimenting, I suggest these simple rules: If you want, I can further extend the PoC with a small example of JWKS-based verification, an OPA policy example, or a short, runnable demo that performs a canary rollout. Tell me which of these you'd like next and I will add it. Tags: security, authorization, python, tutorial The views and opinions expressed here are solely my own and do not represent the views, positions, or opinions of my employer or any organization I am affiliated with. The content is based on my personal experience and experimentation and may be incomplete or incorrect. Any errors or misinterpretations are unintentional, and I apologize in advance if any statements are misunderstood or misrepresented.* Below is a fuller example I used in later experiments — it is still intentionally small, but includes key caching and basic fallbacks. Notes on this example: A policy file that is simple, readable, and version-controlled helps teams agree on behavior. Example: During tests, I added negative tests to ensure nothing accidentally becomes permissive. A few useful checks: Example pytest additions: If your policies become complex, OPA can be a practical next step. My migration path was: If you'd like, I can add a sample docker-compose that runs the PoC alongside a small JWKS provider stub so you can test rotation behavior locally. To make the experiment concrete, I integrated the PoC proxy between a simple task-routing agent and a mock resource backend. The agent made requests to the proxy using bearer tokens that encoded subject, scopes, and a request_id for tracing. I learned several practical things during this integration: Example integration failure and fix (narrative): This PoC is not a performance-optimized proxy, but I ran a few small experiments to get a feel for the numbers under load: I constantly balanced two axes while experimenting: the effort required to build a capability and its marginal value. My rule of thumb: only increase system complexity when you have three or more concrete use-cases that require the extra expressivity. Below is a short README snippet you can copy into the article_ouput_tobe_deleted_after_article_done repo. It documents what the code is and how to reproduce the experiment. If you're exploring similar problems, these experiments were particularly enlightening for me: This article is an experimental PoC write-up. It is not production guidance. Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse COMMAND_BLOCK: # src/models.py from pydantic import BaseModel from typing import List, Optional class TokenClaims(BaseModel): """Token payload used in experiments. - sub: subject identifier (user or client) - scopes: fine-grained permissions (strings) - roles: optional list of role names - tenant_id: optional multi-tenant identifier - iat, exp, issuer: standard JWT fields """ sub: str scopes: List[str] roles: Optional[List[str]] = None tenant_id: Optional[str] = None exp: Optional[int] = None iat: Optional[int] = None issuer: Optional[str] = None Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # src/models.py from pydantic import BaseModel from typing import List, Optional class TokenClaims(BaseModel): """Token payload used in experiments. - sub: subject identifier (user or client) - scopes: fine-grained permissions (strings) - roles: optional list of role names - tenant_id: optional multi-tenant identifier - iat, exp, issuer: standard JWT fields """ sub: str scopes: List[str] roles: Optional[List[str]] = None tenant_id: Optional[str] = None exp: Optional[int] = None iat: Optional[int] = None issuer: Optional[str] = None COMMAND_BLOCK: # src/models.py from pydantic import BaseModel from typing import List, Optional class TokenClaims(BaseModel): """Token payload used in experiments. - sub: subject identifier (user or client) - scopes: fine-grained permissions (strings) - roles: optional list of role names - tenant_id: optional multi-tenant identifier - iat, exp, issuer: standard JWT fields """ sub: str scopes: List[str] roles: Optional[List[str]] = None tenant_id: Optional[str] = None exp: Optional[int] = None iat: Optional[int] = None issuer: Optional[str] = None COMMAND_BLOCK: # src/auth.py import jwt from jwt import PyJWTError from .models import TokenClaims import os SECRET = os.getenv('AUTH_SECRET', 'test-secret') # replace with env-managed secrets def decode_token(token: str) -> TokenClaims: try: payload = jwt.decode(token, SECRET, algorithms=["HS256"]) # PoC: symmetric key return TokenClaims(**payload) except PyJWTError as e: raise ValueError("Invalid token") from e Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # src/auth.py import jwt from jwt import PyJWTError from .models import TokenClaims import os SECRET = os.getenv('AUTH_SECRET', 'test-secret') # replace with env-managed secrets def decode_token(token: str) -> TokenClaims: try: payload = jwt.decode(token, SECRET, algorithms=["HS256"]) # PoC: symmetric key return TokenClaims(**payload) except PyJWTError as e: raise ValueError("Invalid token") from e COMMAND_BLOCK: # src/auth.py import jwt from jwt import PyJWTError from .models import TokenClaims import os SECRET = os.getenv('AUTH_SECRET', 'test-secret') # replace with env-managed secrets def decode_token(token: str) -> TokenClaims: try: payload = jwt.decode(token, SECRET, algorithms=["HS256"]) # PoC: symmetric key return TokenClaims(**payload) except PyJWTError as e: raise ValueError("Invalid token") from e COMMAND_BLOCK: # src/policy.py from .models import TokenClaims from typing import Dict, Any # Example simple resource map with attribute-based rules RESOURCE_POLICY_MAP: Dict[str, Dict[str, Any]] = { "/admin": {"scopes": ["admin"]}, "/resources": {"scopes": ["read", "write"]}, "/tenant/{tenant_id}/data": {"scopes": ["tenant:read"], "requires_tenant_match": True}, } def normalize_path(path: str) -> str: # Normalization helper, PoC only if path.startswith('/tenant/') and '/data' in path: return '/tenant/{tenant_id}/data' return path def is_allowed(claims: TokenClaims, path: str, method: str) -> bool: path_key = normalize_path(path) policy = RESOURCE_POLICY_MAP.get(path_key) if not policy: # If resource not configured, default deny return False required_scopes = policy.get('scopes', []) if not any(s in claims.scopes for s in required_scopes): return False if policy.get('requires_tenant_match'): # enforce tenant matching when required # extract tenant_id from path in real code return claims.tenant_id is not None return True Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # src/policy.py from .models import TokenClaims from typing import Dict, Any # Example simple resource map with attribute-based rules RESOURCE_POLICY_MAP: Dict[str, Dict[str, Any]] = { "/admin": {"scopes": ["admin"]}, "/resources": {"scopes": ["read", "write"]}, "/tenant/{tenant_id}/data": {"scopes": ["tenant:read"], "requires_tenant_match": True}, } def normalize_path(path: str) -> str: # Normalization helper, PoC only if path.startswith('/tenant/') and '/data' in path: return '/tenant/{tenant_id}/data' return path def is_allowed(claims: TokenClaims, path: str, method: str) -> bool: path_key = normalize_path(path) policy = RESOURCE_POLICY_MAP.get(path_key) if not policy: # If resource not configured, default deny return False required_scopes = policy.get('scopes', []) if not any(s in claims.scopes for s in required_scopes): return False if policy.get('requires_tenant_match'): # enforce tenant matching when required # extract tenant_id from path in real code return claims.tenant_id is not None return True COMMAND_BLOCK: # src/policy.py from .models import TokenClaims from typing import Dict, Any # Example simple resource map with attribute-based rules RESOURCE_POLICY_MAP: Dict[str, Dict[str, Any]] = { "/admin": {"scopes": ["admin"]}, "/resources": {"scopes": ["read", "write"]}, "/tenant/{tenant_id}/data": {"scopes": ["tenant:read"], "requires_tenant_match": True}, } def normalize_path(path: str) -> str: # Normalization helper, PoC only if path.startswith('/tenant/') and '/data' in path: return '/tenant/{tenant_id}/data' return path def is_allowed(claims: TokenClaims, path: str, method: str) -> bool: path_key = normalize_path(path) policy = RESOURCE_POLICY_MAP.get(path_key) if not policy: # If resource not configured, default deny return False required_scopes = policy.get('scopes', []) if not any(s in claims.scopes for s in required_scopes): return False if policy.get('requires_tenant_match'): # enforce tenant matching when required # extract tenant_id from path in real code return claims.tenant_id is not None return True COMMAND_BLOCK: # src/main.py from fastapi import FastAPI, Request, HTTPException from .auth import decode_token from .policy import is_allowed from .models import TokenClaims import logging app = FastAPI(title="MCP-Style Auth PoC") logger = logging.getLogger('auth_poc') @app.middleware("http") async def auth_middleware(request: Request, call_next): auth = request.headers.get("authorization") if not auth or not auth.startswith("Bearer "): logger.info('missing token', extra={'path': request.url.path}) raise HTTPException(status_code=401, detail="Missing token") token = auth.split(None, 1)[1] try: claims: TokenClaims = decode_token(token) except Exception as e: logger.warning('invalid token', extra={'path': request.url.path, 'error': str(e)}) raise HTTPException(status_code=401, detail="Invalid token") allowed = is_allowed(claims, request.url.path, request.method) logger.info('auth_decision', extra={ 'path': request.url.path, 'subject': claims.sub, 'allowed': allowed }) if not allowed: raise HTTPException(status_code=403, detail="Access denied") request.state.claims = claims return await call_next(request) @app.get("/resources") async def get_resources(request: Request): return {"data": ["resource-1", "resource-2"], "subject": request.state.claims.sub} Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # src/main.py from fastapi import FastAPI, Request, HTTPException from .auth import decode_token from .policy import is_allowed from .models import TokenClaims import logging app = FastAPI(title="MCP-Style Auth PoC") logger = logging.getLogger('auth_poc') @app.middleware("http") async def auth_middleware(request: Request, call_next): auth = request.headers.get("authorization") if not auth or not auth.startswith("Bearer "): logger.info('missing token', extra={'path': request.url.path}) raise HTTPException(status_code=401, detail="Missing token") token = auth.split(None, 1)[1] try: claims: TokenClaims = decode_token(token) except Exception as e: logger.warning('invalid token', extra={'path': request.url.path, 'error': str(e)}) raise HTTPException(status_code=401, detail="Invalid token") allowed = is_allowed(claims, request.url.path, request.method) logger.info('auth_decision', extra={ 'path': request.url.path, 'subject': claims.sub, 'allowed': allowed }) if not allowed: raise HTTPException(status_code=403, detail="Access denied") request.state.claims = claims return await call_next(request) @app.get("/resources") async def get_resources(request: Request): return {"data": ["resource-1", "resource-2"], "subject": request.state.claims.sub} COMMAND_BLOCK: # src/main.py from fastapi import FastAPI, Request, HTTPException from .auth import decode_token from .policy import is_allowed from .models import TokenClaims import logging app = FastAPI(title="MCP-Style Auth PoC") logger = logging.getLogger('auth_poc') @app.middleware("http") async def auth_middleware(request: Request, call_next): auth = request.headers.get("authorization") if not auth or not auth.startswith("Bearer "): logger.info('missing token', extra={'path': request.url.path}) raise HTTPException(status_code=401, detail="Missing token") token = auth.split(None, 1)[1] try: claims: TokenClaims = decode_token(token) except Exception as e: logger.warning('invalid token', extra={'path': request.url.path, 'error': str(e)}) raise HTTPException(status_code=401, detail="Invalid token") allowed = is_allowed(claims, request.url.path, request.method) logger.info('auth_decision', extra={ 'path': request.url.path, 'subject': claims.sub, 'allowed': allowed }) if not allowed: raise HTTPException(status_code=403, detail="Access denied") request.state.claims = claims return await call_next(request) @app.get("/resources") async def get_resources(request: Request): return {"data": ["resource-1", "resource-2"], "subject": request.state.claims.sub} COMMAND_BLOCK: # tests/test_policy.py from src.models import TokenClaims from src.policy import is_allowed def test_is_allowed_read_scope(): claims = TokenClaims(sub="u1", scopes=["read"]) assert is_allowed(claims, "/resources", "GET") def test_tenant_requires_tenant_match(): claims = TokenClaims(sub="u2", scopes=["tenant:read"], tenant_id=None) assert not is_allowed(claims, "/tenant/abc/data", "GET") Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # tests/test_policy.py from src.models import TokenClaims from src.policy import is_allowed def test_is_allowed_read_scope(): claims = TokenClaims(sub="u1", scopes=["read"]) assert is_allowed(claims, "/resources", "GET") def test_tenant_requires_tenant_match(): claims = TokenClaims(sub="u2", scopes=["tenant:read"], tenant_id=None) assert not is_allowed(claims, "/tenant/abc/data", "GET") COMMAND_BLOCK: # tests/test_policy.py from src.models import TokenClaims from src.policy import is_allowed def test_is_allowed_read_scope(): claims = TokenClaims(sub="u1", scopes=["read"]) assert is_allowed(claims, "/resources", "GET") def test_tenant_requires_tenant_match(): claims = TokenClaims(sub="u2", scopes=["tenant:read"], tenant_id=None) assert not is_allowed(claims, "/tenant/abc/data", "GET") COMMAND_BLOCK: # tests/test_integration.py from fastapi.testclient import TestClient from src.main import app from src.scripts.gen_token import create_token client = TestClient(app) def test_resources_endpoint_allows_read_scope(): token = create_token(scopes=["read"]) # PoC token generator r = client.get('/resources', headers={'Authorization': f'Bearer {token}'}) assert r.status_code == 200 def test_admin_endpoint_denies_missing_scope(): token = create_token(scopes=["read"]) r = client.get('/admin', headers={'Authorization': f'Bearer {token}'}) assert r.status_code == 403 Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # tests/test_integration.py from fastapi.testclient import TestClient from src.main import app from src.scripts.gen_token import create_token client = TestClient(app) def test_resources_endpoint_allows_read_scope(): token = create_token(scopes=["read"]) # PoC token generator r = client.get('/resources', headers={'Authorization': f'Bearer {token}'}) assert r.status_code == 200 def test_admin_endpoint_denies_missing_scope(): token = create_token(scopes=["read"]) r = client.get('/admin', headers={'Authorization': f'Bearer {token}'}) assert r.status_code == 403 COMMAND_BLOCK: # tests/test_integration.py from fastapi.testclient import TestClient from src.main import app from src.scripts.gen_token import create_token client = TestClient(app) def test_resources_endpoint_allows_read_scope(): token = create_token(scopes=["read"]) # PoC token generator r = client.get('/resources', headers={'Authorization': f'Bearer {token}'}) assert r.status_code == 200 def test_admin_endpoint_denies_missing_scope(): token = create_token(scopes=["read"]) r = client.get('/admin', headers={'Authorization': f'Bearer {token}'}) assert r.status_code == 403 CODE_BLOCK: name: CI on: [push, pull_request] jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install run: pip install -r article_ouput_tobe_deleted_after_article_done/requirements.txt - name: Run tests run: pytest -q Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: name: CI on: [push, pull_request] jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install run: pip install -r article_ouput_tobe_deleted_after_article_done/requirements.txt - name: Run tests run: pytest -q CODE_BLOCK: name: CI on: [push, pull_request] jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install run: pip install -r article_ouput_tobe_deleted_after_article_done/requirements.txt - name: Run tests run: pytest -q COMMAND_BLOCK: python -m venv venv # Windows venv\Scripts\activate # macOS/Linux source venv/bin/activate pip install -r article_ouput_tobe_deleted_after_article_done/requirements.txt Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: python -m venv venv # Windows venv\Scripts\activate # macOS/Linux source venv/bin/activate pip install -r article_ouput_tobe_deleted_after_article_done/requirements.txt COMMAND_BLOCK: python -m venv venv # Windows venv\Scripts\activate # macOS/Linux source venv/bin/activate pip install -r article_ouput_tobe_deleted_after_article_done/requirements.txt CODE_BLOCK: python article_ouput_tobe_deleted_after_article_done/scripts/gen_token.py Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: python article_ouput_tobe_deleted_after_article_done/scripts/gen_token.py CODE_BLOCK: python article_ouput_tobe_deleted_after_article_done/scripts/gen_token.py CODE_BLOCK: uvicorn article_ouput_tobe_deleted_after_article_done.src.main:app --reload Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: uvicorn article_ouput_tobe_deleted_after_article_done.src.main:app --reload CODE_BLOCK: uvicorn article_ouput_tobe_deleted_after_article_done.src.main:app --reload COMMAND_BLOCK: curl -H "Authorization: Bearer <TOKEN>" http://localhost:8000/resources Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: curl -H "Authorization: Bearer <TOKEN>" http://localhost:8000/resources COMMAND_BLOCK: curl -H "Authorization: Bearer <TOKEN>" http://localhost:8000/resources COMMAND_BLOCK: # src/jwks_cache.py import requests import time JWKS_URL = os.getenv('JWKS_URL') _cache = {"keys": [], "fetched_at": 0} TTL = 60 * 60 # 1 hour for PoC def get_jwks(): now = time.time() if _cache['keys'] and (now - _cache['fetched_at'] < TTL): return _cache['keys'] resp = requests.get(JWKS_URL, timeout=2) resp.raise_for_status() data = resp.json() _cache['keys'] = data.get('keys', []) _cache['fetched_at'] = now return _cache['keys'] Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # src/jwks_cache.py import requests import time JWKS_URL = os.getenv('JWKS_URL') _cache = {"keys": [], "fetched_at": 0} TTL = 60 * 60 # 1 hour for PoC def get_jwks(): now = time.time() if _cache['keys'] and (now - _cache['fetched_at'] < TTL): return _cache['keys'] resp = requests.get(JWKS_URL, timeout=2) resp.raise_for_status() data = resp.json() _cache['keys'] = data.get('keys', []) _cache['fetched_at'] = now return _cache['keys'] COMMAND_BLOCK: # src/jwks_cache.py import requests import time JWKS_URL = os.getenv('JWKS_URL') _cache = {"keys": [], "fetched_at": 0} TTL = 60 * 60 # 1 hour for PoC def get_jwks(): now = time.time() if _cache['keys'] and (now - _cache['fetched_at'] < TTL): return _cache['keys'] resp = requests.get(JWKS_URL, timeout=2) resp.raise_for_status() data = resp.json() _cache['keys'] = data.get('keys', []) _cache['fetched_at'] = now return _cache['keys'] CODE_BLOCK: { "ts": "2025-12-24T12:00:00Z", "req": "req-123", "sub": "user-1", "path": "/resources", "method": "GET", "scopes": ["read"], "decision": "allow", "reason": "scope_match" } Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: { "ts": "2025-12-24T12:00:00Z", "req": "req-123", "sub": "user-1", "path": "/resources", "method": "GET", "scopes": ["read"], "decision": "allow", "reason": "scope_match" } CODE_BLOCK: { "ts": "2025-12-24T12:00:00Z", "req": "req-123", "sub": "user-1", "path": "/resources", "method": "GET", "scopes": ["read"], "decision": "allow", "reason": "scope_match" } COMMAND_BLOCK: # src/jwks_helper.py import requests from jose import jwk, jwt from jose.utils import base64url_decode import time import os JWKS_URL = os.getenv('JWKS_URL') _cache = {"keys": {}, "expiry": 0} TTL = int(os.getenv('JWKS_TTL_SECONDS', 60 * 60)) def refresh_keys(): if not JWKS_URL: raise RuntimeError('JWKS_URL not configured') resp = requests.get(JWKS_URL, timeout=3) resp.raise_for_status() data = resp.json() now = time.time() _cache['keys'] = {k['kid']: k for k in data.get('keys', [])} _cache['expiry'] = now + TTL def find_key(kid: str): now = time.time() if now > _cache['expiry'] or not _cache['keys']: refresh_keys() key = _cache['keys'].get(kid) if not key: raise KeyError('Key not found') return key def verify_jwt(token: str, audience=None): try: unverified = jwt.get_unverified_header(token) kid = unverified.get('kid') key = find_key(kid) public_key = jwk.construct(key) # jose expects the token and key handling claims = jwt.decode(token, public_key, algorithms=['RS256'], audience=audience) return claims except Exception as e: # Log and fail closed in production raise Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # src/jwks_helper.py import requests from jose import jwk, jwt from jose.utils import base64url_decode import time import os JWKS_URL = os.getenv('JWKS_URL') _cache = {"keys": {}, "expiry": 0} TTL = int(os.getenv('JWKS_TTL_SECONDS', 60 * 60)) def refresh_keys(): if not JWKS_URL: raise RuntimeError('JWKS_URL not configured') resp = requests.get(JWKS_URL, timeout=3) resp.raise_for_status() data = resp.json() now = time.time() _cache['keys'] = {k['kid']: k for k in data.get('keys', [])} _cache['expiry'] = now + TTL def find_key(kid: str): now = time.time() if now > _cache['expiry'] or not _cache['keys']: refresh_keys() key = _cache['keys'].get(kid) if not key: raise KeyError('Key not found') return key def verify_jwt(token: str, audience=None): try: unverified = jwt.get_unverified_header(token) kid = unverified.get('kid') key = find_key(kid) public_key = jwk.construct(key) # jose expects the token and key handling claims = jwt.decode(token, public_key, algorithms=['RS256'], audience=audience) return claims except Exception as e: # Log and fail closed in production raise COMMAND_BLOCK: # src/jwks_helper.py import requests from jose import jwk, jwt from jose.utils import base64url_decode import time import os JWKS_URL = os.getenv('JWKS_URL') _cache = {"keys": {}, "expiry": 0} TTL = int(os.getenv('JWKS_TTL_SECONDS', 60 * 60)) def refresh_keys(): if not JWKS_URL: raise RuntimeError('JWKS_URL not configured') resp = requests.get(JWKS_URL, timeout=3) resp.raise_for_status() data = resp.json() now = time.time() _cache['keys'] = {k['kid']: k for k in data.get('keys', [])} _cache['expiry'] = now + TTL def find_key(kid: str): now = time.time() if now > _cache['expiry'] or not _cache['keys']: refresh_keys() key = _cache['keys'].get(kid) if not key: raise KeyError('Key not found') return key def verify_jwt(token: str, audience=None): try: unverified = jwt.get_unverified_header(token) kid = unverified.get('kid') key = find_key(kid) public_key = jwk.construct(key) # jose expects the token and key handling claims = jwt.decode(token, public_key, algorithms=['RS256'], audience=audience) return claims except Exception as e: # Log and fail closed in production raise CODE_BLOCK: { "policies": [ { "id": "tenant-data-read", "resource": "/tenant/{tenant_id}/data", "scopes": ["tenant:read"], "requires_tenant_match": true }, { "id": "resources-read", "resource": "/resources", "scopes": ["read"] } ] } Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: { "policies": [ { "id": "tenant-data-read", "resource": "/tenant/{tenant_id}/data", "scopes": ["tenant:read"], "requires_tenant_match": true }, { "id": "resources-read", "resource": "/resources", "scopes": ["read"] } ] } CODE_BLOCK: { "policies": [ { "id": "tenant-data-read", "resource": "/tenant/{tenant_id}/data", "scopes": ["tenant:read"], "requires_tenant_match": true }, { "id": "resources-read", "resource": "/resources", "scopes": ["read"] } ] } COMMAND_BLOCK: def test_missing_scopes_rejected(): claims = TokenClaims(sub='u', scopes=[]) assert not is_allowed(claims, '/resources', 'GET') def test_expired_token_rejected(): token = create_token(iat=100000, exp=100001) # simulate verification that checks exp with pytest.raises(ValueError): decode_token(token) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: def test_missing_scopes_rejected(): claims = TokenClaims(sub='u', scopes=[]) assert not is_allowed(claims, '/resources', 'GET') def test_expired_token_rejected(): token = create_token(iat=100000, exp=100001) # simulate verification that checks exp with pytest.raises(ValueError): decode_token(token) COMMAND_BLOCK: def test_missing_scopes_rejected(): claims = TokenClaims(sub='u', scopes=[]) assert not is_allowed(claims, '/resources', 'GET') def test_expired_token_rejected(): token = create_token(iat=100000, exp=100001) # simulate verification that checks exp with pytest.raises(ValueError): decode_token(token) COMMAND_BLOCK: # MCP-Style Authorization PoC This repository contains a minimal FastAPI authorization proxy used to experiment with token verification, policy evaluation, and audit logging. Quickstart: 1. python -m venv venv && source venv/bin/activate 2. pip install -r requirements.txt 3. python scripts/gen_token.py # print a test token 4. uvicorn src.main:app --reload 5. curl -H "Authorization: Bearer <TOKEN>" http://localhost:8000/resources Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # MCP-Style Authorization PoC This repository contains a minimal FastAPI authorization proxy used to experiment with token verification, policy evaluation, and audit logging. Quickstart: 1. python -m venv venv && source venv/bin/activate 2. pip install -r requirements.txt 3. python scripts/gen_token.py # print a test token 4. uvicorn src.main:app --reload 5. curl -H "Authorization: Bearer <TOKEN>" http://localhost:8000/resources COMMAND_BLOCK: # MCP-Style Authorization PoC This repository contains a minimal FastAPI authorization proxy used to experiment with token verification, policy evaluation, and audit logging. Quickstart: 1. python -m venv venv && source venv/bin/activate 2. pip install -r requirements.txt 3. python scripts/gen_token.py # print a test token 4. uvicorn src.main:app --reload 5. curl -H "Authorization: Bearer <TOKEN>" http://localhost:8000/resources - What you get: in-depth design reasoning, a mermaid diagram for the control flow, annotated code split into logical blocks, unit and integration test examples, and step-by-step setup and run instructions. - Why it matters: experiments like this surface the real trade-offs (key management, audibility, policy complexity) that decide whether an approach can be hardened for production. - The token shape and claim choices I used in the PoC and why I chose them. - A small policy evaluator that maps resources to required scopes and attributes. - Middleware wiring that keeps endpoints focused on business logic. - Tests and a small token generator to speed iteration. - Operational points that matter if you move from PoC to production. - Python 3.11 — comfortable iteration and typing support. - FastAPI — quick to prototype HTTP middleware and endpoints. - PyJWT (PoC) — symmetric JWT for speed, replaceable with asymmetric verification. - Pydantic — typed models to validate token shape early. - Pytest — unit and integration tests. - Which claims to include in an authorization token for practical policy checks. - How to structure code so token parsing and policy evaluation are easy to test. - How to add audit logging and basic operational guards without adding much complexity. - An untrusted client may attempt to call protected endpoints with forged tokens. - A leaked symmetric key leads to token forgery (acceptable in an experiment, not in production). - Policies must be auditable and deterministic for reproducible failures. - Minimal trust surface — small token schema and small policy evaluator. - Testability — pure functions for policy checks and token parsing. - Observability — every decision point should be loggable with structured details. - Replaceability — later swap to JWKS/asymmetric verification or a policy engine. - Token verification and policy evaluation are distinct steps. In practice this means I can unit-test the policy evaluator without a live HTTP stack and swap verification strategies without touching policies. - The proxy returns clear HTTP semantics (401 for invalid tokens, 403 for authorization failures) and includes audit metadata with each decision. From my experience, concise error semantics make debugging integration issues faster. - Documents the minimal and optional claims I considered useful for policy evaluation. - Starting with scopes keeps policies simple; adding roles and tenant_id allows richer authorization without redesign. - Adding optional attributes early made it much easier to simulate multi-tenant scenarios in tests. - Validates a JWT and ensures it conforms to TokenClaims. - In experiments, symmetric keys speed iteration. I keep the code simple so swapping to JWKS later is straightforward. - Passing tokens through a Pydantic model gives immediate clarity about what claims are expected and surfaces missing or misshapen claims quickly in tests. - Implements a compact, declarative policy mapping with possible extension points. - Maps are readable, easy to test, and straightforward to evolve into a small policy store later. - Start conservative: deny by default, and keep policies simple at first. - Validates tokens, evaluates policies, logs decisions, and keeps request handlers clean. - The middleware isolates auth concerns and enables centralized audit logging. - Structured logs with subject, path, allowed, and reason were invaluable when replaying failing test cases. - Unit tests for policy and token parsing - Integration tests for middleware and end-to-end behavior - Unit tests run fast and catch shape errors. Integration tests catch middleware wiring problems and help validate logs and error semantics. - Move to asymmetric JWT verification (JWKS) and add key rotation support. - Validate exp and nbf fields strictly. - Add structured audit logs for every decision (subject, resource, outcome, rule matched). - Implement rate-limiting around the proxy to limit abusive traffic. - Add an optional token introspection path if you use opaque tokens. - Use a JWKS URL for key discovery. - Cache keys locally and refresh periodically. - Fail closed if no valid verification key is found. - Structured logs that include subject, path, scopes, and decision. - Example: logger.info('auth_decision', extra={'subject': claims.sub, 'path': path, 'allowed': True}) - Audit sink that writes to a persistent store (Elasticsearch, a database, or a log service). - Correlate auth events with request IDs for traceability. - Add dashboards that show denied requests by resource and top principals. - Add a small, JSON-backed rule store that maps resources to expressions (simple boolean expressions or allowlists). - Integrate OPA/Rego when you need attribute-based access control with complex boolean logic. - Run unit tests with pytest - Run integration tests (fast TestClient style) in CI - Lint with flake8 / black - Replace symmetric secrets with JWKS/asymmetric verification. - Enforce token expiry and short lifetimes for PoC tokens. - Store secrets in environment-backed secret stores (KeyVault, AWS Secrets Manager). - Implement rate-limiting and request-size limits. - Add an incident response plan for leaked keys. - Clone the PoC repository (or copy the code into a new repo). - Create and activate a Python 3.11 virtualenv: - Generate a test token: - Run the API: - Call an endpoint with the token: - GET /resources returns 200 when the token has read scope. - GET /admin returns 403 when the token lacks admin scope. - Audit logs show the subject, path, and outcome for each request. - Start with a tiny policy map and clear tests. I observed that teams get more value from several iterations of a small system than from one big, brittle platform. - Write the acceptance tests first for any policy change; that way you know when a change is behaviorally breaking and can safely iterate. - Define the exact claim set you will accept and validate it strictly. In my experiments, the scopes pattern covered most needs; roles and tenant_id were added only when required. - In my opinion, keeping tokens minimal makes audit trails clearer and reduces accidental over-permissioning. - Emit structured audit events for 401/403 decisions and include request_id, subject, scopes, path, and reason. - I discovered that a small audit pipeline with daily dashboards made debugging far faster than relying on adhoc logs. - Replace PoC symmetric keys with asymmetric verification and JWKS before you accept live traffic. In my tests, key discovery and rotation were the biggest operational sources of outages when done incorrectly. - Add short, strict unit tests for policy changes. - Add integration tests for middleware behaviors and audit events. - Run the PoC against a staging JWKS provider to validate rotation, caching, and error handling. - Deploy to a canary subset of traffic with monitoring and alerts for verification failures. - Ramp up traffic only after zero false positives on validation and audit completeness checks. - Use shorter TTL for key cache in early phases so you detect rotation quickly. - Fail closed if you cannot validate a token or fetch keys. - Add alerts on JWKS fetch errors and unusually high verification times. - You have many dynamic attributes and need a flexible expression language. - Policies are shared across multiple services with slightly different context. - auth_proxy.requests_total{outcome="allow|deny|invalid"} - auth_proxy.jwks_fetch_errors_total - auth_proxy.verify_latency_seconds (histogram) - Denials by resource: sum by (path)(increase(auth_proxy_requests_total{outcome="deny"}[1h])) - JWKS fetch errors: increase(auth_proxy_jwks_fetch_errors_total[1h]) - Smoke tests in staging with the real JWKS provider. - Deploy to a small percentage of traffic and monitor for errors and an unexpected rise in 401/403 events. - Verify that audit events are being written and examined by the ops dashboard. - Ramp only after one or more full business cycles without regression. - If tokens are rejected: check token claims and clock skew (iat/exp) first. - If verification fails intermittently: inspect JWKS fetch logs and cache state. - If many denials occur after a rollout: compare audit logs for a pattern and roll back the change if the root cause is unclear. - Write tests first for new policy behavior. - Keep tokens compact and explicit. - Instrument audit logs and metrics from day one. - Test policy checks with realistic edge cases — sensitivity flags, user roles, and untrusted inputs. - Use short TTL in early phases to detect rotation quickly. - Add comprehensive logging around refresh_keys() so you can trace fetch failures. - Token missing scopes should be rejected. - Token with incorrect tenant_id should be rejected for tenant-scoped endpoints. - Token with expired exp should be rejected even if scopes match. - Export current JSON rule set as canonical source of truth. - Translate a representative subset of rules into Rego and test them against sample claims. - Replace calls to simple is_allowed() with an adapter that queries OPA (local or remote) and caches results when safe. - 401 everywhere: check token parsing and clock skew. - Intermittent verification failures: inspect JWKS fetch logs and network connectivity. - Many 403s after rollout: compare audit logs to see whether a scope mismatch or tenant mismatch is occurring. - Early errors were mostly due to mismatched expectations: the agent encoded a different claim name (perms) instead of scopes. A quick change to the token generator and a unit test prevented that class of error from reappearing. - Adding request_id into tokens (or generating a per-request ID at the proxy) made it trivial to correlate agent logs with audit events, which sped up debugging considerably. - The integration helped me decide which policies should remain at the proxy layer (coarse-grained access) and which belonged deeper in the application (resource-specific semantics). - Symptom: agent receives 403 from /tenant/abc/data though token contained tenant:read scope. - Investigation: audit log showed token had no tenant_id claim. - Fix: enforce token issuance to include tenant_id for tenant-scoped tokens and add a unit test that asserts the proxy denies tokens with missing tenant context. - Single-node PoC (uvicorn with default worker): handled ~200 req/s in a small test where token verification was in-memory (symmetric). Latency medians were sub-20ms for simple endpoints. - With JWKS fetching and occasional key refreshes, the first request after a cache miss showed a 100ms+ spike; subsequent requests returned to baseline. - Cache JWKS keys aggressively in production and warm caches as part of deploy steps. - Use sufficient worker processes to match your expected concurrency; the I/O of JWKS fetches can be parallelized across workers. - Keeping policies in Python code is low-effort and high-speed for early tests. - Introducing OPA or a centrally managed policy store adds complexity and cost but can pay off if you need complex attribute-based rules or multi-service policy sharing. - JWKS/asymmetric verification in place and tested for rotation. - exp / nbf enforcement and reasonable token lifetimes. - Audit sink and basic dashboards available and validated. - Integration tests exercising representative agents and backends. - Rate-limiting and basic abuse mitigation configured. - Harden key discovery and monitoring for failures and latency. - Add a configurable policy store with role/attribute mapping and basic change control (review/approval). - Integrate a simple dashboard showing top denied requests and top principals by resource. - Add a revocation list for tokens and measure the trade-offs between immediate revocation and caching performance. - Add a small GUI that visualizes audit events and helps filter by subject and resource. - Connect an OPA instance and move one or two policies into Rego to measure developer ergonomics and testability.