# Semgrep rule for SQL injection in Python
rules: - id: sql-injection-format-string patterns: - pattern: | cursor.execute(f"...{$VAR}...") - pattern-not: | cursor.execute(f"...{CONST}...") message: "Potential SQL injection via f-string formatting" severity: ERROR languages: [python]
# Semgrep rule for SQL injection in Python
rules: - id: sql-injection-format-string patterns: - pattern: | cursor.execute(f"...{$VAR}...") - pattern-not: | cursor.execute(f"...{CONST}...") message: "Potential SQL injection via f-string formatting" severity: ERROR languages: [python]
# Semgrep rule for SQL injection in Python
rules: - id: sql-injection-format-string patterns: - pattern: | cursor.execute(f"...{$VAR}...") - pattern-not: | cursor.execute(f"...{CONST}...") message: "Potential SQL injection via f-string formatting" severity: ERROR languages: [python]
# Vulnerable: SQL injection via string formatting
from flask import Flask, request app = Flask(__name__) @app.route('/users/search')
def search_users(): name = request.args.get('name') conn = psycopg2.connect(database="myapp") cursor = conn.cursor() # VULNERABLE: User input directly in query string query = f"SELECT * FROM users WHERE name LIKE '%{name}%'" cursor.execute(query) results = cursor.fetchall() return {"users": results}
# Vulnerable: SQL injection via string formatting
from flask import Flask, request app = Flask(__name__) @app.route('/users/search')
def search_users(): name = request.args.get('name') conn = psycopg2.connect(database="myapp") cursor = conn.cursor() # VULNERABLE: User input directly in query string query = f"SELECT * FROM users WHERE name LIKE '%{name}%'" cursor.execute(query) results = cursor.fetchall() return {"users": results}
# Vulnerable: SQL injection via string formatting
from flask import Flask, request app = Flask(__name__) @app.route('/users/search')
def search_users(): name = request.args.get('name') conn = psycopg2.connect(database="myapp") cursor = conn.cursor() # VULNERABLE: User input directly in query string query = f"SELECT * FROM users WHERE name LIKE '%{name}%'" cursor.execute(query) results = cursor.fetchall() return {"users": results}
# Fixed: Parameterized query
query = "SELECT * FROM users WHERE name LIKE %s"
cursor.execute(query, (f"%{name}%",))
# Fixed: Parameterized query
query = "SELECT * FROM users WHERE name LIKE %s"
cursor.execute(query, (f"%{name}%",))
# Fixed: Parameterized query
query = "SELECT * FROM users WHERE name LIKE %s"
cursor.execute(query, (f"%{name}%",))
// Vulnerable: Reflected XSS via dangerouslySetInnerHTML function SearchResults() { const [searchParams] = useSearchParams(); const query = searchParams.get('q'); return ( <div> <h2>Results for:</h2> {/* VULNERABLE: Rendering user input as raw HTML */} <div dangerouslySetInnerHTML={{ __html: query }} /> </div> );
}
// Vulnerable: Reflected XSS via dangerouslySetInnerHTML function SearchResults() { const [searchParams] = useSearchParams(); const query = searchParams.get('q'); return ( <div> <h2>Results for:</h2> {/* VULNERABLE: Rendering user input as raw HTML */} <div dangerouslySetInnerHTML={{ __html: query }} /> </div> );
}
// Vulnerable: Reflected XSS via dangerouslySetInnerHTML function SearchResults() { const [searchParams] = useSearchParams(); const query = searchParams.get('q'); return ( <div> <h2>Results for:</h2> {/* VULNERABLE: Rendering user input as raw HTML */} <div dangerouslySetInnerHTML={{ __html: query }} /> </div> );
}
// Fixed: Render as text content, not HTML
<div>{query}</div> // Or if HTML is needed, sanitize first <div dangerouslySetInnerHTML={{ __html: DOMPurify.sanitize(query) }} />
// Fixed: Render as text content, not HTML
<div>{query}</div> // Or if HTML is needed, sanitize first <div dangerouslySetInnerHTML={{ __html: DOMPurify.sanitize(query) }} />
// Fixed: Render as text content, not HTML
<div>{query}</div> // Or if HTML is needed, sanitize first <div dangerouslySetInnerHTML={{ __html: DOMPurify.sanitize(query) }} />
// Vulnerable: Missing authorization check
// NOTE: this snippet is intentionally left vulnerable; it demonstrates the
// difference between authentication and authorization.
const express = require('express');
const router = express.Router();

// Middleware checks authentication (is the user logged in?)
router.use(authMiddleware);

// GET a single document by the id given in the URL path.
router.get('/api/documents/:id', async (req, res) => {
  const document = await Document.findById(req.params.id);
  if (!document) {
    return res.status(404).json({ error: 'Not found' });
  }
  // VULNERABLE: No check that req.user owns this document
  // Any authenticated user can access any document by ID
  return res.json(document);
});

