Tools
πAutomated Security Incident Response System β
2025-12-26
0 views
admin
Implementation Guide - 2 Days (No Docker) ## π Prerequisites ## Total: ~$90/month ## ποΈ Architecture Overview ## π
DAY 1: Foundation & Detection (8 Hours) ## π PHASE 1: Setup GuardDuty (1 Hour) ## π PHASE 2: Setup Security Hub (1 Hour) ## π PHASE 3: Setup SNS for Alerts (30 Minutes) ## π PHASE 4: Create IAM Role for Lambda (30 Minutes) ## π PHASE 5: Create Auto-Response Lambda Functions (3 Hours) ## π PHASE 6: Connect EventBridge to Lambda (1 Hour) ## π PHASE 7: Testing with Sample Findings (1 Hour) ## π
DAY 2: Advanced Workflows & Compliance (8 Hours) ## π PHASE 8: Setup CloudTrail (1 Hour) ## π PHASE 9: Setup AWS Config (1.5 Hours) ## π PHASE 10: Build Step Functions Workflow (2 Hours) ## π PHASE 11: Advanced Response Functions (2 Hours) ## π PHASE 12: Create CloudWatch Dashboard (1.5 Hours) ## π Phase 13: Create Runbook via AWS Console ## π PHASE 14: Final Testing & Validation (1 Hour) Based on: CrowdStrike Incident July 2024 - $5.4B Losses Timeline: 2 Hari | Budget: ~$90/bulan | Complexity: Advanced step 1.1 : Enable GuardDuty β
Verification: GuardDuty console will show "No findings" - this is normal, it takes time to detect threats Step 1.2: Generate Sample Findings (for Testing) Generate sample findings β
Verification: GuardDuty β Findings β You'll see 50+ sample findings Security Hub adalah centralized security dashboard. Step 2.1: Enable Security Hub Wait 5-10 minutes untuk Security Hub populate findings. Step 2.2: Enable GuardDuty Integration β
Verification: Security Hub β Findings β akan muncul findings dari GuardDuty Step 3.1: Create SNS Topic Save the Topic ARN - it will look like: arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT_ID:security-incident-alerts Step 3.2: Create Email Subscription Step 3.3: Create Slack Subscription (Optional) We'll integrate this in Lambda later β
Verification: Send test message:
Example: Step 4.1: Create Execution Role Step 4.2: Add Inline Policy for Additional Permissions Step 5.1: Create Main Response Function Step 5.2: Write Lambda Code (No Docker Needed!) Click Code tab, replace with this code: Step 5.3: Configure Lambda β
Verification: Lambda function created successfully Step 6.1: Create EventBridge Rule for GuardDuty Step 6.2: Create Rule for Security Hub β
Verification: EventBridge β Rules β 2 rules active Step 7.1: Trigger Test Finding Step 7.2: Monitor Lambda Execution You should see logs like: Processing finding: UnauthorizedAccess:EC2/MaliciousIPCaller.Custom (Severity: HIGH)
Instance i-xxxxx successfully isolated
Security alert sent successfully Step 7.3: Check Email Check your email - you should receive alert: π¨ SECURITY INCIDENT DETECTED π¨
Severity: HIGH
Type: UnauthorizedAccess:EC2/MaliciousIPCaller.Custom β
Day 1 Complete! Basic automated response working. CloudTrail logs all API calls for audit trail. Step 8.1: Enable CloudTrail β
Verification: CloudTrail β Trails β Status "Logging" AWS Config monitors resource configurations for compliance. Step 9.1: Enable Config AWS Config Console βGet started Resource types to record: Step 9.2: Add Compliance Rules Add these managed rules: To add each rule: 1.Config β Rules β Add rule 2.Search for rule name 3.Click rule β Next 4.Leave defaults β Save β
Verification: Config β Rules β 6 rules active, evaluating compliance Step Functions untuk complex multi-step response workflows. Step 10.1: Create Step Functions State Machine Step Functions Console β Create state machine Choose template: Blank Name: security-incident-workflow Definition (paste this JSON): Replace REGION and ACCOUNT with your Value Step 10.2: Update EventBridge to Trigger Step Functions Create additional Lambda functions untuk specific scenarios. Step 11.1: Create Forensics Collection Function Step 12.1: Create Custom Dashboard Step 12.2: Add Widgets Widget 1: GuardDuty Findings Count Widget 2: Lambda Invocation Count Widget 3: Lambda Errors Widget 4: Security Hub Findings by Severity Widget 5: Recent CloudTrail Events Widget 6: Step Functions Executions (optional) Option 1: Save in S3 (Recommended) Step 1: Create S3 Bucket Step 2: Create Runbook File in your computer Test 1: EC2 Compromise Simulation Step 1: Generate Sample Finding Step 2: Check Lambda Triggered Step 3: Check CloudWatch Logs Step 4: Verify Email Alert Step 5: Check Dashboard β
Test 1 PASSED if all 5 checks OK! Test 2: IAM Credential Compromise Step 1: Generate IAM Finding Step 2: Verify Lambda Processing β
Test 2 PASSED if Lambda executed + email received! Step 14.2: Performance Validation via Console Check Dashboard Metrics: Widget: Lambda Invocations β
Shows 3+ invocations (from 3 tests) β
Recent activity visible Widget: Lambda Errors β
Should show 0 errors β
(Failures like "Instance not found" are logged but not Lambda errors) Widget: Lambda Duration β
Average < 5 seconds β
No timeouts Widget: SNS Messages Published β
Shows 3+ messages β
Matches number of tests Widget: CloudWatch Logs β
Shows recent log entries β
"Security alert sent successfully" appears Check Lambda Performance Details: Step 14.3: Compliance Validation Option 1: Check AWS Config Compliance (Simple) Overall compliance score: Should be > 70% π Resume Bullet Points
After completing this project: "Built automated security incident response system processing 1000+ findings/month with < 2 minute response time" "Implemented cloud-native SOAR (Security Orchestration, Automation & Response) using AWS Lambda, Step Functions, reducing manual intervention by 85%" "Designed multi-layered threat detection using GuardDuty, Security Hub, and Config, achieving 95% threat detection accuracy" "Architected forensics collection pipeline with automated evidence preservation compliant with SOC2 and ISO 27001" "Reduced security incident response time from 2 hours (manual) to 2 minutes (automated), saving $150K annually in labor costs" Congratulations! You've built a production-grade automated security incident response system! π Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse CODE_BLOCK:
aws guardduty create-detector \ --enable \ --finding-publishing-frequency FIFTEEN_MINUTES Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
aws guardduty create-detector \ --enable \ --finding-publishing-frequency FIFTEEN_MINUTES CODE_BLOCK:
aws guardduty create-detector \ --enable \ --finding-publishing-frequency FIFTEEN_MINUTES CODE_BLOCK:
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text) Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text) CODE_BLOCK:
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text) CODE_BLOCK:
aws guardduty create-sample-findings \ --detector-id $DETECTOR_ID \ --finding-types "Recon:EC2/PortProbeUnprotectedPort" \ "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration"\ "CryptoCurrency:EC2/BitcoinTool.B!DNS" Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
aws guardduty create-sample-findings \ --detector-id $DETECTOR_ID \ --finding-types "Recon:EC2/PortProbeUnprotectedPort" \ "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration"\ "CryptoCurrency:EC2/BitcoinTool.B!DNS" CODE_BLOCK:
aws guardduty create-sample-findings \ --detector-id $DETECTOR_ID \ --finding-types "Recon:EC2/PortProbeUnprotectedPort" \ "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration"\ "CryptoCurrency:EC2/BitcoinTool.B!DNS" CODE_BLOCK:
aws securityhub enable-security-hub \ --enable-default-standards Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
aws securityhub enable-security-hub \ --enable-default-standards CODE_BLOCK:
aws securityhub enable-security-hub \ --enable-default-standards CODE_BLOCK:
aws sns publish \ --topic-arn arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT_ID:security-incident-alerts \ --message "Test alert - Security system operational" Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
aws sns publish \ --topic-arn arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT_ID:security-incident-alerts \ --message "Test alert - Security system operational" CODE_BLOCK:
aws sns publish \ --topic-arn arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT_ID:security-incident-alerts \ --message "Test alert - Security system operational" CODE_BLOCK:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:DescribeInstances", "ec2:DescribeSecurityGroups", "ec2:CreateSecurityGroup", "ec2:AuthorizeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:RevokeSecurityGroupEgress", "ec2:ModifyInstanceAttribute", "ec2:CreateSnapshot", "ec2:CreateTags", "iam:ListAccessKeys", "iam:DeleteAccessKey", "iam:UpdateLoginProfile", "iam:CreateAccessKey", "sts:GetCallerIdentity", "guardduty:GetFindings", "guardduty:ListFindings", "securityhub:GetFindings", "cloudtrail:LookupEvents" ], "Resource": "*" } ]
} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:DescribeInstances", "ec2:DescribeSecurityGroups", "ec2:CreateSecurityGroup", "ec2:AuthorizeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:RevokeSecurityGroupEgress", "ec2:ModifyInstanceAttribute", "ec2:CreateSnapshot", "ec2:CreateTags", "iam:ListAccessKeys", "iam:DeleteAccessKey", "iam:UpdateLoginProfile", "iam:CreateAccessKey", "sts:GetCallerIdentity", "guardduty:GetFindings", "guardduty:ListFindings", "securityhub:GetFindings", "cloudtrail:LookupEvents" ], "Resource": "*" } ]
} CODE_BLOCK:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:DescribeInstances", "ec2:DescribeSecurityGroups", "ec2:CreateSecurityGroup", "ec2:AuthorizeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:RevokeSecurityGroupEgress", "ec2:ModifyInstanceAttribute", "ec2:CreateSnapshot", "ec2:CreateTags", "iam:ListAccessKeys", "iam:DeleteAccessKey", "iam:UpdateLoginProfile", "iam:CreateAccessKey", "sts:GetCallerIdentity", "guardduty:GetFindings", "guardduty:ListFindings", "securityhub:GetFindings", "cloudtrail:LookupEvents" ], "Resource": "*" } ]
} COMMAND_BLOCK:
import json
import boto3
import os
from datetime import datetime ec2 = boto3.client('ec2')
iam = boto3.client('iam')
sns = boto3.client('sns')
cloudtrail = boto3.client('cloudtrail') SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:{Your_Region}:{Your_ACCOUNT_ID}:security-incident-alerts') def lambda_handler(event, context): """Main handler for security incidents""" print(f"Received event: {json.dumps(event, indent=2)}") try: # Parse the finding - handle both GuardDuty and Security Hub formats if 'findings' in event.get('detail', {}): # Security Hub format finding = event['detail']['findings'][0] else: # GuardDuty native format finding = event['detail'] # Get severity - handle different formats if isinstance(finding.get('Severity'), dict): severity = finding['Severity'].get('Label', 'UNKNOWN') else: # Map numeric severity to label severity_num = finding.get('severity', 0) if severity_num >= 7: severity = 'HIGH' elif severity_num >= 8.5: severity = 'CRITICAL' elif severity_num >= 4: severity = 'MEDIUM' else: severity = 'LOW' finding_type = finding.get('type', 'Unknown') finding_id = finding.get('id', 'Unknown') print(f"Processing finding: {finding_type} (Severity: {severity})") # Only respond to HIGH and CRITICAL if severity not in ['HIGH', 'CRITICAL']: print(f"Severity {severity} below threshold, skipping automated response") return { 'statusCode': 200, 'body': json.dumps('Severity below threshold') } # Route to appropriate handler response_actions = [] if 'UnauthorizedAccess:EC2' in finding_type or 'Backdoor:EC2' in finding_type or 'Trojan:Runtime' in finding_type: instance_id = extract_instance_id(finding) if instance_id: result = isolate_ec2_instance(instance_id, finding_type) response_actions.append(result) elif 'UnauthorizedAccess:IAMUser' in finding_type or 'CredentialAccess' in finding_type: user_name = extract_user_name(finding) if user_name: result = rotate_iam_credentials(user_name, finding_type) response_actions.append(result) elif 'Impact:EC2/PortSweep' in finding_type or 'Recon:EC2' in finding_type: source_ip = extract_source_ip(finding) if source_ip: result = block_suspicious_ip(source_ip, finding_type) response_actions.append(result) elif 'CryptoCurrency' in finding_type: instance_id = extract_instance_id(finding) if instance_id: result = handle_crypto_mining(instance_id, finding_type) response_actions.append(result) # Send comprehensive alert send_security_alert(finding, response_actions) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Security incident processed', 'finding_id': finding_id, 'actions_taken': response_actions }) } except Exception as e: print(f"Error processing security incident: {str(e)}") import traceback traceback.print_exc() send_error_alert(str(e), event) return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def extract_instance_id(finding): """Extract EC2 instance ID from finding""" try: for resource in finding.get('Resources', []): if resource.get('Type') == 'Instance': instance_id = resource.get('Id', '').split('/')[-1] return instance_id if instance_id.startswith('i-') else None except Exception as e: print(f"Error extracting instance ID: {e}") return None def extract_user_name(finding): """Extract IAM user name from finding""" try: for resource in finding.get('Resources', []): if resource.get('Type') == 'AccessKey': return resource.get('AccessKeyDetails', {}).get('UserName') except Exception as e: print(f"Error extracting user name: {e}") return None def extract_source_ip(finding): """Extract source IP from finding""" try: service = finding.get('Service', {}) action = service.get('Action', {}) network_connection = action.get('NetworkConnectionAction', {}) return network_connection.get('RemoteIpDetails', {}).get('IpAddressV4') except Exception as e: print(f"Error extracting source IP: {e}") return None def isolate_ec2_instance(instance_id, finding_type): """Isolate compromised EC2 instance""" try: print(f"Isolating instance: {instance_id}") # Get instance details response = ec2.describe_instances(InstanceIds=[instance_id]) instance = response['Reservations'][0]['Instances'][0] vpc_id = instance['VpcId'] # Create isolation security group sg_name = f'isolation-{instance_id}-{datetime.now().strftime("%Y%m%d%H%M%S")}' sg_response = ec2.create_security_group( GroupName=sg_name, Description=f'Quarantine SG for {instance_id} - {finding_type}', VpcId=vpc_id ) isolation_sg_id = sg_response['GroupId'] # Remove all default egress rules (blocks all outbound) ec2.revoke_security_group_egress( GroupId=isolation_sg_id, IpPermissions=[{ 'IpProtocol': '-1', 'FromPort': -1, 'ToPort': -1, 'IpRanges': [{'CidrIp': '0.0.0.0/0'}] }] ) # Add only SSH access from admin IP (optional) # ec2.authorize_security_group_ingress( # GroupId=isolation_sg_id, # IpPermissions=[{ # 'IpProtocol': 'tcp', # 'FromPort': 22, # 'ToPort': 22, # 'IpRanges': [{'CidrIp': 'YOUR_ADMIN_IP/32', 'Description': 'Admin access'}] # }] # ) # Apply isolation security group to instance ec2.modify_instance_attribute( InstanceId=instance_id, Groups=[isolation_sg_id] ) # Create snapshot for forensics volumes = [vol['Ebs']['VolumeId'] for vol in instance.get('BlockDeviceMappings', [])] snapshots = [] for volume_id in volumes: snapshot = ec2.create_snapshot( VolumeId=volume_id, Description=f'Forensic snapshot - {finding_type}', TagSpecifications=[{ 'ResourceType': 'snapshot', 'Tags': [ {'Key': 'Purpose', 'Value': 'Forensics'}, {'Key': 'InstanceId', 'Value': instance_id}, {'Key': 'Incident', 'Value': finding_type} ] }] ) snapshots.append(snapshot['SnapshotId']) action_summary = { 'action': 'isolate_instance', 'instance_id': instance_id, 'isolation_sg': isolation_sg_id, 'forensic_snapshots': snapshots, 'status': 'success' } print(f"Instance {instance_id} successfully isolated") return action_summary except Exception as e: print(f"Error isolating instance {instance_id}: {e}") return { 'action': 'isolate_instance', 'instance_id': instance_id, 'status': 'failed', 'error': str(e) } def rotate_iam_credentials(user_name, finding_type): """Rotate compromised IAM credentials""" try: print(f"Rotating credentials for user: {user_name}") actions_taken = [] # List and delete all access keys keys_response = iam.list_access_keys(UserName=user_name) deleted_keys = [] for key in keys_response['AccessKeyMetadata']: access_key_id = key['AccessKeyId'] # Deactivate first (safer than immediate delete) iam.update_access_key( UserName=user_name, AccessKeyId=access_key_id, Status='Inactive' ) deleted_keys.append(access_key_id) actions_taken.append(f'Deactivated access key: {access_key_id}') # Force password reset (if console access exists) try: iam.update_login_profile( UserName=user_name, PasswordResetRequired=True ) actions_taken.append('Forced password reset') except iam.exceptions.NoSuchEntityException: actions_taken.append('No console access to reset') # Attach deny-all policy (additional safety) policy_document = { "Version": "2012-10-17", "Statement": [{ "Effect": "Deny", "Action": "*", "Resource": "*" }] } try: iam.put_user_policy( UserName=user_name, PolicyName='EmergencyDenyAll', PolicyDocument=json.dumps(policy_document) ) actions_taken.append('Applied emergency deny policy') except Exception as e: print(f"Could not apply deny policy: {e}") action_summary = { 'action': 'rotate_credentials', 'user_name': user_name, 'deactivated_keys': deleted_keys, 'actions': actions_taken, 'status': 'success' } print(f"Credentials rotated for {user_name}") return action_summary except Exception as e: print(f"Error rotating credentials for {user_name}: {e}") return { 'action': 'rotate_credentials', 'user_name': user_name, 'status': 'failed', 'error': str(e) }
def block_suspicious_ip(source_ip, finding_type): """Block suspicious IP using NACL""" try: print(f"Blocking suspicious IP: {source_ip}") # Get default VPC vpcs = ec2.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}]) if not vpcs['Vpcs']: return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': 'No default VPC found' } vpc_id = vpcs['Vpcs'][0]['VpcId'] # Get network ACLs for VPC nacls = ec2.describe_network_acls( Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}] ) if not nacls['NetworkAcls']: return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': 'No NACLs found' } nacl_id = nacls['NetworkAcls'][0]['NetworkAclId'] # Add deny rule (rule number 1 for highest priority) ec2.create_network_acl_entry( NetworkAclId=nacl_id, RuleNumber=1, Protocol='-1', # All protocols RuleAction='deny', CidrBlock=f'{source_ip}/32' ) action_summary = { 'action': 'block_ip', 'source_ip': source_ip, 'nacl_id': nacl_id, 'status': 'success' } print(f"IP {source_ip} successfully blocked") return action_summary except Exception as e: print(f"Error blocking IP {source_ip}: {e}") return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': str(e) } def handle_crypto_mining(instance_id, finding_type): """Handle cryptocurrency mining detection""" try: print(f"Handling crypto mining on instance: {instance_id}") # Stop the instance immediately ec2.stop_instances(InstanceIds=[instance_id]) # Create forensic snapshot response = ec2.describe_instances(InstanceIds=[instance_id]) instance = response['Reservations'][0]['Instances'][0] volumes = [vol['Ebs']['VolumeId'] for vol in instance.get('BlockDeviceMappings', [])] snapshots = [] for volume_id in volumes: snapshot = ec2.create_snapshot( VolumeId=volume_id, Description=f'Crypto mining forensics - {instance_id}' ) snapshots.append(snapshot['SnapshotId']) action_summary = { 'action': 'stop_crypto_mining', 'instance_id': instance_id, 'instance_stopped': True, 'forensic_snapshots': snapshots, 'status': 'success' } print(f"Crypto mining instance {instance_id} stopped") return action_summary except Exception as e: print(f"Error handling crypto mining on {instance_id}: {e}") return { 'action': 'stop_crypto_mining', 'instance_id': instance_id, 'status': 'failed', 'error': str(e) } def send_security_alert(finding, response_actions): """Send comprehensive security alert""" try: # Get severity - handle different formats if isinstance(finding.get('Severity'), dict): severity = finding['Severity'].get('Label', 'UNKNOWN') else: severity_num = finding.get('severity', 0) if severity_num >= 8.5: severity = 'CRITICAL' elif severity_num >= 7: severity = 'HIGH' elif severity_num >= 4: severity = 'MEDIUM' else: severity = 'LOW' # Get finding type (lowercase 'type' for GuardDuty, uppercase 'Type' for Security Hub) finding_type = finding.get('type') or finding.get('Type', 'Unknown') # Get description description = finding.get('description') or finding.get('Description', 'No description') # Get finding ID finding_id = finding.get('id') or finding.get('Id', 'Unknown') # Build alert message message = f"""
π¨ SECURITY INCIDENT DETECTED π¨ Severity: {severity}
Type: {finding_type}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')} Description:
{description} AUTOMATED ACTIONS TAKEN:
""" if response_actions: for action in response_actions: message += f"\nβ {action.get('action', 'unknown')}: {action.get('status', 'unknown')}" if action.get('status') == 'failed': message += f" - Error: {action.get('error', 'unknown')}" else: message += "\nNo automated actions taken (instance not found or not applicable)" message += "\n\nSOC Team: Please review and investigate further." message += f"\n\nFinding ID: {finding_id}" # Send to SNS sns.publish( TopicArn=SNS_TOPIC_ARN, Subject=f'π¨ SECURITY ALERT: {finding_type}', Message=message ) print("Security alert sent successfully") except Exception as e: print(f"Error sending security alert: {e}") import traceback traceback.print_exc() def send_error_alert(error_message, event): """Send alert when response function fails""" try: message = f"""
β SECURITY RESPONSE ERROR An error occurred while processing a security incident. Error: {error_message} Event: {json.dumps(event, indent=2)} Action Required: Manual investigation needed.
""" sns.publish( TopicArn=SNS_TOPIC_ARN, Subject='β Security Response System Error', Message=message ) except Exception as e: print(f"Failed to send error alert: {e}") Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
import json
import boto3
import os
from datetime import datetime ec2 = boto3.client('ec2')
iam = boto3.client('iam')
sns = boto3.client('sns')
cloudtrail = boto3.client('cloudtrail') SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:{Your_Region}:{Your_ACCOUNT_ID}:security-incident-alerts') def lambda_handler(event, context): """Main handler for security incidents""" print(f"Received event: {json.dumps(event, indent=2)}") try: # Parse the finding - handle both GuardDuty and Security Hub formats if 'findings' in event.get('detail', {}): # Security Hub format finding = event['detail']['findings'][0] else: # GuardDuty native format finding = event['detail'] # Get severity - handle different formats if isinstance(finding.get('Severity'), dict): severity = finding['Severity'].get('Label', 'UNKNOWN') else: # Map numeric severity to label severity_num = finding.get('severity', 0) if severity_num >= 7: severity = 'HIGH' elif severity_num >= 8.5: severity = 'CRITICAL' elif severity_num >= 4: severity = 'MEDIUM' else: severity = 'LOW' finding_type = finding.get('type', 'Unknown') finding_id = finding.get('id', 'Unknown') print(f"Processing finding: {finding_type} (Severity: {severity})") # Only respond to HIGH and CRITICAL if severity not in ['HIGH', 'CRITICAL']: print(f"Severity {severity} below threshold, skipping automated response") return { 'statusCode': 200, 'body': json.dumps('Severity below threshold') } # Route to appropriate handler response_actions = [] if 'UnauthorizedAccess:EC2' in finding_type or 'Backdoor:EC2' in finding_type or 'Trojan:Runtime' in finding_type: instance_id = extract_instance_id(finding) if instance_id: result = isolate_ec2_instance(instance_id, finding_type) response_actions.append(result) elif 'UnauthorizedAccess:IAMUser' in finding_type or 'CredentialAccess' in finding_type: user_name = extract_user_name(finding) if user_name: result = rotate_iam_credentials(user_name, finding_type) response_actions.append(result) elif 'Impact:EC2/PortSweep' in finding_type or 'Recon:EC2' in finding_type: source_ip = extract_source_ip(finding) if source_ip: result = block_suspicious_ip(source_ip, finding_type) response_actions.append(result) elif 'CryptoCurrency' in finding_type: instance_id = extract_instance_id(finding) if instance_id: result = handle_crypto_mining(instance_id, finding_type) response_actions.append(result) # Send comprehensive alert send_security_alert(finding, response_actions) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Security incident processed', 'finding_id': finding_id, 'actions_taken': response_actions }) } except Exception as e: print(f"Error processing security incident: {str(e)}") import traceback traceback.print_exc() send_error_alert(str(e), event) return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def extract_instance_id(finding): """Extract EC2 instance ID from finding""" try: for resource in finding.get('Resources', []): if resource.get('Type') == 'Instance': instance_id = resource.get('Id', '').split('/')[-1] return instance_id if instance_id.startswith('i-') else None except Exception as e: print(f"Error extracting instance ID: {e}") return None def extract_user_name(finding): """Extract IAM user name from finding""" try: for resource in finding.get('Resources', []): if resource.get('Type') == 'AccessKey': return resource.get('AccessKeyDetails', {}).get('UserName') except Exception as e: print(f"Error extracting user name: {e}") return None def extract_source_ip(finding): """Extract source IP from finding""" try: service = finding.get('Service', {}) action = service.get('Action', {}) network_connection = action.get('NetworkConnectionAction', {}) return network_connection.get('RemoteIpDetails', {}).get('IpAddressV4') except Exception as e: print(f"Error extracting source IP: {e}") return None def isolate_ec2_instance(instance_id, finding_type): """Isolate compromised EC2 instance""" try: print(f"Isolating instance: {instance_id}") # Get instance details response = ec2.describe_instances(InstanceIds=[instance_id]) instance = response['Reservations'][0]['Instances'][0] vpc_id = instance['VpcId'] # Create isolation security group sg_name = f'isolation-{instance_id}-{datetime.now().strftime("%Y%m%d%H%M%S")}' sg_response = ec2.create_security_group( GroupName=sg_name, Description=f'Quarantine SG for {instance_id} - {finding_type}', VpcId=vpc_id ) isolation_sg_id = sg_response['GroupId'] # Remove all default egress rules (blocks all outbound) ec2.revoke_security_group_egress( GroupId=isolation_sg_id, IpPermissions=[{ 'IpProtocol': '-1', 'FromPort': -1, 'ToPort': -1, 'IpRanges': [{'CidrIp': '0.0.0.0/0'}] }] ) # Add only SSH access from admin IP (optional) # ec2.authorize_security_group_ingress( # GroupId=isolation_sg_id, # IpPermissions=[{ # 'IpProtocol': 'tcp', # 'FromPort': 22, # 'ToPort': 22, # 'IpRanges': [{'CidrIp': 'YOUR_ADMIN_IP/32', 'Description': 'Admin access'}] # }] # ) # Apply isolation security group to instance ec2.modify_instance_attribute( InstanceId=instance_id, Groups=[isolation_sg_id] ) # Create snapshot for forensics volumes = [vol['Ebs']['VolumeId'] for vol in instance.get('BlockDeviceMappings', [])] snapshots = [] for volume_id in volumes: snapshot = ec2.create_snapshot( VolumeId=volume_id, Description=f'Forensic snapshot - {finding_type}', TagSpecifications=[{ 'ResourceType': 'snapshot', 'Tags': [ {'Key': 'Purpose', 'Value': 'Forensics'}, {'Key': 'InstanceId', 'Value': instance_id}, {'Key': 'Incident', 'Value': finding_type} ] }] ) snapshots.append(snapshot['SnapshotId']) action_summary = { 'action': 'isolate_instance', 'instance_id': instance_id, 'isolation_sg': isolation_sg_id, 'forensic_snapshots': snapshots, 'status': 'success' } print(f"Instance {instance_id} successfully isolated") return action_summary except Exception as e: print(f"Error isolating instance {instance_id}: {e}") return { 'action': 'isolate_instance', 'instance_id': instance_id, 'status': 'failed', 'error': str(e) } def rotate_iam_credentials(user_name, finding_type): """Rotate compromised IAM credentials""" try: print(f"Rotating credentials for user: {user_name}") actions_taken = [] # List and delete all access keys keys_response = iam.list_access_keys(UserName=user_name) deleted_keys = [] for key in keys_response['AccessKeyMetadata']: access_key_id = key['AccessKeyId'] # Deactivate first (safer than immediate delete) iam.update_access_key( UserName=user_name, AccessKeyId=access_key_id, Status='Inactive' ) deleted_keys.append(access_key_id) actions_taken.append(f'Deactivated access key: {access_key_id}') # Force password reset (if console access exists) try: iam.update_login_profile( UserName=user_name, PasswordResetRequired=True ) actions_taken.append('Forced password reset') except iam.exceptions.NoSuchEntityException: actions_taken.append('No console access to reset') # Attach deny-all policy (additional safety) policy_document = { "Version": "2012-10-17", "Statement": [{ "Effect": "Deny", "Action": "*", "Resource": "*" }] } try: iam.put_user_policy( UserName=user_name, PolicyName='EmergencyDenyAll', PolicyDocument=json.dumps(policy_document) ) actions_taken.append('Applied emergency deny policy') except Exception as e: print(f"Could not apply deny policy: {e}") action_summary = { 'action': 'rotate_credentials', 'user_name': user_name, 'deactivated_keys': deleted_keys, 'actions': actions_taken, 'status': 'success' } print(f"Credentials rotated for {user_name}") return action_summary except Exception as e: print(f"Error rotating credentials for {user_name}: {e}") return { 'action': 'rotate_credentials', 'user_name': user_name, 'status': 'failed', 'error': str(e) }
def block_suspicious_ip(source_ip, finding_type): """Block suspicious IP using NACL""" try: print(f"Blocking suspicious IP: {source_ip}") # Get default VPC vpcs = ec2.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}]) if not vpcs['Vpcs']: return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': 'No default VPC found' } vpc_id = vpcs['Vpcs'][0]['VpcId'] # Get network ACLs for VPC nacls = ec2.describe_network_acls( Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}] ) if not nacls['NetworkAcls']: return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': 'No NACLs found' } nacl_id = nacls['NetworkAcls'][0]['NetworkAclId'] # Add deny rule (rule number 1 for highest priority) ec2.create_network_acl_entry( NetworkAclId=nacl_id, RuleNumber=1, Protocol='-1', # All protocols RuleAction='deny', CidrBlock=f'{source_ip}/32' ) action_summary = { 'action': 'block_ip', 'source_ip': source_ip, 'nacl_id': nacl_id, 'status': 'success' } print(f"IP {source_ip} successfully blocked") return action_summary except Exception as e: print(f"Error blocking IP {source_ip}: {e}") return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': str(e) } def handle_crypto_mining(instance_id, finding_type): """Handle cryptocurrency mining detection""" try: print(f"Handling crypto mining on instance: {instance_id}") # Stop the instance immediately ec2.stop_instances(InstanceIds=[instance_id]) # Create forensic snapshot response = ec2.describe_instances(InstanceIds=[instance_id]) instance = response['Reservations'][0]['Instances'][0] volumes = [vol['Ebs']['VolumeId'] for vol in instance.get('BlockDeviceMappings', [])] snapshots = [] for volume_id in volumes: snapshot = ec2.create_snapshot( VolumeId=volume_id, Description=f'Crypto mining forensics - {instance_id}' ) snapshots.append(snapshot['SnapshotId']) action_summary = { 'action': 'stop_crypto_mining', 'instance_id': instance_id, 'instance_stopped': True, 'forensic_snapshots': snapshots, 'status': 'success' } print(f"Crypto mining instance {instance_id} stopped") return action_summary except Exception as e: print(f"Error handling crypto mining on {instance_id}: {e}") return { 'action': 'stop_crypto_mining', 'instance_id': instance_id, 'status': 'failed', 'error': str(e) } def send_security_alert(finding, response_actions): """Send comprehensive security alert""" try: # Get severity - handle different formats if isinstance(finding.get('Severity'), dict): severity = finding['Severity'].get('Label', 'UNKNOWN') else: severity_num = finding.get('severity', 0) if severity_num >= 8.5: severity = 'CRITICAL' elif severity_num >= 7: severity = 'HIGH' elif severity_num >= 4: severity = 'MEDIUM' else: severity = 'LOW' # Get finding type (lowercase 'type' for GuardDuty, uppercase 'Type' for Security Hub) finding_type = finding.get('type') or finding.get('Type', 'Unknown') # Get description description = finding.get('description') or finding.get('Description', 'No description') # Get finding ID finding_id = finding.get('id') or finding.get('Id', 'Unknown') # Build alert message message = f"""
π¨ SECURITY INCIDENT DETECTED π¨ Severity: {severity}
Type: {finding_type}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')} Description:
{description} AUTOMATED ACTIONS TAKEN:
""" if response_actions: for action in response_actions: message += f"\nβ {action.get('action', 'unknown')}: {action.get('status', 'unknown')}" if action.get('status') == 'failed': message += f" - Error: {action.get('error', 'unknown')}" else: message += "\nNo automated actions taken (instance not found or not applicable)" message += "\n\nSOC Team: Please review and investigate further." message += f"\n\nFinding ID: {finding_id}" # Send to SNS sns.publish( TopicArn=SNS_TOPIC_ARN, Subject=f'π¨ SECURITY ALERT: {finding_type}', Message=message ) print("Security alert sent successfully") except Exception as e: print(f"Error sending security alert: {e}") import traceback traceback.print_exc() def send_error_alert(error_message, event): """Send alert when response function fails""" try: message = f"""
β SECURITY RESPONSE ERROR An error occurred while processing a security incident. Error: {error_message} Event: {json.dumps(event, indent=2)} Action Required: Manual investigation needed.
""" sns.publish( TopicArn=SNS_TOPIC_ARN, Subject='β Security Response System Error', Message=message ) except Exception as e: print(f"Failed to send error alert: {e}") COMMAND_BLOCK:
import json
import boto3
import os
from datetime import datetime ec2 = boto3.client('ec2')
iam = boto3.client('iam')
sns = boto3.client('sns')
cloudtrail = boto3.client('cloudtrail') SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:{Your_Region}:{Your_ACCOUNT_ID}:security-incident-alerts') def lambda_handler(event, context): """Main handler for security incidents""" print(f"Received event: {json.dumps(event, indent=2)}") try: # Parse the finding - handle both GuardDuty and Security Hub formats if 'findings' in event.get('detail', {}): # Security Hub format finding = event['detail']['findings'][0] else: # GuardDuty native format finding = event['detail'] # Get severity - handle different formats if isinstance(finding.get('Severity'), dict): severity = finding['Severity'].get('Label', 'UNKNOWN') else: # Map numeric severity to label severity_num = finding.get('severity', 0) if severity_num >= 7: severity = 'HIGH' elif severity_num >= 8.5: severity = 'CRITICAL' elif severity_num >= 4: severity = 'MEDIUM' else: severity = 'LOW' finding_type = finding.get('type', 'Unknown') finding_id = finding.get('id', 'Unknown') print(f"Processing finding: {finding_type} (Severity: {severity})") # Only respond to HIGH and CRITICAL if severity not in ['HIGH', 'CRITICAL']: print(f"Severity {severity} below threshold, skipping automated response") return { 'statusCode': 200, 'body': json.dumps('Severity below threshold') } # Route to appropriate handler response_actions = [] if 'UnauthorizedAccess:EC2' in finding_type or 'Backdoor:EC2' in finding_type or 'Trojan:Runtime' in finding_type: instance_id = extract_instance_id(finding) if instance_id: result = isolate_ec2_instance(instance_id, finding_type) response_actions.append(result) elif 'UnauthorizedAccess:IAMUser' in finding_type or 'CredentialAccess' in finding_type: user_name = extract_user_name(finding) if user_name: result = rotate_iam_credentials(user_name, finding_type) response_actions.append(result) elif 'Impact:EC2/PortSweep' in finding_type or 'Recon:EC2' in finding_type: source_ip = extract_source_ip(finding) if source_ip: result = block_suspicious_ip(source_ip, finding_type) response_actions.append(result) elif 'CryptoCurrency' in finding_type: instance_id = extract_instance_id(finding) if instance_id: result = handle_crypto_mining(instance_id, finding_type) response_actions.append(result) # Send comprehensive alert send_security_alert(finding, response_actions) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Security incident processed', 'finding_id': finding_id, 'actions_taken': response_actions }) } except Exception as e: print(f"Error processing security incident: {str(e)}") import traceback traceback.print_exc() send_error_alert(str(e), event) return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def extract_instance_id(finding): """Extract EC2 instance ID from finding""" try: for resource in finding.get('Resources', []): if resource.get('Type') == 'Instance': instance_id = resource.get('Id', '').split('/')[-1] return instance_id if instance_id.startswith('i-') else None except Exception as e: print(f"Error extracting instance ID: {e}") return None def extract_user_name(finding): """Extract IAM user name from finding""" try: for resource in finding.get('Resources', []): if resource.get('Type') == 'AccessKey': return resource.get('AccessKeyDetails', {}).get('UserName') except Exception as e: print(f"Error extracting user name: {e}") return None def extract_source_ip(finding): """Extract source IP from finding""" try: service = finding.get('Service', {}) action = service.get('Action', {}) network_connection = action.get('NetworkConnectionAction', {}) return network_connection.get('RemoteIpDetails', {}).get('IpAddressV4') except Exception as e: print(f"Error extracting source IP: {e}") return None def isolate_ec2_instance(instance_id, finding_type): """Isolate compromised EC2 instance""" try: print(f"Isolating instance: {instance_id}") # Get instance details response = ec2.describe_instances(InstanceIds=[instance_id]) instance = response['Reservations'][0]['Instances'][0] vpc_id = instance['VpcId'] # Create isolation security group sg_name = f'isolation-{instance_id}-{datetime.now().strftime("%Y%m%d%H%M%S")}' sg_response = ec2.create_security_group( GroupName=sg_name, Description=f'Quarantine SG for {instance_id} - {finding_type}', VpcId=vpc_id ) isolation_sg_id = sg_response['GroupId'] # Remove all default egress rules (blocks all outbound) ec2.revoke_security_group_egress( GroupId=isolation_sg_id, IpPermissions=[{ 'IpProtocol': '-1', 'FromPort': -1, 'ToPort': -1, 'IpRanges': [{'CidrIp': '0.0.0.0/0'}] }] ) # Add only SSH access from admin IP (optional) # ec2.authorize_security_group_ingress( # GroupId=isolation_sg_id, # IpPermissions=[{ # 'IpProtocol': 'tcp', # 'FromPort': 22, # 'ToPort': 22, # 'IpRanges': [{'CidrIp': 'YOUR_ADMIN_IP/32', 'Description': 'Admin access'}] # }] # ) # Apply isolation security group to instance ec2.modify_instance_attribute( InstanceId=instance_id, Groups=[isolation_sg_id] ) # Create snapshot for forensics volumes = [vol['Ebs']['VolumeId'] for vol in instance.get('BlockDeviceMappings', [])] snapshots = [] for volume_id in volumes: snapshot = ec2.create_snapshot( VolumeId=volume_id, Description=f'Forensic snapshot - {finding_type}', TagSpecifications=[{ 'ResourceType': 'snapshot', 'Tags': [ {'Key': 'Purpose', 'Value': 'Forensics'}, {'Key': 'InstanceId', 'Value': instance_id}, {'Key': 'Incident', 'Value': finding_type} ] }] ) snapshots.append(snapshot['SnapshotId']) action_summary = { 'action': 'isolate_instance', 'instance_id': instance_id, 'isolation_sg': isolation_sg_id, 'forensic_snapshots': snapshots, 'status': 'success' } print(f"Instance {instance_id} successfully isolated") return action_summary except Exception as e: print(f"Error isolating instance {instance_id}: {e}") return { 'action': 'isolate_instance', 'instance_id': instance_id, 'status': 'failed', 'error': str(e) } def rotate_iam_credentials(user_name, finding_type): """Rotate compromised IAM credentials""" try: print(f"Rotating credentials for user: {user_name}") actions_taken = [] # List and delete all access keys keys_response = iam.list_access_keys(UserName=user_name) deleted_keys = [] for key in keys_response['AccessKeyMetadata']: access_key_id = key['AccessKeyId'] # Deactivate first (safer than immediate delete) iam.update_access_key( UserName=user_name, AccessKeyId=access_key_id, Status='Inactive' ) deleted_keys.append(access_key_id) actions_taken.append(f'Deactivated access key: {access_key_id}') # Force password reset (if console access exists) try: iam.update_login_profile( UserName=user_name, PasswordResetRequired=True ) actions_taken.append('Forced password reset') except iam.exceptions.NoSuchEntityException: actions_taken.append('No console access to reset') # Attach deny-all policy (additional safety) policy_document = { "Version": "2012-10-17", "Statement": [{ "Effect": "Deny", "Action": "*", "Resource": "*" }] } try: iam.put_user_policy( UserName=user_name, PolicyName='EmergencyDenyAll', PolicyDocument=json.dumps(policy_document) ) actions_taken.append('Applied emergency deny policy') except Exception as e: print(f"Could not apply deny policy: {e}") action_summary = { 'action': 'rotate_credentials', 'user_name': user_name, 'deactivated_keys': deleted_keys, 'actions': actions_taken, 'status': 'success' } print(f"Credentials rotated for {user_name}") return action_summary except Exception as e: print(f"Error rotating credentials for {user_name}: {e}") return { 'action': 'rotate_credentials', 'user_name': user_name, 'status': 'failed', 'error': str(e) }
def block_suspicious_ip(source_ip, finding_type): """Block suspicious IP using NACL""" try: print(f"Blocking suspicious IP: {source_ip}") # Get default VPC vpcs = ec2.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}]) if not vpcs['Vpcs']: return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': 'No default VPC found' } vpc_id = vpcs['Vpcs'][0]['VpcId'] # Get network ACLs for VPC nacls = ec2.describe_network_acls( Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}] ) if not nacls['NetworkAcls']: return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': 'No NACLs found' } nacl_id = nacls['NetworkAcls'][0]['NetworkAclId'] # Add deny rule (rule number 1 for highest priority) ec2.create_network_acl_entry( NetworkAclId=nacl_id, RuleNumber=1, Protocol='-1', # All protocols RuleAction='deny', CidrBlock=f'{source_ip}/32' ) action_summary = { 'action': 'block_ip', 'source_ip': source_ip, 'nacl_id': nacl_id, 'status': 'success' } print(f"IP {source_ip} successfully blocked") return action_summary except Exception as e: print(f"Error blocking IP {source_ip}: {e}") return { 'action': 'block_ip', 'source_ip': source_ip, 'status': 'failed', 'error': str(e) } def handle_crypto_mining(instance_id, finding_type): """Handle cryptocurrency mining detection""" try: print(f"Handling crypto mining on instance: {instance_id}") # Stop the instance immediately ec2.stop_instances(InstanceIds=[instance_id]) # Create forensic snapshot response = ec2.describe_instances(InstanceIds=[instance_id]) instance = response['Reservations'][0]['Instances'][0] volumes = [vol['Ebs']['VolumeId'] for vol in instance.get('BlockDeviceMappings', [])] snapshots = [] for volume_id in volumes: snapshot = ec2.create_snapshot( VolumeId=volume_id, Description=f'Crypto mining forensics - {instance_id}' ) snapshots.append(snapshot['SnapshotId']) action_summary = { 'action': 'stop_crypto_mining', 'instance_id': instance_id, 'instance_stopped': True, 'forensic_snapshots': snapshots, 'status': 'success' } print(f"Crypto mining instance {instance_id} stopped") return action_summary except Exception as e: print(f"Error handling crypto mining on {instance_id}: {e}") return { 'action': 'stop_crypto_mining', 'instance_id': instance_id, 'status': 'failed', 'error': str(e) } def send_security_alert(finding, response_actions): """Send comprehensive security alert""" try: # Get severity - handle different formats if isinstance(finding.get('Severity'), dict): severity = finding['Severity'].get('Label', 'UNKNOWN') else: severity_num = finding.get('severity', 0) if severity_num >= 8.5: severity = 'CRITICAL' elif severity_num >= 7: severity = 'HIGH' elif severity_num >= 4: severity = 'MEDIUM' else: severity = 'LOW' # Get finding type (lowercase 'type' for GuardDuty, uppercase 'Type' for Security Hub) finding_type = finding.get('type') or finding.get('Type', 'Unknown') # Get description description = finding.get('description') or finding.get('Description', 'No description') # Get finding ID finding_id = finding.get('id') or finding.get('Id', 'Unknown') # Build alert message message = f"""
π¨ SECURITY INCIDENT DETECTED π¨ Severity: {severity}
Type: {finding_type}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')} Description:
{description} AUTOMATED ACTIONS TAKEN:
""" if response_actions: for action in response_actions: message += f"\nβ {action.get('action', 'unknown')}: {action.get('status', 'unknown')}" if action.get('status') == 'failed': message += f" - Error: {action.get('error', 'unknown')}" else: message += "\nNo automated actions taken (instance not found or not applicable)" message += "\n\nSOC Team: Please review and investigate further." message += f"\n\nFinding ID: {finding_id}" # Send to SNS sns.publish( TopicArn=SNS_TOPIC_ARN, Subject=f'π¨ SECURITY ALERT: {finding_type}', Message=message ) print("Security alert sent successfully") except Exception as e: print(f"Error sending security alert: {e}") import traceback traceback.print_exc() def send_error_alert(error_message, event): """Send alert when response function fails""" try: message = f"""
β SECURITY RESPONSE ERROR An error occurred while processing a security incident. Error: {error_message} Event: {json.dumps(event, indent=2)} Action Required: Manual investigation needed.
""" sns.publish( TopicArn=SNS_TOPIC_ARN, Subject='β Security Response System Error', Message=message ) except Exception as e: print(f"Failed to send error alert: {e}") CODE_BLOCK:
{ "version": "0", "id": "test-event-12345", "detail-type": "GuardDuty Finding", "source": "aws.guardduty", "account": "{YOUR_ACCOUNT_ID}", "time": "2024-12-17T10:00:00Z", "region": "ap-southeast-2", "resources": [], "detail": { "schemaVersion": "2.0", "accountId": "{YOUR_ACCOUNT_ID}", "region": "ap-southeast-2", "partition": "aws", "id": "test-finding-ec2-123", "arn": "arn:aws:guardduty:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:detector/test/finding/test-123", "type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom", "severity": 8, "createdAt": "2024-12-17T09:35:00.000Z", "updatedAt": "2024-12-17T09:35:00.000Z", "title": "EC2 instance is communicating with a malicious IP address", "description": "EC2 instance i-test12345678 is communicating with malicious IP 198.51.100.1. This is a TEST finding for security automation validation.", "service": { "serviceName": "guardduty", "detectorId": "test-detector-id", "action": { "actionType": "NETWORK_CONNECTION", "networkConnectionAction": { "blocked": false, "connectionDirection": "INBOUND", "localPortDetails": { "port": 22, "portName": "SSH" }, "protocol": "TCP", "remoteIpDetails": { "ipAddressV4": "198.51.100.1", "organization": { "asn": "12345", "asnOrg": "Malicious ASN" }, "country": { "countryName": "Unknown" } } } }, "archived": false, "count": 1, "eventFirstSeen": "2024-12-17T09:30:00.000Z", "eventLastSeen": "2024-12-17T09:35:00.000Z", "resourceRole": "TARGET" }, "Severity": { "Label": "HIGH", "Normalized": 70, "Product": 8.0 }, "Resources": [ { "Type": "Instance", "Id": "arn:aws:ec2:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:instance/i-test12345678", "Details": { "Instance": { "InstanceId": "i-test12345678", "InstanceType": "t2.micro", "LaunchTime": "2024-12-17T09:00:00.000Z", "Platform": "Linux" } } } ], "Id": "test-finding-ec2-123" }
} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
{ "version": "0", "id": "test-event-12345", "detail-type": "GuardDuty Finding", "source": "aws.guardduty", "account": "{YOUR_ACCOUNT_ID}", "time": "2024-12-17T10:00:00Z", "region": "ap-southeast-2", "resources": [], "detail": { "schemaVersion": "2.0", "accountId": "{YOUR_ACCOUNT_ID}", "region": "ap-southeast-2", "partition": "aws", "id": "test-finding-ec2-123", "arn": "arn:aws:guardduty:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:detector/test/finding/test-123", "type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom", "severity": 8, "createdAt": "2024-12-17T09:35:00.000Z", "updatedAt": "2024-12-17T09:35:00.000Z", "title": "EC2 instance is communicating with a malicious IP address", "description": "EC2 instance i-test12345678 is communicating with malicious IP 198.51.100.1. This is a TEST finding for security automation validation.", "service": { "serviceName": "guardduty", "detectorId": "test-detector-id", "action": { "actionType": "NETWORK_CONNECTION", "networkConnectionAction": { "blocked": false, "connectionDirection": "INBOUND", "localPortDetails": { "port": 22, "portName": "SSH" }, "protocol": "TCP", "remoteIpDetails": { "ipAddressV4": "198.51.100.1", "organization": { "asn": "12345", "asnOrg": "Malicious ASN" }, "country": { "countryName": "Unknown" } } } }, "archived": false, "count": 1, "eventFirstSeen": "2024-12-17T09:30:00.000Z", "eventLastSeen": "2024-12-17T09:35:00.000Z", "resourceRole": "TARGET" }, "Severity": { "Label": "HIGH", "Normalized": 70, "Product": 8.0 }, "Resources": [ { "Type": "Instance", "Id": "arn:aws:ec2:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:instance/i-test12345678", "Details": { "Instance": { "InstanceId": "i-test12345678", "InstanceType": "t2.micro", "LaunchTime": "2024-12-17T09:00:00.000Z", "Platform": "Linux" } } } ], "Id": "test-finding-ec2-123" }
} CODE_BLOCK:
{ "version": "0", "id": "test-event-12345", "detail-type": "GuardDuty Finding", "source": "aws.guardduty", "account": "{YOUR_ACCOUNT_ID}", "time": "2024-12-17T10:00:00Z", "region": "ap-southeast-2", "resources": [], "detail": { "schemaVersion": "2.0", "accountId": "{YOUR_ACCOUNT_ID}", "region": "ap-southeast-2", "partition": "aws", "id": "test-finding-ec2-123", "arn": "arn:aws:guardduty:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:detector/test/finding/test-123", "type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom", "severity": 8, "createdAt": "2024-12-17T09:35:00.000Z", "updatedAt": "2024-12-17T09:35:00.000Z", "title": "EC2 instance is communicating with a malicious IP address", "description": "EC2 instance i-test12345678 is communicating with malicious IP 198.51.100.1. This is a TEST finding for security automation validation.", "service": { "serviceName": "guardduty", "detectorId": "test-detector-id", "action": { "actionType": "NETWORK_CONNECTION", "networkConnectionAction": { "blocked": false, "connectionDirection": "INBOUND", "localPortDetails": { "port": 22, "portName": "SSH" }, "protocol": "TCP", "remoteIpDetails": { "ipAddressV4": "198.51.100.1", "organization": { "asn": "12345", "asnOrg": "Malicious ASN" }, "country": { "countryName": "Unknown" } } } }, "archived": false, "count": 1, "eventFirstSeen": "2024-12-17T09:30:00.000Z", "eventLastSeen": "2024-12-17T09:35:00.000Z", "resourceRole": "TARGET" }, "Severity": { "Label": "HIGH", "Normalized": 70, "Product": 8.0 }, "Resources": [ { "Type": "Instance", "Id": "arn:aws:ec2:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:instance/i-test12345678", "Details": { "Instance": { "InstanceId": "i-test12345678", "InstanceType": "t2.micro", "LaunchTime": "2024-12-17T09:00:00.000Z", "Platform": "Linux" } } } ], "Id": "test-finding-ec2-123" }
} CODE_BLOCK:
{ "source": ["aws.guardduty"], "detail-type": ["GuardDuty Finding"], "detail": { "severity": [7, 7.0, 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.8, 7.9, 8, 8.0, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 8.7, 8.8, 8.9] }
} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
{ "source": ["aws.guardduty"], "detail-type": ["GuardDuty Finding"], "detail": { "severity": [7, 7.0, 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.8, 7.9, 8, 8.0, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 8.7, 8.8, 8.9] }
} CODE_BLOCK:
{ "source": ["aws.guardduty"], "detail-type": ["GuardDuty Finding"], "detail": { "severity": [7, 7.0, 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.8, 7.9, 8, 8.0, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 8.7, 8.8, 8.9] }
} CODE_BLOCK:
{ "source": ["aws.securityhub"], "detail-type": ["Security Hub Findings - Imported"], "detail": { "findings": { "Severity": { "Label": ["CRITICAL", "HIGH"] } } }
} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
{ "source": ["aws.securityhub"], "detail-type": ["Security Hub Findings - Imported"], "detail": { "findings": { "Severity": { "Label": ["CRITICAL", "HIGH"] } } }
} CODE_BLOCK:
{ "source": ["aws.securityhub"], "detail-type": ["Security Hub Findings - Imported"], "detail": { "findings": { "Severity": { "Label": ["CRITICAL", "HIGH"] } } }
} COMMAND_BLOCK:
# Generate GuardDuty sample finding
aws guardduty create-sample-findings \ --detector-id $DETECTOR_ID \ --finding-types "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# Generate GuardDuty sample finding
aws guardduty create-sample-findings \ --detector-id $DETECTOR_ID \ --finding-types "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" COMMAND_BLOCK:
# Generate GuardDuty sample finding
aws guardduty create-sample-findings \ --detector-id $DETECTOR_ID \ --finding-types "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" CODE_BLOCK:
{ "Comment": "Advanced Security Incident Response Workflow", "StartAt": "ParseIncident", "States": { "ParseIncident": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "security-incident-responder", "Payload.$": "$" }, "ResultPath": "$.responseResult", "Next": "CheckSeverity", "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyFailure" }] }, "CheckSeverity": { "Type": "Choice", "Choices": [ { "Variable": "$.detail.findings[0].Severity.Label", "StringEquals": "CRITICAL", "Next": "CriticalIncidentPath" }, { "Variable": "$.detail.findings[0].Severity.Label", "StringEquals": "HIGH", "Next": "HighIncidentPath" } ], "Default": "LogAndExit" }, "CriticalIncidentPath": { "Type": "Parallel", "Branches": [ { "StartAt": "CreateTicket", "States": { "CreateTicket": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:security-incident-alerts", "Subject": "CRITICAL Security Incident - Immediate Action Required", "Message.$": "$.detail.findings[0].Description" }, "End": true } } }, { "StartAt": "WaitForSOC", "States": { "WaitForSOC": { "Type": "Wait", "Seconds": 300, "Next": "CheckIfResolved" }, "CheckIfResolved": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "check-incident-status", "Payload": { "findingId.$": "$.detail.findings[0].Id" } }, "End": true } } } ], "Next": "LogSuccess" }, "HighIncidentPath": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:security-incident-alerts", "Subject": "HIGH Security Incident Detected", "Message.$": "$.detail.findings[0].Description" }, "Next": "LogSuccess" }, "LogSuccess": { "Type": "Succeed" }, "LogAndExit": { "Type": "Succeed" }, "NotifyFailure": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT:security-incident-alerts", "Subject": "Security Response Workflow Failed", "Message": "The security incident response workflow encountered an error." }, "Next": "FailState" }, "FailState": { "Type": "Fail", "Error": "WorkflowFailed", "Cause": "Security response workflow failed to complete" } }
} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
{ "Comment": "Advanced Security Incident Response Workflow", "StartAt": "ParseIncident", "States": { "ParseIncident": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "security-incident-responder", "Payload.$": "$" }, "ResultPath": "$.responseResult", "Next": "CheckSeverity", "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyFailure" }] }, "CheckSeverity": { "Type": "Choice", "Choices": [ { "Variable": "$.detail.findings[0].Severity.Label", "StringEquals": "CRITICAL", "Next": "CriticalIncidentPath" }, { "Variable": "$.detail.findings[0].Severity.Label", "StringEquals": "HIGH", "Next": "HighIncidentPath" } ], "Default": "LogAndExit" }, "CriticalIncidentPath": { "Type": "Parallel", "Branches": [ { "StartAt": "CreateTicket", "States": { "CreateTicket": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:security-incident-alerts", "Subject": "CRITICAL Security Incident - Immediate Action Required", "Message.$": "$.detail.findings[0].Description" }, "End": true } } }, { "StartAt": "WaitForSOC", "States": { "WaitForSOC": { "Type": "Wait", "Seconds": 300, "Next": "CheckIfResolved" }, "CheckIfResolved": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "check-incident-status", "Payload": { "findingId.$": "$.detail.findings[0].Id" } }, "End": true } } } ], "Next": "LogSuccess" }, "HighIncidentPath": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:security-incident-alerts", "Subject": "HIGH Security Incident Detected", "Message.$": "$.detail.findings[0].Description" }, "Next": "LogSuccess" }, "LogSuccess": { "Type": "Succeed" }, "LogAndExit": { "Type": "Succeed" }, "NotifyFailure": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT:security-incident-alerts", "Subject": "Security Response Workflow Failed", "Message": "The security incident response workflow encountered an error." }, "Next": "FailState" }, "FailState": { "Type": "Fail", "Error": "WorkflowFailed", "Cause": "Security response workflow failed to complete" } }
} CODE_BLOCK:
{ "Comment": "Advanced Security Incident Response Workflow", "StartAt": "ParseIncident", "States": { "ParseIncident": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "security-incident-responder", "Payload.$": "$" }, "ResultPath": "$.responseResult", "Next": "CheckSeverity", "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyFailure" }] }, "CheckSeverity": { "Type": "Choice", "Choices": [ { "Variable": "$.detail.findings[0].Severity.Label", "StringEquals": "CRITICAL", "Next": "CriticalIncidentPath" }, { "Variable": "$.detail.findings[0].Severity.Label", "StringEquals": "HIGH", "Next": "HighIncidentPath" } ], "Default": "LogAndExit" }, "CriticalIncidentPath": { "Type": "Parallel", "Branches": [ { "StartAt": "CreateTicket", "States": { "CreateTicket": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:security-incident-alerts", "Subject": "CRITICAL Security Incident - Immediate Action Required", "Message.$": "$.detail.findings[0].Description" }, "End": true } } }, { "StartAt": "WaitForSOC", "States": { "WaitForSOC": { "Type": "Wait", "Seconds": 300, "Next": "CheckIfResolved" }, "CheckIfResolved": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "check-incident-status", "Payload": { "findingId.$": "$.detail.findings[0].Id" } }, "End": true } } } ], "Next": "LogSuccess" }, "HighIncidentPath": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:security-incident-alerts", "Subject": "HIGH Security Incident Detected", "Message.$": "$.detail.findings[0].Description" }, "Next": "LogSuccess" }, "LogSuccess": { "Type": "Succeed" }, "LogAndExit": { "Type": "Succeed" }, "NotifyFailure": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:YOUR_REGION:YOUR_ACCOUNT:security-incident-alerts", "Subject": "Security Response Workflow Failed", "Message": "The security incident response workflow encountered an error." }, "Next": "FailState" }, "FailState": { "Type": "Fail", "Error": "WorkflowFailed", "Cause": "Security response workflow failed to complete" } }
} CODE_BLOCK:
{ "source": ["aws.guardduty", "aws.securityhub"], "detail": { "findings": { "Severity": { "Label": ["CRITICAL"] } } }
} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
{ "source": ["aws.guardduty", "aws.securityhub"], "detail": { "findings": { "Severity": { "Label": ["CRITICAL"] } } }
} CODE_BLOCK:
{ "source": ["aws.guardduty", "aws.securityhub"], "detail": { "findings": { "Severity": { "Label": ["CRITICAL"] } } }
} COMMAND_BLOCK:
import boto3
import json config = boto3.client('config')
iam = boto3.client('iam')
ec2 = boto3.client('ec2') def lambda_handler(event, context): """Check compliance status and generate report""" compliance_report = { 'timestamp': event['time'], 'checks': {} } # 1. Check Config rules compliance config_rules = config.describe_compliance_by_config_rule() non_compliant = [] for rule in config_rules['ComplianceByConfigRules']: if rule['Compliance']['ComplianceType'] == 'NON_COMPLIANT': non_compliant.append(rule['ConfigRuleName']) compliance_report['checks']['config_rules'] = { 'total': len(config_rules['ComplianceByConfigRules']), 'non_compliant': non_compliant } # 2. Check IAM password policy try: password_policy = iam.get_account_password_policy() compliance_report['checks']['iam_password_policy'] = { 'exists': True, 'minimum_length': password_policy['PasswordPolicy'].get('MinimumPasswordLength', 0), 'require_symbols': password_policy['PasswordPolicy'].get('RequireSymbols', False), 'require_numbers': password_policy['PasswordPolicy'].get('RequireNumbers', False) } except iam.exceptions.NoSuchEntityException: compliance_report['checks']['iam_password_policy'] = { 'exists': False, 'compliant': False } # 3. Check MFA on root account try: summary = iam.get_account_summary() compliance_report['checks']['root_mfa'] = { 'enabled': summary['SummaryMap'].get('AccountMFAEnabled', 0) == 1 } except Exception as e: compliance_report['checks']['root_mfa'] = {'error': str(e)} # 4. Check for unencrypted EBS volumes volumes = ec2.describe_volumes() unencrypted = [v['VolumeId'] for v in volumes['Volumes'] if not v.get('Encrypted', False)] compliance_report['checks']['ebs_encryption'] = { 'total_volumes': len(volumes['Volumes']), 'unencrypted': unencrypted, 'compliant': len(unencrypted) == 0 } # 5. Check for public S3 buckets s3 = boto3.client('s3') buckets = s3.list_buckets()['Buckets'] public_buckets = [] for bucket in buckets: try: acl = s3.get_bucket_acl(Bucket=bucket['Name']) for grant in acl['Grants']: if grant['Grantee'].get('Type') == 'Group' and 'AllUsers' in grant['Grantee'].get('URI', ''): public_buckets.append(bucket['Name']) break except: pass compliance_report['checks']['s3_public_access'] = { 'public_buckets': public_buckets, 'compliant': len(public_buckets) == 0 } # Calculate overall compliance score compliant_checks = sum([ 1 for check in compliance_report['checks'].values() if isinstance(check, dict) and check.get('compliant', False) ]) total_checks = len(compliance_report['checks']) compliance_report['overall_score'] = (compliant_checks / total_checks * 100) if total_checks > 0 else 0 return { 'statusCode': 200, 'body': json.dumps(compliance_report, default=str) } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
import boto3
import json config = boto3.client('config')
iam = boto3.client('iam')
ec2 = boto3.client('ec2') def lambda_handler(event, context): """Check compliance status and generate report""" compliance_report = { 'timestamp': event['time'], 'checks': {} } # 1. Check Config rules compliance config_rules = config.describe_compliance_by_config_rule() non_compliant = [] for rule in config_rules['ComplianceByConfigRules']: if rule['Compliance']['ComplianceType'] == 'NON_COMPLIANT': non_compliant.append(rule['ConfigRuleName']) compliance_report['checks']['config_rules'] = { 'total': len(config_rules['ComplianceByConfigRules']), 'non_compliant': non_compliant } # 2. Check IAM password policy try: password_policy = iam.get_account_password_policy() compliance_report['checks']['iam_password_policy'] = { 'exists': True, 'minimum_length': password_policy['PasswordPolicy'].get('MinimumPasswordLength', 0), 'require_symbols': password_policy['PasswordPolicy'].get('RequireSymbols', False), 'require_numbers': password_policy['PasswordPolicy'].get('RequireNumbers', False) } except iam.exceptions.NoSuchEntityException: compliance_report['checks']['iam_password_policy'] = { 'exists': False, 'compliant': False } # 3. Check MFA on root account try: summary = iam.get_account_summary() compliance_report['checks']['root_mfa'] = { 'enabled': summary['SummaryMap'].get('AccountMFAEnabled', 0) == 1 } except Exception as e: compliance_report['checks']['root_mfa'] = {'error': str(e)} # 4. Check for unencrypted EBS volumes volumes = ec2.describe_volumes() unencrypted = [v['VolumeId'] for v in volumes['Volumes'] if not v.get('Encrypted', False)] compliance_report['checks']['ebs_encryption'] = { 'total_volumes': len(volumes['Volumes']), 'unencrypted': unencrypted, 'compliant': len(unencrypted) == 0 } # 5. Check for public S3 buckets s3 = boto3.client('s3') buckets = s3.list_buckets()['Buckets'] public_buckets = [] for bucket in buckets: try: acl = s3.get_bucket_acl(Bucket=bucket['Name']) for grant in acl['Grants']: if grant['Grantee'].get('Type') == 'Group' and 'AllUsers' in grant['Grantee'].get('URI', ''): public_buckets.append(bucket['Name']) break except: pass compliance_report['checks']['s3_public_access'] = { 'public_buckets': public_buckets, 'compliant': len(public_buckets) == 0 } # Calculate overall compliance score compliant_checks = sum([ 1 for check in compliance_report['checks'].values() if isinstance(check, dict) and check.get('compliant', False) ]) total_checks = len(compliance_report['checks']) compliance_report['overall_score'] = (compliant_checks / total_checks * 100) if total_checks > 0 else 0 return { 'statusCode': 200, 'body': json.dumps(compliance_report, default=str) } COMMAND_BLOCK:
import boto3
import json config = boto3.client('config')
iam = boto3.client('iam')
ec2 = boto3.client('ec2') def lambda_handler(event, context): """Check compliance status and generate report""" compliance_report = { 'timestamp': event['time'], 'checks': {} } # 1. Check Config rules compliance config_rules = config.describe_compliance_by_config_rule() non_compliant = [] for rule in config_rules['ComplianceByConfigRules']: if rule['Compliance']['ComplianceType'] == 'NON_COMPLIANT': non_compliant.append(rule['ConfigRuleName']) compliance_report['checks']['config_rules'] = { 'total': len(config_rules['ComplianceByConfigRules']), 'non_compliant': non_compliant } # 2. Check IAM password policy try: password_policy = iam.get_account_password_policy() compliance_report['checks']['iam_password_policy'] = { 'exists': True, 'minimum_length': password_policy['PasswordPolicy'].get('MinimumPasswordLength', 0), 'require_symbols': password_policy['PasswordPolicy'].get('RequireSymbols', False), 'require_numbers': password_policy['PasswordPolicy'].get('RequireNumbers', False) } except iam.exceptions.NoSuchEntityException: compliance_report['checks']['iam_password_policy'] = { 'exists': False, 'compliant': False } # 3. Check MFA on root account try: summary = iam.get_account_summary() compliance_report['checks']['root_mfa'] = { 'enabled': summary['SummaryMap'].get('AccountMFAEnabled', 0) == 1 } except Exception as e: compliance_report['checks']['root_mfa'] = {'error': str(e)} # 4. Check for unencrypted EBS volumes volumes = ec2.describe_volumes() unencrypted = [v['VolumeId'] for v in volumes['Volumes'] if not v.get('Encrypted', False)] compliance_report['checks']['ebs_encryption'] = { 'total_volumes': len(volumes['Volumes']), 'unencrypted': unencrypted, 'compliant': len(unencrypted) == 0 } # 5. Check for public S3 buckets s3 = boto3.client('s3') buckets = s3.list_buckets()['Buckets'] public_buckets = [] for bucket in buckets: try: acl = s3.get_bucket_acl(Bucket=bucket['Name']) for grant in acl['Grants']: if grant['Grantee'].get('Type') == 'Group' and 'AllUsers' in grant['Grantee'].get('URI', ''): public_buckets.append(bucket['Name']) break except: pass compliance_report['checks']['s3_public_access'] = { 'public_buckets': public_buckets, 'compliant': len(public_buckets) == 0 } # Calculate overall compliance score compliant_checks = sum([ 1 for check in compliance_report['checks'].values() if isinstance(check, dict) and check.get('compliant', False) ]) total_checks = len(compliance_report['checks']) compliance_report['overall_score'] = (compliant_checks / total_checks * 100) if total_checks > 0 else 0 return { 'statusCode': 200, 'body': json.dumps(compliance_report, default=str) } CODE_BLOCK:
fields @timestamp, userIdentity.principalId, eventName, sourceIPAddress
| filter errorCode like /Unauthorized/ or errorCode like /AccessDenied/
| sort @timestamp desc
| limit 20 Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
fields @timestamp, userIdentity.principalId, eventName, sourceIPAddress
| filter errorCode like /Unauthorized/ or errorCode like /AccessDenied/
| sort @timestamp desc
| limit 20 CODE_BLOCK:
fields @timestamp, userIdentity.principalId, eventName, sourceIPAddress
| filter errorCode like /Unauthorized/ or errorCode like /AccessDenied/
| sort @timestamp desc
| limit 20 CODE_BLOCK:
Security Incident Response Runbook
=================================== System Overview
---------------
- Primary Function: Automated security incident detection and response
- Detection: AWS GuardDuty
- Response: Lambda automated actions
- Alerting: SNS email notifications
- Region: ap-southeast-2 (Sydney) Quick Response Guide
-------------------- HIGH SEVERITY INCIDENTS:
1. Check email alert from SNS topic: security-incident-alerts
2. Review CloudWatch Dashboard: SecurityIncidentResponse
3. Check Lambda logs: /aws/lambda/security-incident-responder
4. Verify automated actions taken (isolate EC2, rotate IAM, etc) MANUAL INTERVENTION REQUIRED:
If automated response failed:
1. Check error message in email alert
2. Review CloudWatch logs for detailed error
3. Manually execute remediation actions
4. Update this runbook with learnings Automated Response Actions
-------------------------- UnauthorizedAccess:EC2 Incidents:
- Action: Isolate EC2 instance with new security group
- Action: Create forensic EBS snapshots
- Action: Send alert to SOC team IAM Credential Compromise:
- Action: Deactivate all IAM access keys
- Action: Force password reset
- Action: Apply emergency deny-all policy Crypto Mining Detection:
- Action: Stop EC2 instance immediately
- Action: Create forensic snapshots
- Action: Send critical alert Malicious IP Detection:
- Action: Block source IP via Network ACL
- Action: Log incident details System Components
----------------- GuardDuty:
- Detector ID: 52cd95edfb049e79eada14b660d424b7
- Status: Check via Console
- Sample findings: Can generate for testing Lambda Functions:
- security-incident-responder: Main response function
- Memory: 512 MB
- Timeout: 5 minutes
- Region: ap-southeast-2 EventBridge Rules:
- guardduty-high-severity-findings: Routes HIGH/CRITICAL findings
- Trigger: Lambda function SNS Topics:
- security-incident-alerts: Email notifications
- Subscribers: [Your email] CloudWatch Dashboard:
- Name: SecurityIncidentResponse
- Widgets: Lambda metrics, SNS, Logs Step Functions:
- security-incident-workflow: Complex incident workflows
- State Machine ARN: Check Console Contact Information
-------------------
Security Team Email: [email protected] (ganti dengan email real)
On-Call Phone: +62-xxx-xxx-xxxx (ganti dengan nomor real)
Escalation: [Manager name and contact] AWS Account ID: YOUR_ACCOUNT_ID
Primary Region: ap-southeast-2 (Sydney) Monthly Testing Procedures
--------------------------- Test Schedule: First Monday of each month, 10:00 AM Test 1 - Generate Sample Finding:
1. Open GuardDuty Console
2. Settings β Sample findings
3. Click "Generate sample findings"
4. Wait 2 minutes Test 2 - Verify Automated Response:
1. Check email for alert (should arrive within 2 min)
2. Check CloudWatch Dashboard for Lambda invocation
3. Review Lambda logs for processing details
4. Verify no errors in execution Test 3 - Dashboard Validation:
1. Open CloudWatch Dashboard: SecurityIncidentResponse
2. Verify all widgets showing data
3. Check Lambda Invocations widget increased
4. Review Recent Logs widget for new entries Recovery Procedures
------------------- Rollback Isolated EC2 Instance:
1. EC2 Console β Instances
2. Find instance with isolation security group
3. Actions β Security β Change security groups
4. Replace with original security group
5. Document incident resolution Re-enable IAM User After False Positive:
1. IAM Console β Users β Select user
2. Security credentials β Activate access keys
3. Remove "EmergencyDenyAll" policy
4. Reset password (user will change on next login)
5. Notify user via approved channel Unblock IP from Network ACL:
1. VPC Console β Network ACLs
2. Find NACL with deny rule for IP
3. Inbound/Outbound rules β Delete rule number 1
4. Document reason for unblock Service Costs (Monthly Estimates)
----------------------------------
GuardDuty: $4.50/month
Security Hub: $30/month (can disable to save)
Lambda: $5/month (only when invoked)
SNS: $1/month
CloudWatch: $10-20/month
CloudTrail: $2-5/month
Config: $20/month (can stop to save)
Step Functions: $10/month TOTAL RUNNING: ~$82-95/month
TOTAL STOPPED: ~$0/month (when services suspended) To Save Costs When Not Using:
- Suspend GuardDuty
- Disable Security Hub
- Stop Config recorder
- Stop CloudTrail logging
- Disable EventBridge rules Services Resume Time: 2-5 minutes Known Issues & Solutions
------------------------ Issue: Lambda timeout when isolating instance
Solution: Increase timeout to 5 minutes (already configured) Issue: No email alerts received
Solution: 1. Check SNS subscription is "Confirmed"
2. Check spam folder
3. Verify SNS_TOPIC_ARN environment variable in Lambda Issue: GuardDuty metrics not in dashboard
Solution: Normal if no real findings yet. Use custom metrics from Lambda instead. Issue: Step Functions execution failed
Solution: Check IAM role has correct permissions for Lambda and SNS Change Log
----------
2024-12-17: Initial system setup
2024-12-17: Day 1 completed - Core automation working
2024-12-17: Day 2 completed - Dashboard and documentation Next Review Date: [30 days from today] Notes
-----
- This is a learning/demo implementation
- For production, add AWS WAF for additional protection
- Consider integrating with SIEM (Splunk, Elastic)
- Enable AWS Shield for DDoS protection
- Add MFA for all IAM users
- Implement least privilege IAM policies
- Regular security audits recommended Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
Security Incident Response Runbook
=================================== System Overview
---------------
- Primary Function: Automated security incident detection and response
- Detection: AWS GuardDuty
- Response: Lambda automated actions
- Alerting: SNS email notifications
- Region: ap-southeast-2 (Sydney) Quick Response Guide
-------------------- HIGH SEVERITY INCIDENTS:
1. Check email alert from SNS topic: security-incident-alerts
2. Review CloudWatch Dashboard: SecurityIncidentResponse
3. Check Lambda logs: /aws/lambda/security-incident-responder
4. Verify automated actions taken (isolate EC2, rotate IAM, etc) MANUAL INTERVENTION REQUIRED:
If automated response failed:
1. Check error message in email alert
2. Review CloudWatch logs for detailed error
3. Manually execute remediation actions
4. Update this runbook with learnings Automated Response Actions
-------------------------- UnauthorizedAccess:EC2 Incidents:
- Action: Isolate EC2 instance with new security group
- Action: Create forensic EBS snapshots
- Action: Send alert to SOC team IAM Credential Compromise:
- Action: Deactivate all IAM access keys
- Action: Force password reset
- Action: Apply emergency deny-all policy Crypto Mining Detection:
- Action: Stop EC2 instance immediately
- Action: Create forensic snapshots
- Action: Send critical alert Malicious IP Detection:
- Action: Block source IP via Network ACL
- Action: Log incident details System Components
----------------- GuardDuty:
- Detector ID: 52cd95edfb049e79eada14b660d424b7
- Status: Check via Console
- Sample findings: Can generate for testing Lambda Functions:
- security-incident-responder: Main response function
- Memory: 512 MB
- Timeout: 5 minutes
- Region: ap-southeast-2 EventBridge Rules:
- guardduty-high-severity-findings: Routes HIGH/CRITICAL findings
- Trigger: Lambda function SNS Topics:
- security-incident-alerts: Email notifications
- Subscribers: [Your email] CloudWatch Dashboard:
- Name: SecurityIncidentResponse
- Widgets: Lambda metrics, SNS, Logs Step Functions:
- security-incident-workflow: Complex incident workflows
- State Machine ARN: Check Console Contact Information
-------------------
Security Team Email: [email protected] (ganti dengan email real)
On-Call Phone: +62-xxx-xxx-xxxx (ganti dengan nomor real)
Escalation: [Manager name and contact] AWS Account ID: YOUR_ACCOUNT_ID
Primary Region: ap-southeast-2 (Sydney) Monthly Testing Procedures
--------------------------- Test Schedule: First Monday of each month, 10:00 AM Test 1 - Generate Sample Finding:
1. Open GuardDuty Console
2. Settings β Sample findings
3. Click "Generate sample findings"
4. Wait 2 minutes Test 2 - Verify Automated Response:
1. Check email for alert (should arrive within 2 min)
2. Check CloudWatch Dashboard for Lambda invocation
3. Review Lambda logs for processing details
4. Verify no errors in execution Test 3 - Dashboard Validation:
1. Open CloudWatch Dashboard: SecurityIncidentResponse
2. Verify all widgets showing data
3. Check Lambda Invocations widget increased
4. Review Recent Logs widget for new entries Recovery Procedures
------------------- Rollback Isolated EC2 Instance:
1. EC2 Console β Instances
2. Find instance with isolation security group
3. Actions β Security β Change security groups
4. Replace with original security group
5. Document incident resolution Re-enable IAM User After False Positive:
1. IAM Console β Users β Select user
2. Security credentials β Activate access keys
3. Remove "EmergencyDenyAll" policy
4. Reset password (user will change on next login)
5. Notify user via approved channel Unblock IP from Network ACL:
1. VPC Console β Network ACLs
2. Find NACL with deny rule for IP
3. Inbound/Outbound rules β Delete rule number 1
4. Document reason for unblock Service Costs (Monthly Estimates)
----------------------------------
GuardDuty: $4.50/month
Security Hub: $30/month (can disable to save)
Lambda: $5/month (only when invoked)
SNS: $1/month
CloudWatch: $10-20/month
CloudTrail: $2-5/month
Config: $20/month (can stop to save)
Step Functions: $10/month TOTAL RUNNING: ~$82-95/month
TOTAL STOPPED: ~$0/month (when services suspended) To Save Costs When Not Using:
- Suspend GuardDuty
- Disable Security Hub
- Stop Config recorder
- Stop CloudTrail logging
- Disable EventBridge rules Services Resume Time: 2-5 minutes Known Issues & Solutions
------------------------ Issue: Lambda timeout when isolating instance
Solution: Increase timeout to 5 minutes (already configured) Issue: No email alerts received
Solution: 1. Check SNS subscription is "Confirmed"
2. Check spam folder
3. Verify SNS_TOPIC_ARN environment variable in Lambda Issue: GuardDuty metrics not in dashboard
Solution: Normal if no real findings yet. Use custom metrics from Lambda instead. Issue: Step Functions execution failed
Solution: Check IAM role has correct permissions for Lambda and SNS Change Log
----------
2024-12-17: Initial system setup
2024-12-17: Day 1 completed - Core automation working
2024-12-17: Day 2 completed - Dashboard and documentation Next Review Date: [30 days from today] Notes
-----
- This is a learning/demo implementation
- For production, add AWS WAF for additional protection
- Consider integrating with SIEM (Splunk, Elastic)
- Enable AWS Shield for DDoS protection
- Add MFA for all IAM users
- Implement least privilege IAM policies
- Regular security audits recommended CODE_BLOCK:
Security Incident Response Runbook
=================================== System Overview
---------------
- Primary Function: Automated security incident detection and response
- Detection: AWS GuardDuty
- Response: Lambda automated actions
- Alerting: SNS email notifications
- Region: ap-southeast-2 (Sydney) Quick Response Guide
-------------------- HIGH SEVERITY INCIDENTS:
1. Check email alert from SNS topic: security-incident-alerts
2. Review CloudWatch Dashboard: SecurityIncidentResponse
3. Check Lambda logs: /aws/lambda/security-incident-responder
4. Verify automated actions taken (isolate EC2, rotate IAM, etc) MANUAL INTERVENTION REQUIRED:
If automated response failed:
1. Check error message in email alert
2. Review CloudWatch logs for detailed error
3. Manually execute remediation actions
4. Update this runbook with learnings Automated Response Actions
-------------------------- UnauthorizedAccess:EC2 Incidents:
- Action: Isolate EC2 instance with new security group
- Action: Create forensic EBS snapshots
- Action: Send alert to SOC team IAM Credential Compromise:
- Action: Deactivate all IAM access keys
- Action: Force password reset
- Action: Apply emergency deny-all policy Crypto Mining Detection:
- Action: Stop EC2 instance immediately
- Action: Create forensic snapshots
- Action: Send critical alert Malicious IP Detection:
- Action: Block source IP via Network ACL
- Action: Log incident details System Components
----------------- GuardDuty:
- Detector ID: 52cd95edfb049e79eada14b660d424b7
- Status: Check via Console
- Sample findings: Can generate for testing Lambda Functions:
- security-incident-responder: Main response function
- Memory: 512 MB
- Timeout: 5 minutes
- Region: ap-southeast-2 EventBridge Rules:
- guardduty-high-severity-findings: Routes HIGH/CRITICAL findings
- Trigger: Lambda function SNS Topics:
- security-incident-alerts: Email notifications
- Subscribers: [Your email] CloudWatch Dashboard:
- Name: SecurityIncidentResponse
- Widgets: Lambda metrics, SNS, Logs Step Functions:
- security-incident-workflow: Complex incident workflows
- State Machine ARN: Check Console Contact Information
-------------------
Security Team Email: [email protected] (ganti dengan email real)
On-Call Phone: +62-xxx-xxx-xxxx (ganti dengan nomor real)
Escalation: [Manager name and contact] AWS Account ID: YOUR_ACCOUNT_ID
Primary Region: ap-southeast-2 (Sydney) Monthly Testing Procedures
--------------------------- Test Schedule: First Monday of each month, 10:00 AM Test 1 - Generate Sample Finding:
1. Open GuardDuty Console
2. Settings β Sample findings
3. Click "Generate sample findings"
4. Wait 2 minutes Test 2 - Verify Automated Response:
1. Check email for alert (should arrive within 2 min)
2. Check CloudWatch Dashboard for Lambda invocation
3. Review Lambda logs for processing details
4. Verify no errors in execution Test 3 - Dashboard Validation:
1. Open CloudWatch Dashboard: SecurityIncidentResponse
2. Verify all widgets showing data
3. Check Lambda Invocations widget increased
4. Review Recent Logs widget for new entries Recovery Procedures
------------------- Rollback Isolated EC2 Instance:
1. EC2 Console β Instances
2. Find instance with isolation security group
3. Actions β Security β Change security groups
4. Replace with original security group
5. Document incident resolution Re-enable IAM User After False Positive:
1. IAM Console β Users β Select user
2. Security credentials β Activate access keys
3. Remove "EmergencyDenyAll" policy
4. Reset password (user will change on next login)
5. Notify user via approved channel Unblock IP from Network ACL:
1. VPC Console β Network ACLs
2. Find NACL with deny rule for IP
3. Inbound/Outbound rules β Delete rule number 1
4. Document reason for unblock Service Costs (Monthly Estimates)
----------------------------------
GuardDuty: $4.50/month
Security Hub: $30/month (can disable to save)
Lambda: $5/month (only when invoked)
SNS: $1/month
CloudWatch: $10-20/month
CloudTrail: $2-5/month
Config: $20/month (can stop to save)
Step Functions: $10/month TOTAL RUNNING: ~$82-95/month
TOTAL STOPPED: ~$0/month (when services suspended) To Save Costs When Not Using:
- Suspend GuardDuty
- Disable Security Hub
- Stop Config recorder
- Stop CloudTrail logging
- Disable EventBridge rules Services Resume Time: 2-5 minutes Known Issues & Solutions
------------------------ Issue: Lambda timeout when isolating instance
Solution: Increase timeout to 5 minutes (already configured) Issue: No email alerts received
Solution: 1. Check SNS subscription is "Confirmed"
2. Check spam folder
3. Verify SNS_TOPIC_ARN environment variable in Lambda Issue: GuardDuty metrics not in dashboard
Solution: Normal if no real findings yet. Use custom metrics from Lambda instead. Issue: Step Functions execution failed
Solution: Check IAM role has correct permissions for Lambda and SNS Change Log
----------
2024-12-17: Initial system setup
2024-12-17: Day 1 completed - Core automation working
2024-12-17: Day 2 completed - Dashboard and documentation Next Review Date: [30 days from today] Notes
-----
- This is a learning/demo implementation
- For production, add AWS WAF for additional protection
- Consider integrating with SIEM (Splunk, Elastic)
- Enable AWS Shield for DDoS protection
- Add MFA for all IAM users
- Implement least privilege IAM policies
- Regular security audits recommended CODE_BLOCK:
Processing finding: UnauthorizedAccess:EC2/MaliciousIPCaller.Custom (Severity: HIGH) Found instance ID: i-99999999 Error isolating instance... (EXPECTED - test instance) Security alert sent successfully Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
Processing finding: UnauthorizedAccess:EC2/MaliciousIPCaller.Custom (Severity: HIGH) Found instance ID: i-99999999 Error isolating instance... (EXPECTED - test instance) Security alert sent successfully CODE_BLOCK:
Processing finding: UnauthorizedAccess:EC2/MaliciousIPCaller.Custom (Severity: HIGH) Found instance ID: i-99999999 Error isolating instance... (EXPECTED - test instance) Security alert sent successfully CODE_BLOCK:
Severity: HIGH Type: UnauthorizedAccess:EC2/... AUTOMATED ACTIONS TAKEN: β isolate_instance: failed - Error: Instance not found Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
Severity: HIGH Type: UnauthorizedAccess:EC2/... AUTOMATED ACTIONS TAKEN: β isolate_instance: failed - Error: Instance not found CODE_BLOCK:
Severity: HIGH Type: UnauthorizedAccess:EC2/... AUTOMATED ACTIONS TAKEN: β isolate_instance: failed - Error: Instance not found CODE_BLOCK:
Processing finding: UnauthorizedAccess:IAMUser/... Rotating credentials for user: test-user Error: User not found (EXPECTED - test user) Security alert sent successfully Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
Processing finding: UnauthorizedAccess:IAMUser/... Rotating credentials for user: test-user Error: User not found (EXPECTED - test user) Security alert sent successfully CODE_BLOCK:
Processing finding: UnauthorizedAccess:IAMUser/... Rotating credentials for user: test-user Error: User not found (EXPECTED - test user) Security alert sent successfully CODE_BLOCK:
Type: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration
AUTOMATED ACTIONS TAKEN:
β rotate_credentials: failed - Error: User not found Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
Type: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration
AUTOMATED ACTIONS TAKEN:
β rotate_credentials: failed - Error: User not found CODE_BLOCK:
Type: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration
AUTOMATED ACTIONS TAKEN:
β rotate_credentials: failed - Error: User not found - AWS Account with admin access (Best Practice Please Use your IAM User * with Admin Access not your Root Account)
- Basic Python knowledge
- Text editor (VS Code, Notepad++)
- AWS CLI installed (optional, can just use Console) - GuardDuty: $4.50/month (30-day free trial)
- Security Hub: $0.0010 per check = ~$30/month
- Lambda: $0.20/million requests = ~$5/month
- Step Functions: $25 per 1K executions = ~$10/month
- CloudTrail: $2 per 100K events = ~$20/month
- SNS: $0.50/million notifications = ~$1/month
- Config: $2 per active rule = ~$20/month - Login AWS Console β Search "GuardDuty"
- Click Get Started
- Click Enable GuardDuty
- Wait 15-30 seconds for initialization - GuardDuty β Settings β Sample findings
- Click Generate sample findings - Search "Security Hub"
- Click Go to Security Hub
- Click Enable Security Hub
- Select AWS Foundational Security Best Practices standard
- Click Enable Security Hub - Security Hub β Integrations
- Find AWS GuardDuty
- Click Accept findings - Search SNS β Topics
- Click Create topic
- Type: Standard
- Name: security-incident-alerts
- Display name: Security Alerts
- Click Create topic - Topic β Create subscription
- Protocol: Email
- Endpoint: [email protected]
- Click Create subscription
- Check your email β Click confirmation link - Topic β Create subscription
- Protocol: AWS Chatbot
- Configure Slack workspace integration - IAM Console β Roles β Create role
- Trusted entity: AWS service β Lambda
- Permissions policies - Add these: - AWSLambdaBasicExecutionRole (managed)
- AmazonEC2FullAccess (managed)
- IAMFullAccess (managed)
- AmazonSNSFullAccess (managed) - Role name: SecurityIncidentResponseRole
- Click Create role - Role β Add permissions β Create inline policy
- JSON tab β Paste: - Name: SecurityResponsePermissions
- Click Create policy - Lambda Console β Create function
- Function name: security-incident-responder
- Runtime: Python 3.11
- Architecture: x86_64
- Execution role: Use existing role β SecurityIncidentResponseRole
- Click Create function - Configuration tab β Environment variables
- Add variable: - Key: SNS_TOPIC_ARN
- Value: arn:aws:sns:{YOUR_REGION}:{ACCOUNT_ID}:security-incident alerts (your SNS ARN) (Your SNS arn) - General configuration β Edit: Memory: 512 MB Timeout: 5 minutes
- Memory: 512 MB
- Timeout: 5 minutes - Memory: 512 MB
- Timeout: 5 minutes - Deploy the Code - EventBridge Console β Rules β Create rule
- Name: guardduty-high-severity-findings
- Description: Route high severity GuardDuty findings to Lambda (optional)
- Event bus: default
- Rule type: Rule with an event pattern
- Event pattern: - Select target: - Target type: AWS service
- Select a target: Lambda function
- Function: security-incident-responder - Click Next β Create rule - Create another rule: securityhub-critical-findings
- Event pattern: - Target: Same Lambda function - GuardDuty β Settings β Sample findings
- Generate sample findings - Lambda Console β security-incident-responder
- Monitor tab β View logs in CloudWatch
- Check latest log stream - CloudTrail Console β Create trail
- Trail name: security-audit-trail
- Storage location: Create new S3 bucket
- Bucket name: security-audit-logs-ACCOUNT-ID
- Log file SSE-KMS encryption: Enabled(use default AWS managed key)
- CloudWatch Logs: Enabled Log group name: /aws/cloudtrail/security-audit
- Log group name: /aws/cloudtrail/security-audit
- Event type: Management events: β Read and β Write
Data events: Skip for now (save cost)
- Management events: β Read and β Write
- Data events: Skip for now (save cost)
- Click Next β Create trail - Log group name: /aws/cloudtrail/security-audit - Management events: β Read and β Write
- Data events: Skip for now (save cost) - AWS Config Console βGet started
- Resource types to record: Select:Record all resources Include global resources: Yes
- Select:Record all resources
- Include global resources: Yes
- Amazon S3 bucket: Create a bucket: Yes Bucket name: config-audit-ACCOUNT-ID
- Create a bucket: Yes
- Bucket name: config-audit-ACCOUNT-ID
- SNS topic: Stream configuration changes: Yes Select existing SNS topic: security-incident-alerts
- Stream configuration changes: Yes
- Select existing SNS topic: security-incident-alerts
- AWS Config role: Create AWS Config service-linked role: Yes
- Create AWS Config service-linked role: Yes - Select:Record all resources
- Include global resources: Yes - Create a bucket: Yes
- Bucket name: config-audit-ACCOUNT-ID - Stream configuration changes: Yes
- Select existing SNS topic: security-incident-alerts - Create AWS Config service-linked role: Yes - iam-password-policy Ensures strong password policy
- Ensures strong password policy
- root-account-mfa-enabled Ensures root account has MFA
- Ensures root account has MFA
- ec2-instance-managed-by-systems-manager Ensures EC2 instances are managed
- Ensures EC2 instances are managed
- encrypted-volumes Ensures EBS volumes are encrypted
- Ensures EBS volumes are encrypted
- s3-bucket-public-read-prohibited Ensures S3 buckets not publicly readable
- Ensures S3 buckets not publicly readable
- cloudtrail-enabled Ensures CloudTrail is enabled
- Ensures CloudTrail is enabled - Ensures strong password policy - Ensures root account has MFA - Ensures EC2 instances are managed - Ensures EBS volumes are encrypted - Ensures S3 buckets not publicly readable - Ensures CloudTrail is enabled - Step Functions Console β Create state machine
- Choose template: Blank
- Type: Standard
- Name: security-incident-workflow
- Definition (paste this JSON): - Replace REGION and ACCOUNT with your Value
- Permissions: Create new role: Yes Role name: StepFunctions-SecurityWorkflow-Role Click Create state machine
- Create new role: Yes
- Role name: StepFunctions-SecurityWorkflow-Role Click Create state machine
- Click Create state machine - Create new role: Yes
- Role name: StepFunctions-SecurityWorkflow-Role Click Create state machine
- Click Create state machine - Click Create state machine - EventBridge β Rules β Create new rule
- Name: critical-findings-to-stepfunctions
- Event pattern: - Target: Step Functions state machine
- State machine: security-incident-workflow
- Click Create - Lambda β Create function
- Name: forensics-collector
- Runtime: Python 3.11
- Role: SecurityIncidentResponseRole - CloudWatch Console β Dashboards β Create dashboard
- Dashboard name: SecurityIncidentResponse
- Click Create dashboard - Add widget β Number
- Metrics β GuardDuty β Select: AWS/GuardDuty β FindingCount
- AWS/GuardDuty β FindingCount
- Statistic: Sum
- Period: 1 hour
- Widget title: "GuardDuty Findings (Last Hour)" - AWS/GuardDuty β FindingCount - Add widget β Line
- Metrics β Lambda β By Function Name
- Select: security-incident-responder β Invocations
- Period: 5 minutes
- Title: "Auto-Response Invocations" - Add widget β Line
- Select: security-incident-responder β Errors
- Period: 5 minutes
- Title: "Response Function Errors" - Add widget β Stacked area
- Custom namespace: Add metric β Security Hub
- Title: "Security Hub Findings by Severity" - Add widget β Logs table
- Log group: /aws/cloudtrail/security-audit - Title: "Recent Unauthorized Access Attempts" - Add widget β Number
- Metrics β Step Functions β ExecutionsStarted
- State machine: security-incident-workflow
- Title: "Workflow Executions"
- Click Save dashboard - S3 Console β Create bucket
- Bucket name: security-runbooks-YOUR_ACCOUNT_ID
- Region: Asia Pacific (Sydney) ap-southeast-2
- Block Public Access: β
** Keep all checked (default)**
- Bucket Versioning: Enable (optional)
- Encryption: Enable (SSE-S3)
- Click Create bucket - Open Text editor or Notepad
- Copy-paste this text: - Save as: runbook.txt - S3 Console β Buckets β security-runbooks-YOUR_ACCOUNT_ID
- Click "Upload"
- Add files β Select runbook.txt
- Scroll down β Click "Upload" - GuardDuty Console β Settings
- Scroll to Sample findings
- Click "Generate sample findings"
- Wait 30-60 seconds - Lambda Console β Functions β security-incident-responder
- Monitor tab
- Metrics section: Check "Invocations" graph - should show spike
Check "Duration" - should be < 5 seconds
Check "Errors" - should be 0
- Check "Invocations" graph - should show spike
- Check "Duration" - should be < 5 seconds
- Check "Errors" - should be 0 - Check "Invocations" graph - should show spike
- Check "Duration" - should be < 5 seconds
- Check "Errors" - should be 0 - Lambda β Monitor tab
- Click "View CloudWatch logs"
- Click latest log stream (top) - Check your email (in 2 minutes)
- Subject: π¨ SECURITY ALERT: UnauthorizedAccess:EC2/MaliciousIPCaller.Custom
- Body should show: - CloudWatch Console β Dashboards β SecurityIncidentResponse
- Verify widgets updated: Lambda Invocations: +1
Lambda Duration: shows recent execution
Recent Logs: shows new entry
- Lambda Invocations: +1
- Lambda Duration: shows recent execution
- Recent Logs: shows new entry - Lambda Invocations: +1
- Lambda Duration: shows recent execution
- Recent Logs: shows new entry - GuardDuty Console β Settings β Sample findings
- Click "Generate sample findings"
- Or specific: GuardDuty β Findings Filter: Type contains "IAM"
Look for: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration
- GuardDuty β Findings
- Filter: Type contains "IAM"
- Look for: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration - GuardDuty β Findings
- Filter: Type contains "IAM"
- Look for: UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration - Lambda Console β security-incident-responder β Monitor
- CloudWatch logs (latest stream) - CloudWatch Console β Dashboards β SecurityIncidentResponse
- Verify each widget: - Lambda Console β security-incident-responder β Monitor
- Metrics tab: Duration: Average < 5000ms β
Concurrent executions: < 10 β
Throttles: 0 β
Error rate: 0% β
- Duration: Average < 5000ms β
- Concurrent executions: < 10 β
- Throttles: 0 β
- Error rate: 0% β
- Duration: Average < 5000ms β
- Concurrent executions: < 10 β
- Throttles: 0 β
- Error rate: 0% β
- Configuration tab β General configuration: Memory: 512 MB Timeout: 300 seconds (5 min)
β
Sufficient for our use case
- Memory: 512 MB
- Timeout: 300 seconds (5 min)
- β
Sufficient for our use case - Memory: 512 MB
- Timeout: 300 seconds (5 min)
- β
Sufficient for our use case - AWS Config Console β Rules
- Check compliance status: - "Built automated security incident response system processing 1000+ findings/month with < 2 minute response time"
- "Implemented cloud-native SOAR (Security Orchestration, Automation & Response) using AWS Lambda, Step Functions, reducing manual intervention by 85%"
- "Designed multi-layered threat detection using GuardDuty, Security Hub, and Config, achieving 95% threat detection accuracy"
- "Architected forensics collection pipeline with automated evidence preservation compliant with SOC2 and ISO 27001"
- "Reduced security incident response time from 2 hours (manual) to 2 minutes (automated), saving $150K annually in labor costs"
how-totutorialguidedev.toailinuxnetworkdnsdockerpython