$ """
complexity_tracker.py — Track cyclomatic complexity drift per function.
Requires: -weight: 500;">pip -weight: 500;">install radon
Radon docs: https://radon.readthedocs.io/
"""
import json
import subprocess
import sys
from datetime import date
from pathlib import Path def get_complexity(source_dir: str) -> list[dict]: """Run radon cc and return per-function complexity scores.""" result = subprocess.run( ["radon", "cc", source_dir, "-j", "-n", "C"], capture_output=True, text=True, check=True, ) raw = json.loads(result.stdout) functions = [] for filepath, blocks in raw.items(): for block in blocks: functions.append({ "file": filepath, "name": block["name"], "complexity": block["complexity"], "lineno": block["lineno"], }) return functions def load_baseline(path: Path) -> dict: """Load previous complexity snapshot.""" if path.exists(): return json.loads(path.read_text()) return {} def detect_drift(baseline: dict, current: list[dict], threshold: int = 3) -> list[dict]: """Flag functions whose complexity increased beyond threshold.""" alerts = [] for func in current: key = f"{func['file']}::{func['name']}" prev = baseline.get(key, {}).get("complexity", func["complexity"]) delta = func["complexity"] - prev if delta >= threshold: alerts.append({ "function": key, "was": prev, "now": func["complexity"], "delta": delta, "line": func["lineno"], }) return alerts def save_snapshot(functions: list[dict], path: Path) -> None: """Save current complexity as the new baseline.""" snapshot = {} for f in functions: key = f"{f['file']}::{f['name']}" snapshot[key] = { "complexity": f["complexity"], "date": str(date.today()), } path.write_text(json.dumps(snapshot, indent=2)) if __name__ == "__main__": source = sys.argv[1] if len(sys.argv) > 1 else "src" baseline_path = Path(".complexity-baseline.json") current = get_complexity(source) baseline = load_baseline(baseline_path) alerts = detect_drift(baseline, current) if alerts: print(f"Found {len(alerts)} complexity drift alerts:") for a in alerts: print(f" {a['function']} line {a['line']}: " f"{a['was']} -> {a['now']} (+{a['delta']})") sys.exit(1) else: print(f"No drift detected across {len(current)} functions.") save_snapshot(current, baseline_path)
"""
complexity_tracker.py — Track cyclomatic complexity drift per function.
Requires: -weight: 500;">pip -weight: 500;">install radon
Radon docs: https://radon.readthedocs.io/
"""
import json
import subprocess
import sys
from datetime import date
from pathlib import Path def get_complexity(source_dir: str) -> list[dict]: """Run radon cc and return per-function complexity scores.""" result = subprocess.run( ["radon", "cc", source_dir, "-j", "-n", "C"], capture_output=True, text=True, check=True, ) raw = json.loads(result.stdout) functions = [] for filepath, blocks in raw.items(): for block in blocks: functions.append({ "file": filepath, "name": block["name"], "complexity": block["complexity"], "lineno": block["lineno"], }) return functions def load_baseline(path: Path) -> dict: """Load previous complexity snapshot.""" if path.exists(): return json.loads(path.read_text()) return {} def detect_drift(baseline: dict, current: list[dict], threshold: int = 3) -> list[dict]: """Flag functions whose complexity increased beyond threshold.""" alerts = [] for func in current: key = f"{func['file']}::{func['name']}" prev = baseline.get(key, {}).get("complexity", func["complexity"]) delta = func["complexity"] - prev if delta >= threshold: alerts.append({ "function": key, "was": prev, "now": func["complexity"], "delta": delta, "line": func["lineno"], }) return alerts def save_snapshot(functions: list[dict], path: Path) -> None: """Save current complexity as the new baseline.""" snapshot = {} for f in functions: key = f"{f['file']}::{f['name']}" snapshot[key] = { "complexity": f["complexity"], "date": str(date.today()), } path.write_text(json.dumps(snapshot, indent=2)) if __name__ == "__main__": source = sys.argv[1] if len(sys.argv) > 1 else "src" baseline_path = Path(".complexity-baseline.json") current = get_complexity(source) baseline = load_baseline(baseline_path) alerts = detect_drift(baseline, current) if alerts: print(f"Found {len(alerts)} complexity drift alerts:") for a in alerts: print(f" {a['function']} line {a['line']}: " f"{a['was']} -> {a['now']} (+{a['delta']})") sys.exit(1) else: print(f"No drift detected across {len(current)} functions.") save_snapshot(current, baseline_path)
"""
complexity_tracker.py — Track cyclomatic complexity drift per function.
Requires: -weight: 500;">pip -weight: 500;">install radon
Radon docs: https://radon.readthedocs.io/
"""
import json
import subprocess
import sys
from datetime import date
from pathlib import Path def get_complexity(source_dir: str) -> list[dict]: """Run radon cc and return per-function complexity scores.""" result = subprocess.run( ["radon", "cc", source_dir, "-j", "-n", "C"], capture_output=True, text=True, check=True, ) raw = json.loads(result.stdout) functions = [] for filepath, blocks in raw.items(): for block in blocks: functions.append({ "file": filepath, "name": block["name"], "complexity": block["complexity"], "lineno": block["lineno"], }) return functions def load_baseline(path: Path) -> dict: """Load previous complexity snapshot.""" if path.exists(): return json.loads(path.read_text()) return {} def detect_drift(baseline: dict, current: list[dict], threshold: int = 3) -> list[dict]: """Flag functions whose complexity increased beyond threshold.""" alerts = [] for func in current: key = f"{func['file']}::{func['name']}" prev = baseline.get(key, {}).get("complexity", func["complexity"]) delta = func["complexity"] - prev if delta >= threshold: alerts.append({ "function": key, "was": prev, "now": func["complexity"], "delta": delta, "line": func["lineno"], }) return alerts def save_snapshot(functions: list[dict], path: Path) -> None: """Save current complexity as the new baseline.""" snapshot = {} for f in functions: key = f"{f['file']}::{f['name']}" snapshot[key] = { "complexity": f["complexity"], "date": str(date.today()), } path.write_text(json.dumps(snapshot, indent=2)) if __name__ == "__main__": source = sys.argv[1] if len(sys.argv) > 1 else "src" baseline_path = Path(".complexity-baseline.json") current = get_complexity(source) baseline = load_baseline(baseline_path) alerts = detect_drift(baseline, current) if alerts: print(f"Found {len(alerts)} complexity drift alerts:") for a in alerts: print(f" {a['function']} line {a['line']}: " f"{a['was']} -> {a['now']} (+{a['delta']})") sys.exit(1) else: print(f"No drift detected across {len(current)} functions.") save_snapshot(current, baseline_path)
# Install: npm install -g jscpd
# Docs: https://github.com/kucherenko/jscpd