// DELETE a single document by the id given in the URL path.
router.delete('/api/documents/:id', async (req, res) => {
  // VULNERABLE: Same issue - no ownership check before deletion
  await Document.findByIdAndDelete(req.params.id);
  return res.status(204).send();
});
// Vulnerable: Missing authorization check
// NOTE: this snippet is intentionally left vulnerable; it demonstrates the
// difference between authentication and authorization.
const express = require('express');
const router = express.Router();

// Middleware checks authentication (is the user logged in?)
router.use(authMiddleware);

// GET a single document by the id given in the URL path.
router.get('/api/documents/:id', async (req, res) => {
  const document = await Document.findById(req.params.id);
  if (!document) {
    return res.status(404).json({ error: 'Not found' });
  }
  // VULNERABLE: No check that req.user owns this document
  // Any authenticated user can access any document by ID
  return res.json(document);
});

// DELETE a single document by the id given in the URL path.
router.delete('/api/documents/:id', async (req, res) => {
  // VULNERABLE: Same issue - no ownership check before deletion
  await Document.findByIdAndDelete(req.params.id);
  return res.status(204).send();
});
// Vulnerable: Missing authorization check
// NOTE: this snippet is intentionally left vulnerable; it demonstrates the
// difference between authentication and authorization.
const express = require('express');
const router = express.Router();

// Middleware checks authentication (is the user logged in?)
router.use(authMiddleware);

// GET a single document by the id given in the URL path.
router.get('/api/documents/:id', async (req, res) => {
  const document = await Document.findById(req.params.id);
  if (!document) {
    return res.status(404).json({ error: 'Not found' });
  }
  // VULNERABLE: No check that req.user owns this document
  // Any authenticated user can access any document by ID
  return res.json(document);
});

// DELETE a single document by the id given in the URL path.
router.delete('/api/documents/:id', async (req, res) => {
  // VULNERABLE: Same issue - no ownership check before deletion
  await Document.findByIdAndDelete(req.params.id);
  return res.status(204).send();
});
// Fixed: Add ownership verification
// Responds 404 when the document does not exist, 403 when the requester is
// not its owner, and the document as JSON otherwise.
router.get('/api/documents/:id', async (req, res) => {
  const document = await Document.findById(req.params.id);
  if (!document) {
    return res.status(404).json({ error: 'Not found' });
  }
  // Check that the requesting user owns this document
  if (document.ownerId.toString() !== req.user.id) {
    return res.status(403).json({ error: 'Forbidden' });
  }
  return res.json(document);
});
// Fixed: Add ownership verification
// Responds 404 when the document does not exist, 403 when the requester is
// not its owner, and the document as JSON otherwise.
router.get('/api/documents/:id', async (req, res) => {
  const document = await Document.findById(req.params.id);
  if (!document) {
    return res.status(404).json({ error: 'Not found' });
  }
  // Check that the requesting user owns this document
  if (document.ownerId.toString() !== req.user.id) {
    return res.status(403).json({ error: 'Forbidden' });
  }
  return res.json(document);
});
// Fixed: Add ownership verification
// Responds 404 when the document does not exist, 403 when the requester is
// not its owner, and the document as JSON otherwise.
router.get('/api/documents/:id', async (req, res) => {
  const document = await Document.findById(req.params.id);
  if (!document) {
    return res.status(404).json({ error: 'Not found' });
  }
  // Check that the requesting user owns this document
  if (document.ownerId.toString() !== req.user.id) {
    return res.status(403).json({ error: 'Forbidden' });
  }
  return res.json(document);
});
# Vulnerable: Insecure deserialization with pickle
from flask import Flask, request

app = Flask(__name__)


@app.route('/api/import-config', methods=['POST'])
def import_config():
    """Import a configuration posted in the ``config`` form field.

    The field is base64-decoded, unpickled, and passed to ``apply_config``.
    This snippet is intentionally left vulnerable for demonstration.

    Returns:
        A dict with a ``status`` message.
    """
    encoded_data = request.form.get('config')
    # VULNERABLE: Deserializing untrusted data with pickle
    # An attacker can craft a pickle payload that executes arbitrary code
    config = pickle.loads(base64.b64decode(encoded_data))
    apply_config(config)
    return {"status": "Config imported successfully"}
# Vulnerable: Insecure deserialization with pickle
from flask import Flask, request

app = Flask(__name__)


