βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SECURITY AGENT PIPELINE β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 1. THREAT MODELER β Maps codebase, identifies β
β (LLM + static analysis) β attack surfaces, prioritizes β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 2. SCANNER ORCHESTRATOR β Spins up parallel sub-agents β
β (Agent coordinator) β per module/subsystem β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 3. VULN DETECTOR β Per-file/function analysis β
β (LLM sub-agent) β with semantic reasoning β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 4. EXPLOIT SYNTHESIZER β Generates PoC code, β
β (LLM + code executor) β compiles, and runs in sandboxβ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 5. TRIAGE ENGINE β Multi-model consensus, β
β (Ensemble of models) β severity rating, dedup β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 6. REPORT GENERATOR β CVE-formatted output, β
β (LLM) β fix suggestions, CVSS scoringβ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SECURITY AGENT PIPELINE β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 1. THREAT MODELER β Maps codebase, identifies β
β (LLM + static analysis) β attack surfaces, prioritizes β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 2. SCANNER ORCHESTRATOR β Spins up parallel sub-agents β
β (Agent coordinator) β per module/subsystem β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 3. VULN DETECTOR β Per-file/function analysis β
β (LLM sub-agent) β with semantic reasoning β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 4. EXPLOIT SYNTHESIZER β Generates PoC code, β
β (LLM + code executor) β compiles, and runs in sandboxβ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 5. TRIAGE ENGINE β Multi-model consensus, β
β (Ensemble of models) β severity rating, dedup β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 6. REPORT GENERATOR β CVE-formatted output, β
β (LLM) β fix suggestions, CVSS scoringβ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SECURITY AGENT PIPELINE β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 1. THREAT MODELER β Maps codebase, identifies β
β (LLM + static analysis) β attack surfaces, prioritizes β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 2. SCANNER ORCHESTRATOR β Spins up parallel sub-agents β
β (Agent coordinator) β per module/subsystem β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 3. VULN DETECTOR β Per-file/function analysis β
β (LLM sub-agent) β with semantic reasoning β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 4. EXPLOIT SYNTHESIZER β Generates PoC code, β
β (LLM + code executor) β compiles, and runs in sandboxβ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 5. TRIAGE ENGINE β Multi-model consensus, β
β (Ensemble of models) β severity rating, dedup β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 6. REPORT GENERATOR β CVE-formatted output, β
β (LLM) β fix suggestions, CVSS scoringβ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Simplified sub-agent invocation pattern
async def scan_module(module_path: str, context: SecurityContext) -> list[Finding]: """ Launch a sandboxed LLM sub-agent to analyze a single module. Returns structured findings with severity, description, and PoC. """ system_prompt = build_security_analyst_prompt( language=context.language, vulnerability_classes=context.priority_vuln_classes, trust_model=context.trust_model, output_schema=FindingSchema ) file_content = load_with_dependencies(module_path, context.repo_root) findings = await llm_client.chat( model="claude-opus-4-7", # or gpt-5.5 for high-value targets system=system_prompt, messages=[{ "role": "user", "content": f"Analyze this module for security vulnerabilities:\n\n{file_content}" }], response_schema=list[Finding], # structured output enforces quality max_tokens=8192, timeout=120 ) return findings
# Simplified sub-agent invocation pattern
async def scan_module(module_path: str, context: SecurityContext) -> list[Finding]: """ Launch a sandboxed LLM sub-agent to analyze a single module. Returns structured findings with severity, description, and PoC. """ system_prompt = build_security_analyst_prompt( language=context.language, vulnerability_classes=context.priority_vuln_classes, trust_model=context.trust_model, output_schema=FindingSchema ) file_content = load_with_dependencies(module_path, context.repo_root) findings = await llm_client.chat( model="claude-opus-4-7", # or gpt-5.5 for high-value targets system=system_prompt, messages=[{ "role": "user", "content": f"Analyze this module for security vulnerabilities:\n\n{file_content}" }], response_schema=list[Finding], # structured output enforces quality max_tokens=8192, timeout=120 ) return findings
# Simplified sub-agent invocation pattern
async def scan_module(module_path: str, context: SecurityContext) -> list[Finding]: """ Launch a sandboxed LLM sub-agent to analyze a single module. Returns structured findings with severity, description, and PoC. """ system_prompt = build_security_analyst_prompt( language=context.language, vulnerability_classes=context.priority_vuln_classes, trust_model=context.trust_model, output_schema=FindingSchema ) file_content = load_with_dependencies(module_path, context.repo_root) findings = await llm_client.chat( model="claude-opus-4-7", # or gpt-5.5 for high-value targets system=system_prompt, messages=[{ "role": "user", "content": f"Analyze this module for security vulnerabilities:\n\n{file_content}" }], response_schema=list[Finding], # structured output enforces quality max_tokens=8192, timeout=120 ) return findings
async def validate_finding(finding: Finding, sandbox: SandboxEnv) -> ValidatedFinding: """ Attempt to generate and run a PoC for a suspected vulnerability. A finding backed by a working PoC has effectively 0% false positive rate. """ max_iterations = 5 for attempt in range(max_iterations): # Step 1: Synthesize PoC code poc_code = await llm_client.chat( model="claude-opus-4-7", messages=[{ "role": "user", "content": f""" Write a minimal proof-of-concept that triggers this vulnerability: Finding: {finding.description} Affected code: {finding.code_snippet} Expected behavior: {finding.expected_trigger} Write executable {finding.language} code only. No explanations. """ }] ) # Step 2: Execute in isolated sandbox result = await sandbox.execute( code=poc_code, language=finding.language, timeout=30, memory_limit="512mb" ) # Step 3: Did it trigger the expected vulnerability? if result.crashed and matches_expected_behavior(result, finding): return ValidatedFinding( finding=finding, poc_code=poc_code, execution_result=result, confidence="HIGH", false_positive=False ) # Step 4: Iterate β feed failure back to model finding = await refine_hypothesis(finding, result, llm_client) # Couldn't reproduce after max_iterations β flag as unconfirmed return ValidatedFinding(finding=finding, confidence="LOW", false_positive=True)
async def validate_finding(finding: Finding, sandbox: SandboxEnv) -> ValidatedFinding: """ Attempt to generate and run a PoC for a suspected vulnerability. A finding backed by a working PoC has effectively 0% false positive rate. """ max_iterations = 5 for attempt in range(max_iterations): # Step 1: Synthesize PoC code poc_code = await llm_client.chat( model="claude-opus-4-7", messages=[{ "role": "user", "content": f""" Write a minimal proof-of-concept that triggers this vulnerability: Finding: {finding.description} Affected code: {finding.code_snippet} Expected behavior: {finding.expected_trigger} Write executable {finding.language} code only. No explanations. """ }] ) # Step 2: Execute in isolated sandbox result = await sandbox.execute( code=poc_code, language=finding.language, timeout=30, memory_limit="512mb" ) # Step 3: Did it trigger the expected vulnerability? if result.crashed and matches_expected_behavior(result, finding): return ValidatedFinding( finding=finding, poc_code=poc_code, execution_result=result, confidence="HIGH", false_positive=False ) # Step 4: Iterate β feed failure back to model finding = await refine_hypothesis(finding, result, llm_client) # Couldn't reproduce after max_iterations β flag as unconfirmed return ValidatedFinding(finding=finding, confidence="LOW", false_positive=True)
async def validate_finding(finding: Finding, sandbox: SandboxEnv) -> ValidatedFinding: """ Attempt to generate and run a PoC for a suspected vulnerability. A finding backed by a working PoC has effectively 0% false positive rate. """ max_iterations = 5 for attempt in range(max_iterations): # Step 1: Synthesize PoC code poc_code = await llm_client.chat( model="claude-opus-4-7", messages=[{ "role": "user", "content": f""" Write a minimal proof-of-concept that triggers this vulnerability: Finding: {finding.description} Affected code: {finding.code_snippet} Expected behavior: {finding.expected_trigger} Write executable {finding.language} code only. No explanations. """ }] ) # Step 2: Execute in isolated sandbox result = await sandbox.execute( code=poc_code, language=finding.language, timeout=30, memory_limit="512mb" ) # Step 3: Did it trigger the expected vulnerability? if result.crashed and matches_expected_behavior(result, finding): return ValidatedFinding( finding=finding, poc_code=poc_code, execution_result=result, confidence="HIGH", false_positive=False ) # Step 4: Iterate β feed failure back to model finding = await refine_hypothesis(finding, result, llm_client) # Couldn't reproduce after max_iterations β flag as unconfirmed return ValidatedFinding(finding=finding, confidence="LOW", false_positive=True)
async def triage_with_consensus( finding: Finding, models: list[str] = ["claude-opus-4-7", "gpt-5.5", "gemini-2.5-pro"]
) -> ConsensusResult: """ Submit a finding to multiple models for independent verification. Require 2/3 agreement to advance to human review queue. """ verdicts = await asyncio.gather(*[ verify_finding_with_model(finding, model) for model in models ]) confirmed_count = sum(1 for v in verdicts if v.is_valid) return ConsensusResult( finding=finding, verdicts=verdicts, consensus_reached=confirmed_count >= 2, confidence_score=confirmed_count / len(models), advance_to_human_review=confirmed_count >= 2 )
async def triage_with_consensus( finding: Finding, models: list[str] = ["claude-opus-4-7", "gpt-5.5", "gemini-2.5-pro"]
) -> ConsensusResult: """ Submit a finding to multiple models for independent verification. Require 2/3 agreement to advance to human review queue. """ verdicts = await asyncio.gather(*[ verify_finding_with_model(finding, model) for model in models ]) confirmed_count = sum(1 for v in verdicts if v.is_valid) return ConsensusResult( finding=finding, verdicts=verdicts, consensus_reached=confirmed_count >= 2, confidence_score=confirmed_count / len(models), advance_to_human_review=confirmed_count >= 2 )
async def triage_with_consensus( finding: Finding, models: list[str] = ["claude-opus-4-7", "gpt-5.5", "gemini-2.5-pro"]
) -> ConsensusResult: """ Submit a finding to multiple models for independent verification. Require 2/3 agreement to advance to human review queue. """ verdicts = await asyncio.gather(*[ verify_finding_with_model(finding, model) for model in models ]) confirmed_count = sum(1 for v in verdicts if v.is_valid) return ConsensusResult( finding=finding, verdicts=verdicts, consensus_reached=confirmed_count >= 2, confidence_score=confirmed_count / len(models), advance_to_human_review=confirmed_count >= 2 )
#!/usr/bin/env python3
"""
minimal_vuln_scanner.py
A basic LLM-powered vulnerability scanner for CI/CD integration.
Requires: anthropic>=0.30.0, pip install anthropic
""" import asyncio
import json
from pathlib import Path
from anthropic import AsyncAnthropic client = AsyncAnthropic() SECURITY_SYSTEM_PROMPT = """You are an expert security researcher performing a white-box vulnerability audit. Analyze the provided code for: 1. Memory safety issues (buffer overflows, UAF, null deref β especially in C/C++)
2. Injection vulnerabilities (SQL, command, LDAP, path traversal) 3. Authentication/authorization bypasses
4. Race conditions and TOCTOU bugs
5. Cryptographic weaknesses
6. Unsafe deserialization
7. Integer overflow/underflow conditions
8. Logic bugs affecting security-critical code paths For each finding, provide:
- Vulnerability class (CWE ID if applicable)
- Severity (Critical/High/Medium/Low)
- Affected code location (file:line)
- Root cause explanation (2-3 sentences)
- Proof-of-concept trigger (how would an attacker trigger this?)
- Recommended fix Return your response as a JSON array of findings. If no vulnerabilities are found,
return an empty array []. Do NOT speculate β only report findings you are confident about.""" async def scan_file(file_path: Path) -> list[dict]: """Scan a single file for vulnerabilities using Claude.""" content = file_path.read_text(errors='replace') # Skip files that are too short to be meaningful if len(content.strip()) < 50: return [] message = await client.messages.create( model="claude-opus-4-5", # Use claude-opus-4-7 for higher accuracy max_tokens=4096, system=SECURITY_SYSTEM_PROMPT, messages=[{ "role": "user", "content": f"File: {file_path}\n\n```
{% endraw %}
\n{content[:50000]}\n
{% raw %}
```" # Truncate to 50k chars; for large files, chunk by function }] ) response_text = message.content[0].text.strip() try: # Extract JSON array from response start = response_text.find('[') end = response_text.rfind(']') + 1 if start != -1 and end > start: findings = json.loads(response_text[start:end]) # Annotate each finding with source file for f in findings: f['source_file'] = str(file_path) return findings except json.JSONDecodeError: pass return [] async def scan_repository(repo_path: str, extensions: list[str] = None) -> dict: """ Scan an entire repository for vulnerabilities. Args: repo_path: Path to the repository root extensions: File extensions to scan (default: common security-relevant types) Returns: Dict with findings grouped by severity """ if extensions is None: extensions = ['.c', '.cpp', '.h', '.py', '.js', '.ts', '.go', '.rs', '.java'] repo = Path(repo_path) files_to_scan = [ f for f in repo.rglob('*') if f.suffix in extensions and '.git' not in f.parts and 'node_modules' not in f.parts and 'vendor' not in f.parts ] print(f"[*] Scanning {len(files_to_scan)} files in {repo_path}") # Scan files concurrently (respect API rate limits) semaphore = asyncio.Semaphore(5) # Max 5 concurrent API calls async def scan_with_limit(f): async with semaphore: print(f" Scanning: {f.relative_to(repo)}") return await scan_file(f) all_results = await asyncio.gather(*[scan_with_limit(f) for f in files_to_scan]) # Flatten and group by severity all_findings = [f for sublist in all_results for f in sublist] grouped = { 'critical': [f for f in all_findings if f.get('severity', '').lower() == 'critical'], 'high': [f for f in all_findings if f.get('severity', '').lower() == 'high'], 'medium': [f for f in all_findings if f.get('severity', '').lower() == 'medium'], 'low': [f for f in all_findings if f.get('severity', '').lower() == 'low'], } return grouped async def main(): import sys repo_path = sys.argv[1] if len(sys.argv) > 1 else '.' results = await scan_repository(repo_path) total = sum(len(v) for v in results.values()) print(f"\n{'='*60}") print(f"SCAN COMPLETE β {total} findings") print(f"{'='*60}") print(f" π΄ Critical: {len(results['critical'])}") print(f" π High: {len(results['high'])}") print(f" π‘ Medium: {len(results['medium'])}") print(f" π’ Low: {len(results['low'])}") print(f"{'='*60}\n") # Print critical and high findings in detail for severity in ['critical', 'high']: for finding in results[severity]: print(f"[{finding['severity'].upper()}] {finding.get('vulnerability_class', 'Unknown')}") print(f" File: {finding.get('source_file')}") print(f" {finding.get('root_cause', 'No description')}") print(f" Fix: {finding.get('recommended_fix', 'See full report')}\n") # Save full report with open('security_report.json', 'w') as f: json.dump(results, f, indent=2) print("[*] Full report saved to security_report.json") if __name__ == '__main__': asyncio.run(main())
#!/usr/bin/env python3
"""
minimal_vuln_scanner.py
A basic LLM-powered vulnerability scanner for CI/CD integration.
Requires: anthropic>=0.30.0, pip install anthropic
""" import asyncio
import json
from pathlib import Path
from anthropic import AsyncAnthropic client = AsyncAnthropic() SECURITY_SYSTEM_PROMPT = """You are an expert security researcher performing a white-box vulnerability audit. Analyze the provided code for: 1. Memory safety issues (buffer overflows, UAF, null deref β especially in C/C++)
2. Injection vulnerabilities (SQL, command, LDAP, path traversal) 3. Authentication/authorization bypasses
4. Race conditions and TOCTOU bugs
5. Cryptographic weaknesses
6. Unsafe deserialization
7. Integer overflow/underflow conditions
8. Logic bugs affecting security-critical code paths For each finding, provide:
- Vulnerability class (CWE ID if applicable)
- Severity (Critical/High/Medium/Low)
- Affected code location (file:line)
- Root cause explanation (2-3 sentences)
- Proof-of-concept trigger (how would an attacker trigger this?)
- Recommended fix Return your response as a JSON array of findings. If no vulnerabilities are found,
return an empty array []. Do NOT speculate β only report findings you are confident about.""" async def scan_file(file_path: Path) -> list[dict]: """Scan a single file for vulnerabilities using Claude.""" content = file_path.read_text(errors='replace') # Skip files that are too short to be meaningful if len(content.strip()) < 50: return [] message = await client.messages.create( model="claude-opus-4-5", # Use claude-opus-4-7 for higher accuracy max_tokens=4096, system=SECURITY_SYSTEM_PROMPT, messages=[{ "role": "user", "content": f"File: {file_path}\n\n```
{% endraw %}
\n{content[:50000]}\n
{% raw %}
```" # Truncate to 50k chars; for large files, chunk by function }] ) response_text = message.content[0].text.strip() try: # Extract JSON array from response start = response_text.find('[') end = response_text.rfind(']') + 1 if start != -1 and end > start: findings = json.loads(response_text[start:end]) # Annotate each finding with source file for f in findings: f['source_file'] = str(file_path) return findings except json.JSONDecodeError: pass return [] async def scan_repository(repo_path: str, extensions: list[str] = None) -> dict: """ Scan an entire repository for vulnerabilities. Args: repo_path: Path to the repository root extensions: File extensions to scan (default: common security-relevant types) Returns: Dict with findings grouped by severity """ if extensions is None: extensions = ['.c', '.cpp', '.h', '.py', '.js', '.ts', '.go', '.rs', '.java'] repo = Path(repo_path) files_to_scan = [ f for f in repo.rglob('*') if f.suffix in extensions and '.git' not in f.parts and 'node_modules' not in f.parts and 'vendor' not in f.parts ] print(f"[*] Scanning {len(files_to_scan)} files in {repo_path}") # Scan files concurrently (respect API rate limits) semaphore = asyncio.Semaphore(5) # Max 5 concurrent API calls async def scan_with_limit(f): async with semaphore: print(f" Scanning: {f.relative_to(repo)}") return await scan_file(f) all_results = await asyncio.gather(*[scan_with_limit(f) for f in files_to_scan]) # Flatten and group by severity all_findings = [f for sublist in all_results for f in sublist] grouped = { 'critical': [f for f in all_findings if f.get('severity', '').lower() == 'critical'], 'high': [f for f in all_findings if f.get('severity', '').lower() == 'high'], 'medium': [f for f in all_findings if f.get('severity', '').lower() == 'medium'], 'low': [f for f in all_findings if f.get('severity', '').lower() == 'low'], } return grouped async def main(): import sys repo_path = sys.argv[1] if len(sys.argv) > 1 else '.' results = await scan_repository(repo_path) total = sum(len(v) for v in results.values()) print(f"\n{'='*60}") print(f"SCAN COMPLETE β {total} findings") print(f"{'='*60}") print(f" π΄ Critical: {len(results['critical'])}") print(f" π High: {len(results['high'])}") print(f" π‘ Medium: {len(results['medium'])}") print(f" π’ Low: {len(results['low'])}") print(f"{'='*60}\n") # Print critical and high findings in detail for severity in ['critical', 'high']: for finding in results[severity]: print(f"[{finding['severity'].upper()}] {finding.get('vulnerability_class', 'Unknown')}") print(f" File: {finding.get('source_file')}") print(f" {finding.get('root_cause', 'No description')}") print(f" Fix: {finding.get('recommended_fix', 'See full report')}\n") # Save full report with open('security_report.json', 'w') as f: json.dump(results, f, indent=2) print("[*] Full report saved to security_report.json") if __name__ == '__main__': asyncio.run(main())
#!/usr/bin/env python3
"""
minimal_vuln_scanner.py
A basic LLM-powered vulnerability scanner for CI/CD integration.
Requires: anthropic>=0.30.0, pip install anthropic
""" import asyncio
import json
from pathlib import Path
from anthropic import AsyncAnthropic client = AsyncAnthropic() SECURITY_SYSTEM_PROMPT = """You are an expert security researcher performing a white-box vulnerability audit. Analyze the provided code for: 1. Memory safety issues (buffer overflows, UAF, null deref β especially in C/C++)
2. Injection vulnerabilities (SQL, command, LDAP, path traversal) 3. Authentication/authorization bypasses
4. Race conditions and TOCTOU bugs
5. Cryptographic weaknesses
6. Unsafe deserialization
7. Integer overflow/underflow conditions
8. Logic bugs affecting security-critical code paths For each finding, provide:
- Vulnerability class (CWE ID if applicable)
- Severity (Critical/High/Medium/Low)
- Affected code location (file:line)
- Root cause explanation (2-3 sentences)
- Proof-of-concept trigger (how would an attacker trigger this?)
- Recommended fix Return your response as a JSON array of findings. If no vulnerabilities are found,
return an empty array []. Do NOT speculate β only report findings you are confident about.""" async def scan_file(file_path: Path) -> list[dict]: """Scan a single file for vulnerabilities using Claude.""" content = file_path.read_text(errors='replace') # Skip files that are too short to be meaningful if len(content.strip()) < 50: return [] message = await client.messages.create( model="claude-opus-4-5", # Use claude-opus-4-7 for higher accuracy max_tokens=4096, system=SECURITY_SYSTEM_PROMPT, messages=[{ "role": "user", "content": f"File: {file_path}\n\n```
{% endraw %}
\n{content[:50000]}\n
{% raw %}
```" # Truncate to 50k chars; for large files, chunk by function }] ) response_text = message.content[0].text.strip() try: # Extract JSON array from response start = response_text.find('[') end = response_text.rfind(']') + 1 if start != -1 and end > start: findings = json.loads(response_text[start:end]) # Annotate each finding with source file for f in findings: f['source_file'] = str(file_path) return findings except json.JSONDecodeError: pass return [] async def scan_repository(repo_path: str, extensions: list[str] = None) -> dict: """ Scan an entire repository for vulnerabilities. Args: repo_path: Path to the repository root extensions: File extensions to scan (default: common security-relevant types) Returns: Dict with findings grouped by severity """ if extensions is None: extensions = ['.c', '.cpp', '.h', '.py', '.js', '.ts', '.go', '.rs', '.java'] repo = Path(repo_path) files_to_scan = [ f for f in repo.rglob('*') if f.suffix in extensions and '.git' not in f.parts and 'node_modules' not in f.parts and 'vendor' not in f.parts ] print(f"[*] Scanning {len(files_to_scan)} files in {repo_path}") # Scan files concurrently (respect API rate limits) semaphore = asyncio.Semaphore(5) # Max 5 concurrent API calls async def scan_with_limit(f): async with semaphore: print(f" Scanning: {f.relative_to(repo)}") return await scan_file(f) all_results = await asyncio.gather(*[scan_with_limit(f) for f in files_to_scan]) # Flatten and group by severity all_findings = [f for sublist in all_results for f in sublist] grouped = { 'critical': [f for f in all_findings if f.get('severity', '').lower() == 'critical'], 'high': [f for f in all_findings if f.get('severity', '').lower() == 'high'], 'medium': [f for f in all_findings if f.get('severity', '').lower() == 'medium'], 'low': [f for f in all_findings if f.get('severity', '').lower() == 'low'], } return grouped async def main(): import sys repo_path = sys.argv[1] if len(sys.argv) > 1 else '.' results = await scan_repository(repo_path) total = sum(len(v) for v in results.values()) print(f"\n{'='*60}") print(f"SCAN COMPLETE β {total} findings") print(f"{'='*60}") print(f" π΄ Critical: {len(results['critical'])}") print(f" π High: {len(results['high'])}") print(f" π‘ Medium: {len(results['medium'])}") print(f" π’ Low: {len(results['low'])}") print(f"{'='*60}\n") # Print critical and high findings in detail for severity in ['critical', 'high']: for finding in results[severity]: print(f"[{finding['severity'].upper()}] {finding.get('vulnerability_class', 'Unknown')}") print(f" File: {finding.get('source_file')}") print(f" {finding.get('root_cause', 'No description')}") print(f" Fix: {finding.get('recommended_fix', 'See full report')}\n") # Save full report with open('security_report.json', 'w') as f: json.dump(results, f, indent=2) print("[*] Full report saved to security_report.json") if __name__ == '__main__': asyncio.run(main())
# .github/workflows/ai-security-scan.yml
name: AI Security Scan on: pull_request: types: [opened, synchronize] schedule: - cron: '0 2 * * 1' # Weekly full scan every Monday at 2am jobs: llm-vuln-scan: runs-on: ubuntu-latest permissions: pull-requests: write security-events: write steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # Full history for diff-based scanning on PRs - name: Set up Python uses: actions/setup-python@v5 with: python-version: '3.12' - name: Install dependencies run: pip install anthropic>=0.30.0 - name: Run AI Security Scanner env: ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} run: | # On PRs: scan only changed files for speed if [ "${{ github.event_name }}" = "pull_request" ]; then git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt python minimal_vuln_scanner.py --files-list changed_files.txt else # On scheduled run: full repository scan python minimal_vuln_scanner.py . fi - name: Check for Critical Findings run: | CRITICAL_COUNT=$(python -c " import json with open('security_report.json') as f: report = json.load(f) print(len(report.get('critical', []))) ") echo "Critical findings: $CRITICAL_COUNT" # Fail the build on critical findings if [ "$CRITICAL_COUNT" -gt "0" ]; then echo "::error::$CRITICAL_COUNT critical security vulnerabilities found!" exit 1 fi - name: Post PR Comment with Findings if: github.event_name == 'pull_request' uses: actions/github-script@v7 with: script: | const fs = require('fs'); const report = JSON.parse(fs.readFileSync('security_report.json')); const total = Object.values(report).flat().length; const body = `## π AI Security Scan Results | Severity | Count | |---|---| | π΄ Critical | ${report.critical?.length || 0} | | π High | ${report.high?.length || 0} | | π‘ Medium | ${report.medium?.length || 0} | | π’ Low | ${report.low?.length || 0} | ${total === 0 ? 'β
No vulnerabilities found!' : 'β οΈ Review findings in the security_report.json artifact.'}`; github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, body: body });
# .github/workflows/ai-security-scan.yml
name: AI Security Scan on: pull_request: types: [opened, synchronize] schedule: - cron: '0 2 * * 1' # Weekly full scan every Monday at 2am jobs: llm-vuln-scan: runs-on: ubuntu-latest permissions: pull-requests: write security-events: write steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # Full history for diff-based scanning on PRs - name: Set up Python uses: actions/setup-python@v5 with: python-version: '3.12' - name: Install dependencies run: pip install anthropic>=0.30.0 - name: Run AI Security Scanner env: ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} run: | # On PRs: scan only changed files for speed if [ "${{ github.event_name }}" = "pull_request" ]; then git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt python minimal_vuln_scanner.py --files-list changed_files.txt else # On scheduled run: full repository scan python minimal_vuln_scanner.py . fi - name: Check for Critical Findings run: | CRITICAL_COUNT=$(python -c " import json with open('security_report.json') as f: report = json.load(f) print(len(report.get('critical', []))) ") echo "Critical findings: $CRITICAL_COUNT" # Fail the build on critical findings if [ "$CRITICAL_COUNT" -gt "0" ]; then echo "::error::$CRITICAL_COUNT critical security vulnerabilities found!" exit 1 fi - name: Post PR Comment with Findings if: github.event_name == 'pull_request' uses: actions/github-script@v7 with: script: | const fs = require('fs'); const report = JSON.parse(fs.readFileSync('security_report.json')); const total = Object.values(report).flat().length; const body = `## π AI Security Scan Results | Severity | Count | |---|---| | π΄ Critical | ${report.critical?.length || 0} | | π High | ${report.high?.length || 0} | | π‘ Medium | ${report.medium?.length || 0} | | π’ Low | ${report.low?.length || 0} | ${total === 0 ? 'β
No vulnerabilities found!' : 'β οΈ Review findings in the security_report.json artifact.'}`; github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, body: body });
# .github/workflows/ai-security-scan.yml
name: AI Security Scan on: pull_request: types: [opened, synchronize] schedule: - cron: '0 2 * * 1' # Weekly full scan every Monday at 2am jobs: llm-vuln-scan: runs-on: ubuntu-latest permissions: pull-requests: write security-events: write steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # Full history for diff-based scanning on PRs - name: Set up Python uses: actions/setup-python@v5 with: python-version: '3.12' - name: Install dependencies run: pip install anthropic>=0.30.0 - name: Run AI Security Scanner env: ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} run: | # On PRs: scan only changed files for speed if [ "${{ github.event_name }}" = "pull_request" ]; then git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt python minimal_vuln_scanner.py --files-list changed_files.txt else # On scheduled run: full repository scan python minimal_vuln_scanner.py . fi - name: Check for Critical Findings run: | CRITICAL_COUNT=$(python -c " import json with open('security_report.json') as f: report = json.load(f) print(len(report.get('critical', []))) ") echo "Critical findings: $CRITICAL_COUNT" # Fail the build on critical findings if [ "$CRITICAL_COUNT" -gt "0" ]; then echo "::error::$CRITICAL_COUNT critical security vulnerabilities found!" exit 1 fi - name: Post PR Comment with Findings if: github.event_name == 'pull_request' uses: actions/github-script@v7 with: script: | const fs = require('fs'); const report = JSON.parse(fs.readFileSync('security_report.json')); const total = Object.values(report).flat().length; const body = `## π AI Security Scan Results | Severity | Count | |---|---| | π΄ Critical | ${report.critical?.length || 0} | | π High | ${report.high?.length || 0} | | π‘ Medium | ${report.medium?.length || 0} | | π’ Low | ${report.low?.length || 0} | ${total === 0 ? 'β
No vulnerabilities found!' : 'β οΈ Review findings in the security_report.json artifact.'}`; github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, body: body });
# multi_model_consensus.py
# Run findings through multiple models; only surface results where β₯2 agree.
# Requires ANTHROPIC_API_KEY and OPENAI_API_KEY env vars. import asyncio
import json
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI anthropic_client = AsyncAnthropic()
openai_client = AsyncOpenAI() VERIFICATION_PROMPT = """You are an expert security researcher verifying whether
a reported vulnerability is real or a false positive. Given the following finding and source code, answer:
1. Is this vulnerability real? (yes/no/uncertain)
2. If real: can an attacker trigger it from an untrusted context? (yes/no/uncertain)
3. Confidence: (high/medium/low) Respond in JSON: {"is_real": bool, "triggerable": bool, "confidence": "high"|"medium"|"low", "reasoning": "one sentence"}""" async def verify_with_claude(finding: dict, source_code: str) -> dict: msg = await anthropic_client.messages.create( model="claude-opus-4-5", max_tokens=512, system=VERIFICATION_PROMPT, messages=[{"role": "user", "content": f"Finding:\n{json.dumps(finding)}\n\nCode:\n{source_code}"}] ) return json.loads(msg.content[0].text) async def verify_with_gpt(finding: dict, source_code: str) -> dict: resp = await openai_client.chat.completions.create( model="gpt-4.1", messages=[ {"role": "system", "content": VERIFICATION_PROMPT}, {"role": "user", "content": f"Finding:\n{json.dumps(finding)}\n\nCode:\n{source_code}"} ], max_tokens=512, response_format={"type": "json_object"} ) return json.loads(resp.choices[0].message.content) async def consensus_verify(finding: dict, source_code: str) -> dict: """Verify a finding with multiple models; return consensus result.""" claude_result, gpt_result = await asyncio.gather( verify_with_claude(finding, source_code), verify_with_gpt(finding, source_code) ) # Require both to agree the finding is real both_confirm = claude_result.get('is_real') and gpt_result.get('is_real') return { "finding": finding, "consensus": both_confirm, "claude_verdict": claude_result, "gpt_verdict": gpt_result, "advance_to_human_review": both_confirm, "false_positive_probability": "low" if both_confirm else "high" }
# multi_model_consensus.py
# Run findings through multiple models; only surface results where β₯2 agree.
# Requires ANTHROPIC_API_KEY and OPENAI_API_KEY env vars. import asyncio
import json
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI anthropic_client = AsyncAnthropic()
openai_client = AsyncOpenAI() VERIFICATION_PROMPT = """You are an expert security researcher verifying whether
a reported vulnerability is real or a false positive. Given the following finding and source code, answer:
1. Is this vulnerability real? (yes/no/uncertain)
2. If real: can an attacker trigger it from an untrusted context? (yes/no/uncertain)
3. Confidence: (high/medium/low) Respond in JSON: {"is_real": bool, "triggerable": bool, "confidence": "high"|"medium"|"low", "reasoning": "one sentence"}""" async def verify_with_claude(finding: dict, source_code: str) -> dict: msg = await anthropic_client.messages.create( model="claude-opus-4-5", max_tokens=512, system=VERIFICATION_PROMPT, messages=[{"role": "user", "content": f"Finding:\n{json.dumps(finding)}\n\nCode:\n{source_code}"}] ) return json.loads(msg.content[0].text) async def verify_with_gpt(finding: dict, source_code: str) -> dict: resp = await openai_client.chat.completions.create( model="gpt-4.1", messages=[ {"role": "system", "content": VERIFICATION_PROMPT}, {"role": "user", "content": f"Finding:\n{json.dumps(finding)}\n\nCode:\n{source_code}"} ], max_tokens=512, response_format={"type": "json_object"} ) return json.loads(resp.choices[0].message.content) async def consensus_verify(finding: dict, source_code: str) -> dict: """Verify a finding with multiple models; return consensus result.""" claude_result, gpt_result = await asyncio.gather( verify_with_claude(finding, source_code), verify_with_gpt(finding, source_code) ) # Require both to agree the finding is real both_confirm = claude_result.get('is_real') and gpt_result.get('is_real') return { "finding": finding, "consensus": both_confirm, "claude_verdict": claude_result, "gpt_verdict": gpt_result, "advance_to_human_review": both_confirm, "false_positive_probability": "low" if both_confirm else "high" }
# multi_model_consensus.py
# Run findings through multiple models; only surface results where β₯2 agree.
# Requires ANTHROPIC_API_KEY and OPENAI_API_KEY env vars. import asyncio
import json
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI anthropic_client = AsyncAnthropic()
openai_client = AsyncOpenAI() VERIFICATION_PROMPT = """You are an expert security researcher verifying whether
a reported vulnerability is real or a false positive. Given the following finding and source code, answer:
1. Is this vulnerability real? (yes/no/uncertain)
2. If real: can an attacker trigger it from an untrusted context? (yes/no/uncertain)
3. Confidence: (high/medium/low) Respond in JSON: {"is_real": bool, "triggerable": bool, "confidence": "high"|"medium"|"low", "reasoning": "one sentence"}""" async def verify_with_claude(finding: dict, source_code: str) -> dict: msg = await anthropic_client.messages.create( model="claude-opus-4-5", max_tokens=512, system=VERIFICATION_PROMPT, messages=[{"role": "user", "content": f"Finding:\n{json.dumps(finding)}\n\nCode:\n{source_code}"}] ) return json.loads(msg.content[0].text) async def verify_with_gpt(finding: dict, source_code: str) -> dict: resp = await openai_client.chat.completions.create( model="gpt-4.1", messages=[ {"role": "system", "content": VERIFICATION_PROMPT}, {"role": "user", "content": f"Finding:\n{json.dumps(finding)}\n\nCode:\n{source_code}"} ], max_tokens=512, response_format={"type": "json_object"} ) return json.loads(resp.choices[0].message.content) async def consensus_verify(finding: dict, source_code: str) -> dict: """Verify a finding with multiple models; return consensus result.""" claude_result, gpt_result = await asyncio.gather( verify_with_claude(finding, source_code), verify_with_gpt(finding, source_code) ) # Require both to agree the finding is real both_confirm = claude_result.get('is_real') and gpt_result.get('is_real') return { "finding": finding, "consensus": both_confirm, "claude_verdict": claude_result, "gpt_verdict": gpt_result, "advance_to_human_review": both_confirm, "false_positive_probability": "low" if both_confirm else "high" } - The Day an AI Found a macOS Kernel CVE
- What Is LLM Vulnerability Research? (Beyond Static Analysis)
- How Mythos Preview Works: Exploit Chain Construction & Proof Generation
- Real-World Results: Mozilla, Cloudflare, and Numbers That Stunned the Industry
- The Agentic Harness Architecture: Deep Technical Breakdown
- GPT-5.5 vs Claude Mythos: A Comparative Look at Frontier Security Models
- The New Bottleneck: Finding > Fixing
- Building Your Own AI Security Pipeline
- Safety, Ethics, and Dual-Use Concerns
- What's Next: The Near-Future of AI-Powered Cyber Defense
- Conclusion & Call to Action - SAST (Static Application Security Testing): Pattern-matching against known vulnerability signatures in source code. Fast, high false-positive rate, misses logic bugs entirely.
- DAST (Dynamic Application Security Testing): Black-box fuzzing, sending malformed inputs and watching for crashes. Good for input validation bugs, blind to architectural flaws.
- Symbolic Execution: Exhaustively explores code paths using constraint solvers (e.g., KLEE, angr). Powerful but doesn't scale to real-world codebases.
- Manual Penetration Testing: Human researchers manually auditing code. High quality, brutally expensive, doesn't scale. - Which primitives can be combined?
- What preconditions does each step require?
- Can an attacker reliably satisfy those preconditions from an unprivileged context?
- What does the final exploit look like end-to-end? - Hypothesize: Identify a suspected vulnerability and formulate a triggering condition.
- Synthesize: Write code that would trigger the bug β a test harness, a malformed input, a specific sequence of API calls.
- Compile & Execute: Build the PoC in an isolated sandbox environment and run it.
- Observe & Iterate: If the expected behavior (crash, memory corruption, privilege escalation) isn't observed, read the output, revise the hypothesis, and try again. - Bug 2024918: An incorrect equality check allowed the JIT compiler to optimize away initialization of a live WebAssembly GC struct, creating a fake-object primitive with arbitrary read/write. This code had undergone extensive fuzzing by both internal and external researchers and was never found.
- Bug 2024437: A 15-year-old bug in the <legend> HTML element triggered by an intricate orchestration of recursion stack depth limits, expando properties, and cycle collection across distant parts of the browser.
- Bug 2022733: A parent-process UAF triggered by flooding WebTransport with thousands of certificate hashes to stretch a race condition in a refcount-heavy copy loop β then exploiting that race over IPC from a compromised content process. - Build a dependency graph of the codebase
- Identify all external trust boundaries (network input, file parsing, IPC, user input)
- Enumerate attack-relevant subsystems (crypto, auth, memory management, privilege operations)
- Produce a prioritized list of modules to scan, ordered by attack surface and severity potential - Full file context for the module under analysis
- Relevant cross-file dependencies loaded dynamically
- Language-specific vulnerability playbook (C/C++ memory bugs vs. Python deserialization vs. Rust unsafe blocks)
- A structured output format enforcing finding quality - Shorten patch cycles aggressively. The 90-day standard disclosure window was designed for the old world. As AI-found bugs become public CVEs faster, the exploitation window is compressing.
- Invest in automated patch generation pipelines. Claude Security (now in public beta for Enterprise) can generate proposed fixes, not just identify bugs. This is the next frontier for reducing the triage burden.
- Memory-safe languages matter more than ever. Both Cloudflare and Mozilla's data confirm significantly higher false-positive rates and more severe findings in C/C++ codebases vs. Rust or Go. The ROI on memory-safe rewrites just got a lot more concrete.
- Staged rollout policies are critical. With AI accelerating both attack and defense, end users need to be able to receive patches faster. Frictionless update mechanisms aren't just a UX concern β they're a security posture. - Responsible disclosure, always. AI is going to accelerate vulnerability discovery dramatically. The 90-day disclosure standard exists for good reason β it gives end users time to patch. Don't let the excitement of AI-found bugs shortcut this process.
- Scope your harness carefully. Ensure your scanning pipeline only touches infrastructure you own or have explicit written authorization to test. The fact that a tool is effective doesn't change the legal and ethical requirements for authorization.
- Verify before you disclose. Submit only confirmed, PoC-backed findings to maintainers. The open-source community is already overwhelmed by low-quality AI-generated bug reports. Be part of the solution, not the problem.
- Watch for model inconsistency. Cloudflare's team documented that Mythos Preview's organic guardrails are inconsistent β the same task framed differently could produce completely different refusal behavior. Don't treat model-level safeguards as a substitute for process-level controls. - Mythos-class capabilities will become available in more generally accessible models as Anthropic and OpenAI iterate
- Automated patch generation will mature β tools like Claude Security will move from "propose fixes" to "generate, test, and submit PRs" with minimal human involvement
- CI/CD-integrated AI security scanning will become a default expectation, not a differentiator - The concept of a "security debt surface" will become quantifiable in real-time β every codebase will have a live severity score updated continuously by AI agents
- Memory-safe language adoption will accelerate dramatically as C/C++ vulnerability rates become impossible to ignore empirically
- The security research job market will bifurcate: routine scanning automation, but a premium on researchers who can architect harnesses, interpret AI findings, and build novel exploitation techniques that AI hasn't yet learned - The bottleneck will shift from patching to architectural hardening β teams will move from "fix this bug" to "eliminate this entire bug class" through language choices, sandboxing, capability restriction, and formal verification
- AI models may begin writing security specifications and verifying code against them, moving toward a world where newly written code is provably free of common vulnerability classes - Run the minimal scanner above against your most critical service. Set your ANTHROPIC_API_KEY, point it at a repo, and see what it finds. The marginal cost of a scan is a few API dollars. The marginal cost of an unpatched critical is not.
- Set up the GitHub Actions workflow for your team's most security-sensitive repositories. Automated scanning on every PR is now table stakes.
- Apply to Anthropic's Cyber Verification Program if your organization does legitimate security research, red-teaming, or penetration testing. Access to higher-capability models in this domain is now a significant professional advantage.