# Scan your source directory for duplicates
jscpd ./src --min-lines 5 --min-tokens 50 --reporters consoleFull

# Output shows duplicate blocks with file locations:
# Clone found (Python):
#   src/auth/login.py [10:25]
#   src/auth/register.py [15:30]
#   Lines: 15, Tokens: 89

# Set a duplication threshold for CI (fail when >5% duplicated)
# Configure in .jscpd.json: {"threshold": 5}
jscpd ./src --threshold 5 --reporters consoleFull
# Install: npm install -g jscpd
# Docs: https://github.com/kucherenko/jscpd

# Scan your source directory for duplicates
jscpd ./src --min-lines 5 --min-tokens 50 --reporters consoleFull

# Output shows duplicate blocks with file locations:
# Clone found (Python):
#   src/auth/login.py [10:25]
#   src/auth/register.py [15:30]
#   Lines: 15, Tokens: 89

# Set a duplication threshold for CI (fail when >5% duplicated)
# Configure in .jscpd.json: {"threshold": 5}
jscpd ./src --threshold 5 --reporters consoleFull
# Install: npm install -g jscpd
# Docs: https://github.com/kucherenko/jscpd

# Scan your source directory for duplicates
jscpd ./src --min-lines 5 --min-tokens 50 --reporters consoleFull

# Output shows duplicate blocks with file locations:
# Clone found (Python):
#   src/auth/login.py [10:25]
#   src/auth/register.py [15:30]
#   Lines: 15, Tokens: 89