@app.route('/api/import-config', methods=['POST'])
def import_config():
    """Import a configuration posted in the ``config`` form field.

    The field is base64-decoded, unpickled, and passed to ``apply_config``.
    This snippet is intentionally left vulnerable for demonstration.

    Returns:
        A dict with a ``status`` message.
    """
    encoded_data = request.form.get('config')
    # VULNERABLE: Deserializing untrusted data with pickle
    # An attacker can craft a pickle payload that executes arbitrary code
    config = pickle.loads(base64.b64decode(encoded_data))
    apply_config(config)
    return {"status": "Config imported successfully"}
# Vulnerable: Insecure deserialization with pickle
from flask import Flask, request

app = Flask(__name__)


@app.route('/api/import-config', methods=['POST'])
def import_config():
    """Import a configuration posted in the ``config`` form field.

    The field is base64-decoded, unpickled, and passed to ``apply_config``.
    This snippet is intentionally left vulnerable for demonstration.

    Returns:
        A dict with a ``status`` message.
    """
    encoded_data = request.form.get('config')
    # VULNERABLE: Deserializing untrusted data with pickle
    # An attacker can craft a pickle payload that executes arbitrary code
    config = pickle.loads(base64.b64decode(encoded_data))
    apply_config(config)
    return {"status": "Config imported successfully"}
# Fixed: Use JSON instead of pickle for untrusted data
@app.route('/api/import-config', methods=['POST'])
def import_config():
    """Import a configuration posted in the ``config`` form field.

    The field is base64-decoded and parsed as JSON, then passed to
    ``validate_config_schema`` before ``apply_config`` is called.

    Returns:
        A dict with a ``status`` message.
    """
    encoded_data = request.form.get('config')
    config = json.loads(base64.b64decode(encoded_data))
    # Validate the config structure
    validate_config_schema(config)
    apply_config(config)
    return {"status": "Config imported successfully"}
# Fixed: Use JSON instead of pickle for untrusted data
@app.route('/api/import-config', methods=['POST'])
def import_config():
    """Import a configuration posted in the ``config`` form field.

    The field is base64-decoded and parsed as JSON, then passed to
    ``validate_config_schema`` before ``apply_config`` is called.

    Returns:
        A dict with a ``status`` message.
    """
    encoded_data = request.form.get('config')
    config = json.loads(base64.b64decode(encoded_data))
    # Validate the config structure
    validate_config_schema(config)
    apply_config(config)
    return {"status": "Config imported successfully"}
# Fixed: Use JSON instead of pickle for untrusted data
@app.route('/api/import-config', methods=['POST'])
def import_config():
    """Import a configuration posted in the ``config`` form field.

    The field is base64-decoded and parsed as JSON, then passed to
    ``validate_config_schema`` before ``apply_config`` is called.

    Returns:
        A dict with a ``status`` message.
    """
    encoded_data = request.form.get('config')
    config = json.loads(base64.b64decode(encoded_data))
    # Validate the config structure
    validate_config_schema(config)
    apply_config(config)
    return {"status": "Config imported successfully"}