# Set a duplication threshold for CI (fail when >5% duplicated)
# Configure in .jscpd.json: {"threshold": 5}
jscpd ./src --threshold 5 --reporters consoleFull
# Uses Pylint's similarity checker across your codebase
# Docs: https://pylint.readthedocs.io/
pylint --disable=all --enable=duplicate-code src/
# Uses Pylint's similarity checker across your codebase
# Docs: https://pylint.readthedocs.io/
pylint --disable=all --enable=duplicate-code src/
# Uses Pylint's similarity checker across your codebase
# Docs: https://pylint.readthedocs.io/
pylint --disable=all --enable=duplicate-code src/
# Install: pip install vulture
# Docs: https://github.com/jendrikseipp/vulture

# Scan for dead code with 80% confidence threshold
vulture src/ --min-confidence 80

# Output:
# src/utils/helpers.py:45: unused function 'format_response' (90% confidence)
# src/models/user.py:12: unused import 'Optional' (100% confidence)
# src/api/routes.py:89: unused variable 'temp_cache' (80% confidence)
# Install: pip install vulture
# Docs: https://github.com/jendrikseipp/vulture

# Scan for dead code with 80% confidence threshold
vulture src/ --min-confidence 80

# Output:
# src/utils/helpers.py:45: unused function 'format_response' (90% confidence)
# src/models/user.py:12: unused import 'Optional' (100% confidence)
# src/api/routes.py:89: unused variable 'temp_cache' (80% confidence)
# Install: pip install vulture
# Docs: https://github.com/jendrikseipp/vulture

# Scan for dead code with 80% confidence threshold
vulture src/ --min-confidence 80

# Output:
# src/utils/helpers.py:45: unused function 'format_response' (90% confidence)
# src/models/user.py:12: unused import 'Optional' (100% confidence)
# src/api/routes.py:89: unused variable 'temp_cache' (80% confidence)
"""
dead_code_tracker.py — Track dead code accumulation over time.
Requires: -weight: 500;">pip -weight: 500;">install vulture
"""
import subprocess
import json
from datetime import date
from pathlib import Path def count_dead_code(source_dir: str, min_confidence: int = 80) -> dict: """Run vulture and count findings by type.""" result = subprocess.run( ["vulture", source_dir, f"--min-confidence={min_confidence}"], capture_output=True, text=True, ) lines = result.stdout.strip().split("\n") if result.stdout.strip() else [] counts = {"unused_function": 0, "unused_import": 0, "unused_variable": 0, "other": 0} for line in lines: if "unused function" in line: counts["unused_function"] += 1 elif "unused import" in line: counts["unused_import"] += 1 elif "unused variable" in line: counts["unused_variable"] += 1 else: counts["other"] += 1 counts["total"] = len(lines) counts["date"] = str(date.today()) return counts def append_history(counts: dict, history_path: Path) -> None: """Append today's count to the tracking history.""" history = [] if history_path.exists(): history = json.loads(history_path.read_text()) history.append(counts) history_path.write_text(json.dumps(history, indent=2)) if __name__ == "__main__": counts = count_dead_code("src") append_history(counts, Path(".dead-code-history.json")) print(f"Dead code: {counts['total']} findings " f"({counts['unused_function']} functions, " f"{counts['unused_import']} imports, " f"{counts['unused_variable']} variables)")
"""
dead_code_tracker.py — Track dead code accumulation over time.
Requires: -weight: 500;">pip -weight: 500;">install vulture
"""
import subprocess
import json
from datetime import date
from pathlib import Path def count_dead_code(source_dir: str, min_confidence: int = 80) -> dict: """Run vulture and count findings by type.""" result = subprocess.run( ["vulture", source_dir, f"--min-confidence={min_confidence}"], capture_output=True, text=True, ) lines = result.stdout.strip().split("\n") if result.stdout.strip() else [] counts = {"unused_function": 0, "unused_import": 0, "unused_variable": 0, "other": 0} for line in lines: if "unused function" in line: counts["unused_function"] += 1 elif "unused import" in line: counts["unused_import"] += 1 elif "unused variable" in line: counts["unused_variable"] += 1 else: counts["other"] += 1 counts["total"] = len(lines) counts["date"] = str(date.today()) return counts def append_history(counts: dict, history_path: Path) -> None: """Append today's count to the tracking history.""" history = [] if history_path.exists(): history = json.loads(history_path.read_text()) history.append(counts) history_path.write_text(json.dumps(history, indent=2)) if __name__ == "__main__": counts = count_dead_code("src") append_history(counts, Path(".dead-code-history.json")) print(f"Dead code: {counts['total']} findings " f"({counts['unused_function']} functions, " f"{counts['unused_import']} imports, " f"{counts['unused_variable']} variables)")
"""
dead_code_tracker.py — Track dead code accumulation over time.
Requires: -weight: 500;">pip -weight: 500;">install vulture
"""
import subprocess
import json
from datetime import date
from pathlib import Path def count_dead_code(source_dir: str, min_confidence: int = 80) -> dict: """Run vulture and count findings by type.""" result = subprocess.run( ["vulture", source_dir, f"--min-confidence={min_confidence}"], capture_output=True, text=True, ) lines = result.stdout.strip().split("\n") if result.stdout.strip() else [] counts = {"unused_function": 0, "unused_import": 0, "unused_variable": 0, "other": 0} for line in lines: if "unused function" in line: counts["unused_function"] += 1 elif "unused import" in line: counts["unused_import"] += 1 elif "unused variable" in line: counts["unused_variable"] += 1 else: counts["other"] += 1 counts["total"] = len(lines) counts["date"] = str(date.today()) return counts def append_history(counts: dict, history_path: Path) -> None: """Append today's count to the tracking history.""" history = [] if history_path.exists(): history = json.loads(history_path.read_text()) history.append(counts) history_path.write_text(json.dumps(history, indent=2)) if __name__ == "__main__": counts = count_dead_code("src") append_history(counts, Path(".dead-code-history.json")) print(f"Dead code: {counts['total']} findings " f"({counts['unused_function']} functions, " f"{counts['unused_import']} imports, " f"{counts['unused_variable']} variables)")
"""
refactor_ratio.py — Measure refactoring vs new code ratio from -weight: 500;">git history.
Uses -weight: 500;">git log to classify commits as refactoring or feature work.
"""
import subprocess
import re
import sys def get_recent_commits(days: int = 14) -> list[str]: """Get commit messages from the last N days.""" result = subprocess.run( ["-weight: 500;">git", "log", f"--since={days} days ago", "--pretty=format:%s", "--no-merges"], capture_output=True, text=True, check=True, ) return [line.strip() for line in result.stdout.split("\n") if line.strip()] def classify_commits(messages: list[str]) -> dict: """Classify commits as refactor, feature, fix, or other.""" refactor_patterns = re.compile( r"refactor|extract|consolidate|simplify|rename|restructure|deduplicate|cleanup|clean up", re.IGNORECASE, ) feature_patterns = re.compile( r"add|implement|create|build|introduce|new|feature", re.IGNORECASE, ) fix_patterns = re.compile(r"fix|bug|patch|resolve|hotfix", re.IGNORECASE) counts = {"refactor": 0, "feature": 0, "fix": 0, "other": 0} for msg in messages: if refactor_patterns.search(msg): counts["refactor"] += 1 elif feature_patterns.search(msg): counts["feature"] += 1 elif fix_patterns.search(msg): counts["fix"] += 1 else: counts["other"] += 1 return counts def compute_ratio(counts: dict) -> float: """Compute refactoring ratio as percentage of total commits.""" total = sum(counts.values()) if total == 0: return 0.0 return (counts["refactor"] / total) * 100 if __name__ == "__main__": days = int(sys.argv[1]) if len(sys.argv) > 1 else 14 commits = get_recent_commits(days) counts = classify_commits(commits) ratio = compute_ratio(counts) print(f"Last {days} days: {len(commits)} commits") print(f" Refactoring: {counts['refactor']} ({ratio:.1f}%)") print(f" Features: {counts['feature']}") print(f" Fixes: {counts['fix']}") print(f" Other: {counts['other']}") if ratio < 15: print(f"\nRefactoring ratio ({ratio:.1f}%) is below 15% threshold.") print("Consider scheduling dedicated refactoring time.")
"""
refactor_ratio.py — Measure refactoring vs new code ratio from -weight: 500;">git history.
Uses -weight: 500;">git log to classify commits as refactoring or feature work.
"""
import subprocess
import re
import sys def get_recent_commits(days: int = 14) -> list[str]: """Get commit messages from the last N days.""" result = subprocess.run( ["-weight: 500;">git", "log", f"--since={days} days ago", "--pretty=format:%s", "--no-merges"], capture_output=True, text=True, check=True, ) return [line.strip() for line in result.stdout.split("\n") if line.strip()] def classify_commits(messages: list[str]) -> dict: """Classify commits as refactor, feature, fix, or other.""" refactor_patterns = re.compile( r"refactor|extract|consolidate|simplify|rename|restructure|deduplicate|cleanup|clean up", re.IGNORECASE, ) feature_patterns = re.compile( r"add|implement|create|build|introduce|new|feature", re.IGNORECASE, ) fix_patterns = re.compile(r"fix|bug|patch|resolve|hotfix", re.IGNORECASE) counts = {"refactor": 0, "feature": 0, "fix": 0, "other": 0} for msg in messages: if refactor_patterns.search(msg): counts["refactor"] += 1 elif feature_patterns.search(msg): counts["feature"] += 1 elif fix_patterns.search(msg): counts["fix"] += 1 else: counts["other"] += 1 return counts def compute_ratio(counts: dict) -> float: """Compute refactoring ratio as percentage of total commits.""" total = sum(counts.values()) if total == 0: return 0.0 return (counts["refactor"] / total) * 100 if __name__ == "__main__": days = int(sys.argv[1]) if len(sys.argv) > 1 else 14 commits = get_recent_commits(days) counts = classify_commits(commits) ratio = compute_ratio(counts) print(f"Last {days} days: {len(commits)} commits") print(f" Refactoring: {counts['refactor']} ({ratio:.1f}%)") print(f" Features: {counts['feature']}") print(f" Fixes: {counts['fix']}") print(f" Other: {counts['other']}") if ratio < 15: print(f"\nRefactoring ratio ({ratio:.1f}%) is below 15% threshold.") print("Consider scheduling dedicated refactoring time.")
"""
refactor_ratio.py — Measure refactoring vs new code ratio from -weight: 500;">git history.
Uses -weight: 500;">git log to classify commits as refactoring or feature work.
"""
import subprocess
import re
import sys def get_recent_commits(days: int = 14) -> list[str]: """Get commit messages from the last N days.""" result = subprocess.run( ["-weight: 500;">git", "log", f"--since={days} days ago", "--pretty=format:%s", "--no-merges"], capture_output=True, text=True, check=True, ) return [line.strip() for line in result.stdout.split("\n") if line.strip()] def classify_commits(messages: list[str]) -> dict: """Classify commits as refactor, feature, fix, or other.""" refactor_patterns = re.compile( r"refactor|extract|consolidate|simplify|rename|restructure|deduplicate|cleanup|clean up", re.IGNORECASE, ) feature_patterns = re.compile( r"add|implement|create|build|introduce|new|feature", re.IGNORECASE, ) fix_patterns = re.compile(r"fix|bug|patch|resolve|hotfix", re.IGNORECASE) counts = {"refactor": 0, "feature": 0, "fix": 0, "other": 0} for msg in messages: if refactor_patterns.search(msg): counts["refactor"] += 1 elif feature_patterns.search(msg): counts["feature"] += 1 elif fix_patterns.search(msg): counts["fix"] += 1 else: counts["other"] += 1 return counts def compute_ratio(counts: dict) -> float: """Compute refactoring ratio as percentage of total commits.""" total = sum(counts.values()) if total == 0: return 0.0 return (counts["refactor"] / total) * 100 if __name__ == "__main__": days = int(sys.argv[1]) if len(sys.argv) > 1 else 14 commits = get_recent_commits(days) counts = classify_commits(commits) ratio = compute_ratio(counts) print(f"Last {days} days: {len(commits)} commits") print(f" Refactoring: {counts['refactor']} ({ratio:.1f}%)") print(f" Features: {counts['feature']}") print(f" Fixes: {counts['fix']}") print(f" Other: {counts['other']}") if ratio < 15: print(f"\nRefactoring ratio ({ratio:.1f}%) is below 15% threshold.") print("Consider scheduling dedicated refactoring time.")
"""
boundary_check.py — Enforce architectural boundaries via import analysis.
Uses Python's ast module (standard library) to parse imports.
"""
import ast
import sys
from pathlib import Path # Define allowed imports between modules.
# Each key is a module, values are modules it MAY import from.
ALLOWED_IMPORTS = { "auth": {"models", "utils", "config"}, "billing": {"models", "utils", "config"}, "api": {"auth", "billing", "models", "utils", "config"}, "models": {"utils", "config"}, "utils": {"config"}, "config": set(),
} def get_module_name(filepath: Path, src_root: Path) -> str: """Extract the top-level module name from a file path.""" relative = filepath.relative_to(src_root) return relative.parts[0] if len(relative.parts) > 1 else "" def check_imports(filepath: Path, src_root: Path) -> list[dict]: """Parse a Python file and check imports against boundary rules.""" module = get_module_name(filepath, src_root) if module not in ALLOWED_IMPORTS: return [] violations = [] source = filepath.read_text() tree = ast.parse(source, filename=str(filepath)) for node in ast.walk(tree): target = None if isinstance(node, ast.Import): for alias in node.names: parts = alias.name.split(".") if parts[0] in ALLOWED_IMPORTS and parts[0] != module: target = parts[0] elif isinstance(node, ast.ImportFrom): if node.module: parts = node.module.split(".") if parts[0] in ALLOWED_IMPORTS and parts[0] != module: target = parts[0] if target and target not in ALLOWED_IMPORTS.get(module, set()): violations.append({ "file": str(filepath), "line": node.lineno, "module": module, "imports": target, "allowed": sorted(ALLOWED_IMPORTS[module]), }) return violations def scan_directory(src_root: Path) -> list[dict]: """Scan all Python files for boundary violations.""" all_violations = [] for pyfile in src_root.rglob("*.py"): all_violations.extend(check_imports(pyfile, src_root)) return all_violations if __name__ == "__main__": src = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("src") violations = scan_directory(src) if violations: print(f"Found {len(violations)} boundary violations:") for v in violations: print(f" {v['file']}:{v['line']} — " f"'{v['module']}' imports '{v['imports']}' " f"(allowed: {v['allowed']})") sys.exit(1) else: print("No boundary violations found.")
"""
boundary_check.py — Enforce architectural boundaries via import analysis.
Uses Python's ast module (standard library) to parse imports.
"""
import ast
import sys
from pathlib import Path # Define allowed imports between modules.
# Each key is a module, values are modules it MAY import from.
ALLOWED_IMPORTS = { "auth": {"models", "utils", "config"}, "billing": {"models", "utils", "config"}, "api": {"auth", "billing", "models", "utils", "config"}, "models": {"utils", "config"}, "utils": {"config"}, "config": set(),
} def get_module_name(filepath: Path, src_root: Path) -> str: """Extract the top-level module name from a file path.""" relative = filepath.relative_to(src_root) return relative.parts[0] if len(relative.parts) > 1 else "" def check_imports(filepath: Path, src_root: Path) -> list[dict]: """Parse a Python file and check imports against boundary rules.""" module = get_module_name(filepath, src_root) if module not in ALLOWED_IMPORTS: return [] violations = [] source = filepath.read_text() tree = ast.parse(source, filename=str(filepath)) for node in ast.walk(tree): target = None if isinstance(node, ast.Import): for alias in node.names: parts = alias.name.split(".") if parts[0] in ALLOWED_IMPORTS and parts[0] != module: target = parts[0] elif isinstance(node, ast.ImportFrom): if node.module: parts = node.module.split(".") if parts[0] in ALLOWED_IMPORTS and parts[0] != module: target = parts[0] if target and target not in ALLOWED_IMPORTS.get(module, set()): violations.append({ "file": str(filepath), "line": node.lineno, "module": module, "imports": target, "allowed": sorted(ALLOWED_IMPORTS[module]), }) return violations def scan_directory(src_root: Path) -> list[dict]: """Scan all Python files for boundary violations.""" all_violations = [] for pyfile in src_root.rglob("*.py"): all_violations.extend(check_imports(pyfile, src_root)) return all_violations if __name__ == "__main__": src = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("src") violations = scan_directory(src) if violations: print(f"Found {len(violations)} boundary violations:") for v in violations: print(f" {v['file']}:{v['line']} — " f"'{v['module']}' imports '{v['imports']}' " f"(allowed: {v['allowed']})") sys.exit(1) else: print("No boundary violations found.")
"""
boundary_check.py — Enforce architectural boundaries via import analysis.
Uses Python's ast module (standard library) to parse imports.
"""
import ast
import sys
from pathlib import Path # Define allowed imports between modules.
# Each key is a module, values are modules it MAY import from.
ALLOWED_IMPORTS = { "auth": {"models", "utils", "config"}, "billing": {"models", "utils", "config"}, "api": {"auth", "billing", "models", "utils", "config"}, "models": {"utils", "config"}, "utils": {"config"}, "config": set(),
} def get_module_name(filepath: Path, src_root: Path) -> str: """Extract the top-level module name from a file path.""" relative = filepath.relative_to(src_root) return relative.parts[0] if len(relative.parts) > 1 else "" def check_imports(filepath: Path, src_root: Path) -> list[dict]: """Parse a Python file and check imports against boundary rules.""" module = get_module_name(filepath, src_root) if module not in ALLOWED_IMPORTS: return [] violations = [] source = filepath.read_text() tree = ast.parse(source, filename=str(filepath)) for node in ast.walk(tree): target = None if isinstance(node, ast.Import): for alias in node.names: parts = alias.name.split(".") if parts[0] in ALLOWED_IMPORTS and parts[0] != module: target = parts[0] elif isinstance(node, ast.ImportFrom): if node.module: parts = node.module.split(".") if parts[0] in ALLOWED_IMPORTS and parts[0] != module: target = parts[0] if target and target not in ALLOWED_IMPORTS.get(module, set()): violations.append({ "file": str(filepath), "line": node.lineno, "module": module, "imports": target, "allowed": sorted(ALLOWED_IMPORTS[module]), }) return violations def scan_directory(src_root: Path) -> list[dict]: """Scan all Python files for boundary violations.""" all_violations = [] for pyfile in src_root.rglob("*.py"): all_violations.extend(check_imports(pyfile, src_root)) return all_violations if __name__ == "__main__": src = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("src") violations = scan_directory(src) if violations: print(f"Found {len(violations)} boundary violations:") for v in violations: print(f" {v['file']}:{v['line']} — " f"'{v['module']}' imports '{v['imports']}' " f"(allowed: {v['allowed']})") sys.exit(1) else: print("No boundary violations found.")
# .github/workflows/debt-detection.yml
# Runs every tech-debt detector on each pull request.
name: Tech Debt Detection

on: [pull_request]

jobs:
  complexity-drift:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install radon
      - run: python complexity_tracker.py src

  clone-detection:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: npm install -g jscpd
      - run: jscpd ./src --threshold 5

  dead-code:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install vulture
      - run: vulture src/ --min-confidence 80

  refactor-ratio:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          # Full history is required for git-log-based commit analysis.
          fetch-depth: 0
      - run: python refactor_ratio.py 14

  boundary-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: python boundary_check.py src
# .github/workflows/debt-detection.yml
# Runs every tech-debt detector on each pull request.
name: Tech Debt Detection

on: [pull_request]

jobs:
  complexity-drift:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install radon
      - run: python complexity_tracker.py src

  clone-detection:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: npm install -g jscpd
      - run: jscpd ./src --threshold 5

  dead-code:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install vulture
      - run: vulture src/ --min-confidence 80

  refactor-ratio:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          # Full history is required for git-log-based commit analysis.
          fetch-depth: 0
      - run: python refactor_ratio.py 14

  boundary-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: python boundary_check.py src
# .github/workflows/debt-detection.yml
# Runs every tech-debt detector on each pull request.
name: Tech Debt Detection

on: [pull_request]

jobs:
  complexity-drift:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install radon
      - run: python complexity_tracker.py src

  clone-detection:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: npm install -g jscpd
      - run: jscpd ./src --threshold 5

  dead-code:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install vulture
      - run: vulture src/ --min-confidence 80

  refactor-ratio:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          # Full history is required for git-log-based commit analysis.
          fetch-depth: 0
      - run: python refactor_ratio.py 14

  boundary-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: python boundary_check.py src