# Vulnerable: Hardcoded credentials # VULNERABLE: AWS credentials hardcoded in source
AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE"
AWS_SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" # VULNERABLE: JWT secret hardcoded
JWT_SECRET = "super-secret-jwt-key-2026" def get_s3_client(): return boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY ) def generate_token(user_id): return jwt.encode( {"user_id": user_id}, JWT_SECRET, algorithm="HS256" )
# Vulnerable: Hardcoded credentials # VULNERABLE: AWS credentials hardcoded in source
AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE"
AWS_SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" # VULNERABLE: JWT secret hardcoded
JWT_SECRET = "super-secret-jwt-key-2026" def get_s3_client(): return boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY ) def generate_token(user_id): return jwt.encode( {"user_id": user_id}, JWT_SECRET, algorithm="HS256" )
# Vulnerable: Hardcoded credentials # VULNERABLE: AWS credentials hardcoded in source
AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE"
AWS_SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" # VULNERABLE: JWT secret hardcoded
JWT_SECRET = "super-secret-jwt-key-2026" def get_s3_client(): return boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY ) def generate_token(user_id): return jwt.encode( {"user_id": user_id}, JWT_SECRET, algorithm="HS256" )
# Fixed: Use environment variables def get_s3_client(): return boto3.client( 's3', aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'] ) def generate_token(user_id): return jwt.encode( {"user_id": user_id}, os.environ['JWT_SECRET'], algorithm="HS256" )
# Fixed: Use environment variables def get_s3_client(): return boto3.client( 's3', aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'] ) def generate_token(user_id): return jwt.encode( {"user_id": user_id}, os.environ['JWT_SECRET'], algorithm="HS256" )
# Fixed: Use environment variables def get_s3_client(): return boto3.client( 's3', aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'] ) def generate_token(user_id): return jwt.encode( {"user_id": user_id}, os.environ['JWT_SECRET'], algorithm="HS256" )
# .pre-commit-config.yaml
repos: - repo: https://github.com/semgrep/semgrep hooks: - id: semgrep args: ['--config', 'p/security-audit', '--severity', 'ERROR'] - repo: https://github.com/gitleaks/gitleaks hooks: - id: gitleaks
# .pre-commit-config.yaml
repos: - repo: https://github.com/semgrep/semgrep hooks: - id: semgrep args: ['--config', 'p/security-audit', '--severity', 'ERROR'] - repo: https://github.com/gitleaks/gitleaks hooks: - id: gitleaks
# .pre-commit-config.yaml
repos: - repo: https://github.com/semgrep/semgrep hooks: - id: semgrep args: ['--config', 'p/security-audit', '--severity', 'ERROR'] - repo: https://github.com/gitleaks/gitleaks hooks: - id: gitleaks
# Example GitHub Actions workflow for security PR checks
name: Security Review
on: [pull_request] jobs: semgrep: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: semgrep/semgrep-action@v1 with: config: >- p/security-audit p/secrets p/owasp-top-ten snyk-code: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: snyk/actions/code@master env: SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} # CodeRabbit runs automatically via GitHub App # No workflow configuration needed
# Example GitHub Actions workflow for security PR checks
name: Security Review
on: [pull_request] jobs: semgrep: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: semgrep/semgrep-action@v1 with: config: >- p/security-audit p/secrets p/owasp-top-ten snyk-code: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: snyk/actions/code@master env: SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} # CodeRabbit runs automatically via GitHub App # No workflow configuration needed
# Example GitHub Actions workflow for security PR checks
name: Security Review
on: [pull_request] jobs: semgrep: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: semgrep/semgrep-action@v1 with: config: >- p/security-audit p/secrets p/owasp-top-ten snyk-code: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: snyk/actions/code@master env: SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} # CodeRabbit runs automatically via GitHub App # No workflow configuration needed - Business logic flaws - An endpoint that checks if a user is authenticated but does not check if they are authorized to access the specific resource
- Incomplete validation - Input validation that checks the format but not the range, or validates some fields but not others
- Race conditions with security implications - A time-of-check-to-time-of-use (TOCTOU) gap between verifying permissions and executing the action
- Contextual misconfigurations - Security settings that are technically valid but inappropriate for the application's threat model

- SonarLint (the IDE plugin from SonarQube) provides real-time security feedback in VS Code, JetBrains, and Eclipse as developers type. It catches common vulnerability patterns immediately, with explanations and fix suggestions.
- Semgrep CLI can run as a pre-commit hook, scanning changed files in under a second and blocking commits that contain high-severity vulnerabilities. Configure it with --severity ERROR to flag only critical issues and avoid slowing down the development loop.
- Secrets scanners like git-secrets, gitleaks, or Semgrep Secrets should run pre-commit to prevent credentials from ever entering version control. Once a secret is committed, it exists in git history forever (unless the repo is rewritten), making prevention far more valuable than detection.

- AI code review (CodeRabbit or similar) reviews every PR for logic-level security issues, missing authorization checks, insufficient input validation, and security anti-patterns. The AI provides contextual feedback that rule-based tools cannot.
- SAST scanning (Semgrep, Snyk Code, or both) runs on the PR diff and reports findings as inline comments or status checks. Configure quality gates to block merges when high-severity vulnerabilities are found.
- SCA scanning (Snyk Open Source, Semgrep Supply Chain, or Dependabot) checks whether the PR introduces or updates dependencies with known vulnerabilities. This addresses OWASP A06 (Vulnerable and Outdated Components).

- Full SAST scans run weekly or on each release branch. Use Checkmarx, Veracode, or Semgrep with Pro rules for maximum coverage.
- Dependency audits check the full dependency tree for known vulnerabilities, including transitive dependencies that PR-level SCA might miss.
- Infrastructure-as-code scanning checks Terraform, CloudFormation, Kubernetes manifests, and Docker configurations for security misconfigurations.

- Security champion review for any PR that touches authentication, authorization, cryptography, input validation, or data handling logic. AI review can flag these PRs for attention, but a human should verify the findings.
- Threat modeling for new features, especially those that introduce new data flows, external integrations, or user-facing attack surfaces.
- Periodic manual code audits where a security engineer reviews critical modules without relying on automated tool output. This catches issues that all automated tools miss and prevents the deskilling problem discussed earlier.

- AI security scanning catches known vulnerability patterns early in development, at low cost, across the entire codebase. This is the foundation.
- Penetration testing validates the security posture of the deployed application on a regular cadence (quarterly or before major releases) and identifies runtime and infrastructure vulnerabilities that code scanning cannot find.
- Bug bounty programs provide continuous external testing and catch edge cases that internal processes and automated tools miss.