Secure ML on AWS : Building Production Data Pipelines with S3 and Lambda

Secure ML on AWS : Building Production Data Pipelines with S3 and Lambda

Source: Dev.to

Welcome Back! ## The Data Pipeline Problem ## Architecture Overview ## Step 1: Setting Up Encrypted S3 Buckets ## Why Three Buckets? ## Infrastructure Code ## Variables File ## Deploy the Infrastructure ## Step 2: Data Validation Lambda Function ## Why Validate Data? ## Validation Logic ## Lambda Dependencies ## Package Lambda ## Step 3: Lambda Infrastructure ## Step 4: CloudTrail for Audit Logging ## Step 5: CloudWatch Monitoring ## Step 6: Testing the Pipeline ## 1. Create Test Data ## 2. Upload to S3 ## 3. Check Lambda Logs ## 4. Verify Validated Data ## Step 7: Advanced Features ## Data Quality Metrics ## Data Versioning ## Important Notes Before Deploying ## Known Considerations ## Security Best Practices Checklist ## Cost Optimization ## Troubleshooting Guide ## Issue: Lambda timeout ## Issue: Permission denied ## Issue: Validation always fails ## What's Next? ## Key Takeaways ## Resources ## Let's Connect! ## About the Author ## Shoaibali MirFollow Reading time: ~15-20 minutes Level: Intermediate Prerequisites: Basic AWS knowledge, familiarity with S3 and Lambda Series: Part 2 of 4 - Read Part 1 Objective: This blog focuses on teaching core concepts and best practices for building data pipelines on AWS. While the code is functional, it's designed for learning and experimentation. For production deployments, additional hardening and testing would be needed. In Part 1, we explored the AIDLC framework and AWS architecture for secure ML pipelines. Now, let's get hands-on with Phase 1: Data Collection & Preparation. What you'll build today: By the end: You'll have a production-ready data pipeline that ensures data quality and security from day one. You can't build reliable ML models on unreliable data. Common issues include: Inconsistent data formats - Training fails due to schema changes Missing security controls - Sensitive data exposed No data validation - Bad data silently corrupts models Manual processes - Doesn't scale, error-prone No audit trail - Can't track data lineage An automated, secure, validated data pipeline. Here's what we're building: Separation of concerns improves security and organization: Note: The complete S3 configuration is shown below. For production, you might split this into separate files for better organization. Create terraform/s3-buckets.tf: Create terraform/variables.tf: Prevent garbage in, garbage out: Note: After deploying, check your email and confirm the SNS subscription to receive notifications. Create lambda/data-validation/handler.py: Create lambda/data-validation/requirements.txt: Create terraform/lambda.tf: Create terraform/cloudtrail.tf: Create terraform/monitoring.tf: Create test-data/sample.csv: Add custom CloudWatch metrics in Lambda: 1. Lambda Package Size The pandas library is large (~100MB). For production deployments, consider using AWS Lambda Layers: 2. Validation Rules The sample validation rules use min_rows: 5 for testing. For production: 3. Terraform State Management For production, always use remote state: 4. Cost Monitoring The estimated costs (~$9.50/month) are for development with minimal usage. Always: Estimated Monthly Costs (Development): Production costs scale with data volume. Use S3 Intelligent-Tiering and lifecycle policies. In Part 3, we'll build the training pipeline with SageMaker: In Part 4 (Series Finale), we'll complete the production setup: Remember: Good data pipelines prevent bad models. What data quality challenges are you facing? Let me know in the comments! Tags: #aws #machinelearning #mlops #dataengineering #terraform #lambda #s3 #cloudwatch Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse COMMAND_BLOCK: # KMS key for encryption resource "aws_kms_key" "data_encryption" { description = "Encryption key for ML data pipeline" deletion_window_in_days = 10 enable_key_rotation = true tags = { Name = "ml-data-encryption-key" Environment = var.environment Purpose = "ML-DataPipeline" } } resource "aws_kms_alias" "data_encryption" { name = "alias/ml-data-encryption" target_key_id = aws_kms_key.data_encryption.key_id } # Raw data bucket resource "aws_s3_bucket" "raw_data" { bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Raw Data" Environment = var.environment DataStage = "Raw" } } # Enable versioning resource "aws_s3_bucket_versioning" "raw_data" { bucket = aws_s3_bucket.raw_data.id versioning_configuration { status = "Enabled" } } # Enable encryption resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" { bucket = aws_s3_bucket.raw_data.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } # Block all public access resource "aws_s3_bucket_public_access_block" "raw_data" { bucket = aws_s3_bucket.raw_data.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } # Enforce encryption and secure transport resource "aws_s3_bucket_policy" "raw_data" { bucket = aws_s3_bucket.raw_data.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Sid = "DenyUnencryptedObjectUploads" Effect = "Deny" Principal = "*" Action = "s3:PutObject" Resource = "${aws_s3_bucket.raw_data.arn}/*" Condition = { StringNotEquals = { "s3:x-amz-server-side-encryption" = "aws:kms" } } }, { Sid = "DenyInsecureTransport" Effect = "Deny" Principal = "*" Action = "s3:*" Resource = [ aws_s3_bucket.raw_data.arn, "${aws_s3_bucket.raw_data.arn}/*" ] Condition = { Bool = { "aws:SecureTransport" = "false" } } } ] }) } # Lifecycle policy to manage old versions resource "aws_s3_bucket_lifecycle_configuration" "raw_data" { bucket = aws_s3_bucket.raw_data.id rule { id = "delete-old-versions" status = "Enabled" noncurrent_version_expiration { noncurrent_days = 90 } } rule { id = "transition-to-glacier" status = "Enabled" transition { days = 30 storage_class = "GLACIER" } } } # Validated data bucket resource "aws_s3_bucket" "validated_data" { bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Validated Data" Environment = var.environment DataStage = "Validated" } } resource "aws_s3_bucket_versioning" "validated_data" { bucket = aws_s3_bucket.validated_data.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" { bucket = aws_s3_bucket.validated_data.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } resource "aws_s3_bucket_public_access_block" "validated_data" { bucket = aws_s3_bucket.validated_data.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } # Model artifacts bucket resource "aws_s3_bucket" "model_artifacts" { bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Model Artifacts" Environment = var.environment DataStage = "Models" } } resource "aws_s3_bucket_versioning" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } resource "aws_s3_bucket_public_access_block" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # KMS key for encryption resource "aws_kms_key" "data_encryption" { description = "Encryption key for ML data pipeline" deletion_window_in_days = 10 enable_key_rotation = true tags = { Name = "ml-data-encryption-key" Environment = var.environment Purpose = "ML-DataPipeline" } } resource "aws_kms_alias" "data_encryption" { name = "alias/ml-data-encryption" target_key_id = aws_kms_key.data_encryption.key_id } # Raw data bucket resource "aws_s3_bucket" "raw_data" { bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Raw Data" Environment = var.environment DataStage = "Raw" } } # Enable versioning resource "aws_s3_bucket_versioning" "raw_data" { bucket = aws_s3_bucket.raw_data.id versioning_configuration { status = "Enabled" } } # Enable encryption resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" { bucket = aws_s3_bucket.raw_data.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } # Block all public access resource "aws_s3_bucket_public_access_block" "raw_data" { bucket = aws_s3_bucket.raw_data.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } # Enforce encryption and secure transport resource "aws_s3_bucket_policy" "raw_data" { bucket = aws_s3_bucket.raw_data.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Sid = "DenyUnencryptedObjectUploads" Effect = "Deny" Principal = "*" Action = "s3:PutObject" Resource = "${aws_s3_bucket.raw_data.arn}/*" Condition = { StringNotEquals = { "s3:x-amz-server-side-encryption" = "aws:kms" } } }, { Sid = "DenyInsecureTransport" Effect = "Deny" Principal = "*" Action = "s3:*" Resource = [ aws_s3_bucket.raw_data.arn, "${aws_s3_bucket.raw_data.arn}/*" ] Condition = { Bool = { "aws:SecureTransport" = "false" } } } ] }) } # Lifecycle policy to manage old versions resource "aws_s3_bucket_lifecycle_configuration" "raw_data" { bucket = aws_s3_bucket.raw_data.id rule { id = "delete-old-versions" status = "Enabled" noncurrent_version_expiration { noncurrent_days = 90 } } rule { id = "transition-to-glacier" status = "Enabled" transition { days = 30 storage_class = "GLACIER" } } } # Validated data bucket resource "aws_s3_bucket" "validated_data" { bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Validated Data" Environment = var.environment DataStage = "Validated" } } resource "aws_s3_bucket_versioning" "validated_data" { bucket = aws_s3_bucket.validated_data.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" { bucket = aws_s3_bucket.validated_data.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } resource "aws_s3_bucket_public_access_block" "validated_data" { bucket = aws_s3_bucket.validated_data.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } # Model artifacts bucket resource "aws_s3_bucket" "model_artifacts" { bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Model Artifacts" Environment = var.environment DataStage = "Models" } } resource "aws_s3_bucket_versioning" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } resource "aws_s3_bucket_public_access_block" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } COMMAND_BLOCK: # KMS key for encryption resource "aws_kms_key" "data_encryption" { description = "Encryption key for ML data pipeline" deletion_window_in_days = 10 enable_key_rotation = true tags = { Name = "ml-data-encryption-key" Environment = var.environment Purpose = "ML-DataPipeline" } } resource "aws_kms_alias" "data_encryption" { name = "alias/ml-data-encryption" target_key_id = aws_kms_key.data_encryption.key_id } # Raw data bucket resource "aws_s3_bucket" "raw_data" { bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Raw Data" Environment = var.environment DataStage = "Raw" } } # Enable versioning resource "aws_s3_bucket_versioning" "raw_data" { bucket = aws_s3_bucket.raw_data.id versioning_configuration { status = "Enabled" } } # Enable encryption resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" { bucket = aws_s3_bucket.raw_data.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } # Block all public access resource "aws_s3_bucket_public_access_block" "raw_data" { bucket = aws_s3_bucket.raw_data.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } # Enforce encryption and secure transport resource "aws_s3_bucket_policy" "raw_data" { bucket = aws_s3_bucket.raw_data.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Sid = "DenyUnencryptedObjectUploads" Effect = "Deny" Principal = "*" Action = "s3:PutObject" Resource = "${aws_s3_bucket.raw_data.arn}/*" Condition = { StringNotEquals = { "s3:x-amz-server-side-encryption" = "aws:kms" } } }, { Sid = "DenyInsecureTransport" Effect = "Deny" Principal = "*" Action = "s3:*" Resource = [ aws_s3_bucket.raw_data.arn, "${aws_s3_bucket.raw_data.arn}/*" ] Condition = { Bool = { "aws:SecureTransport" = "false" } } } ] }) } # Lifecycle policy to manage old versions resource "aws_s3_bucket_lifecycle_configuration" "raw_data" { bucket = aws_s3_bucket.raw_data.id rule { id = "delete-old-versions" status = "Enabled" noncurrent_version_expiration { noncurrent_days = 90 } } rule { id = "transition-to-glacier" status = "Enabled" transition { days = 30 storage_class = "GLACIER" } } } # Validated data bucket resource "aws_s3_bucket" "validated_data" { bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Validated Data" Environment = var.environment DataStage = "Validated" } } resource "aws_s3_bucket_versioning" "validated_data" { bucket = aws_s3_bucket.validated_data.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" { bucket = aws_s3_bucket.validated_data.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } resource "aws_s3_bucket_public_access_block" "validated_data" { bucket = aws_s3_bucket.validated_data.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } # Model artifacts bucket resource "aws_s3_bucket" "model_artifacts" { bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}" tags = { Name = "ML Model Artifacts" Environment = var.environment DataStage = "Models" } } resource "aws_s3_bucket_versioning" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id versioning_configuration { status = "Enabled" } } resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id rule { apply_server_side_encryption_by_default { sse_algorithm = "aws:kms" kms_master_key_id = aws_kms_key.data_encryption.arn } bucket_key_enabled = true } } resource "aws_s3_bucket_public_access_block" "model_artifacts" { bucket = aws_s3_bucket.model_artifacts.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } CODE_BLOCK: variable "project_name" { description = "Project name prefix" type = string default = "ml-pipeline" } variable "environment" { description = "Environment (dev, staging, prod)" type = string default = "dev" } variable "aws_region" { description = "AWS region" type = string default = "ap-south-1" } variable "notification_email" { description = "Email for SNS notifications" type = string } data "aws_caller_identity" "current" {} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: variable "project_name" { description = "Project name prefix" type = string default = "ml-pipeline" } variable "environment" { description = "Environment (dev, staging, prod)" type = string default = "dev" } variable "aws_region" { description = "AWS region" type = string default = "ap-south-1" } variable "notification_email" { description = "Email for SNS notifications" type = string } data "aws_caller_identity" "current" {} CODE_BLOCK: variable "project_name" { description = "Project name prefix" type = string default = "ml-pipeline" } variable "environment" { description = "Environment (dev, staging, prod)" type = string default = "dev" } variable "aws_region" { description = "AWS region" type = string default = "ap-south-1" } variable "notification_email" { description = "Email for SNS notifications" type = string } data "aws_caller_identity" "current" {} COMMAND_BLOCK: cd terraform # Initialize Terraform terraform init # Review the plan terraform plan -var="[email protected]" # Apply the configuration terraform apply -var="[email protected]" Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: cd terraform # Initialize Terraform terraform init # Review the plan terraform plan -var="[email protected]" # Apply the configuration terraform apply -var="[email protected]" COMMAND_BLOCK: cd terraform # Initialize Terraform terraform init # Review the plan terraform plan -var="[email protected]" # Apply the configuration terraform apply -var="[email protected]" COMMAND_BLOCK: import json import os import boto3 import logging import pandas as pd from io import BytesIO import hashlib from datetime import datetime # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS clients s3_client = boto3.client('s3') sns_client = boto3.client('sns') # Validation rules VALIDATION_RULES = { 'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'], 'numeric_columns': ['feature_1', 'feature_2', 'target'], 'max_null_percentage': 0.05, # 5% max 'min_rows': 5, # Adjusted for testing - use 100+ for production 'max_rows': 1000000, 'date_columns': ['timestamp'] } def calculate_checksum(data: bytes) -> str: """Calculate SHA256 checksum""" return hashlib.sha256(data).hexdigest() def validate_schema(df: pd.DataFrame) -> dict: """Validate DataFrame schema""" issues = [] # Check required columns missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns) if missing_cols: issues.append(f"Missing columns: {missing_cols}") # Check numeric columns for col in VALIDATION_RULES['numeric_columns']: if col in df.columns: if not pd.api.types.is_numeric_dtype(df[col]): issues.append(f"Column '{col}' should be numeric") # Check date columns for col in VALIDATION_RULES['date_columns']: if col in df.columns: try: pd.to_datetime(df[col]) except: issues.append(f"Column '{col}' should be valid datetime") return { 'valid': len(issues) == 0, 'issues': issues } def validate_data_quality(df: pd.DataFrame) -> dict: """Validate data quality""" issues = [] # Check row count if len(df) < VALIDATION_RULES['min_rows']: issues.append( f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}" ) elif len(df) > VALIDATION_RULES['max_rows']: issues.append( f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}" ) # Check null values for col in df.columns: null_pct = df[col].isnull().sum() / len(df) if null_pct > VALIDATION_RULES['max_null_percentage']: issues.append( f"Column '{col}' has {null_pct:.2%} nulls " f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})" ) # Check duplicates duplicate_count = df.duplicated().sum() if duplicate_count > 0: issues.append(f"Found {duplicate_count} duplicate rows") # Data distribution checks stats = {} for col in VALIDATION_RULES['numeric_columns']: if col in df.columns: stats[col] = { 'mean': float(df[col].mean()), 'std': float(df[col].std()), 'min': float(df[col].min()), 'max': float(df[col].max()), 'null_count': int(df[col].isnull().sum()) } return { 'valid': len(issues) == 0, 'issues': issues, 'stats': { 'row_count': len(df), 'column_count': len(df.columns), 'duplicate_count': duplicate_count, 'column_stats': stats } } def send_notification(topic_arn: str, subject: str, message: str): """Send SNS notification""" try: sns_client.publish( TopicArn=topic_arn, Subject=subject, Message=message ) logger.info(f"Sent notification: {subject}") except Exception as e: logger.error(f"Failed to send notification: {e}") def lambda_handler(event, context): """ Lambda handler triggered by S3 upload """ try: # Parse S3 event record = event['Records'][0] bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] logger.info(f"Processing: s3://{bucket}/{key}") # Download file response = s3_client.get_object(Bucket=bucket, Key=key) file_content = response['Body'].read() # Calculate checksum checksum = calculate_checksum(file_content) # Load data df = pd.read_csv(BytesIO(file_content)) logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns") # Run validations schema_result = validate_schema(df) quality_result = validate_data_quality(df) # Compile results validation_result = { 'file': f"s3://{bucket}/{key}", 'timestamp': datetime.utcnow().isoformat(), 'checksum': checksum, 'schema_validation': schema_result, 'quality_validation': quality_result, 'status': ( 'PASSED' if schema_result['valid'] and quality_result['valid'] else 'FAILED' ) } logger.info(f"Validation status: {validation_result['status']}") # Save validation report report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json') s3_client.put_object( Bucket=bucket, Key=report_key, Body=json.dumps(validation_result, indent=2), ServerSideEncryption='aws:kms' ) # If passed, copy to validated bucket if validation_result['status'] == 'PASSED': validated_bucket = bucket.replace('raw-data', 'validated-data') validated_key = key.replace('raw/', 'validated/') s3_client.copy_object( CopySource={'Bucket': bucket, 'Key': key}, Bucket=validated_bucket, Key=validated_key, ServerSideEncryption='aws:kms' ) logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}") # Send success notification send_notification( os.environ['SNS_TOPIC_ARN'], f' Data Validation Passed: {key}', f"File validated successfully\n\n" f"Stats:\n" f"- Rows: {quality_result['stats']['row_count']}\n" f"- Columns: {quality_result['stats']['column_count']}\n" f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n" f"Validation report: s3://{bucket}/{report_key}" ) else: # Send failure notification all_issues = schema_result['issues'] + quality_result['issues'] send_notification( os.environ['SNS_TOPIC_ARN'], f' Data Validation Failed: {key}', f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues) ) return { 'statusCode': 200, 'body': json.dumps(validation_result) } except Exception as e: logger.error(f"Error: {e}", exc_info=True) send_notification( os.environ.get('SNS_TOPIC_ARN', ''), f' Data Validation Error', f"Error processing {key}:\n{str(e)}" ) return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: import json import os import boto3 import logging import pandas as pd from io import BytesIO import hashlib from datetime import datetime # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS clients s3_client = boto3.client('s3') sns_client = boto3.client('sns') # Validation rules VALIDATION_RULES = { 'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'], 'numeric_columns': ['feature_1', 'feature_2', 'target'], 'max_null_percentage': 0.05, # 5% max 'min_rows': 5, # Adjusted for testing - use 100+ for production 'max_rows': 1000000, 'date_columns': ['timestamp'] } def calculate_checksum(data: bytes) -> str: """Calculate SHA256 checksum""" return hashlib.sha256(data).hexdigest() def validate_schema(df: pd.DataFrame) -> dict: """Validate DataFrame schema""" issues = [] # Check required columns missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns) if missing_cols: issues.append(f"Missing columns: {missing_cols}") # Check numeric columns for col in VALIDATION_RULES['numeric_columns']: if col in df.columns: if not pd.api.types.is_numeric_dtype(df[col]): issues.append(f"Column '{col}' should be numeric") # Check date columns for col in VALIDATION_RULES['date_columns']: if col in df.columns: try: pd.to_datetime(df[col]) except: issues.append(f"Column '{col}' should be valid datetime") return { 'valid': len(issues) == 0, 'issues': issues } def validate_data_quality(df: pd.DataFrame) -> dict: """Validate data quality""" issues = [] # Check row count if len(df) < VALIDATION_RULES['min_rows']: issues.append( f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}" ) elif len(df) > VALIDATION_RULES['max_rows']: issues.append( f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}" ) # Check null values for col in df.columns: null_pct = df[col].isnull().sum() / len(df) if null_pct > VALIDATION_RULES['max_null_percentage']: issues.append( f"Column '{col}' has {null_pct:.2%} nulls " f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})" ) # Check duplicates duplicate_count = df.duplicated().sum() if duplicate_count > 0: issues.append(f"Found {duplicate_count} duplicate rows") # Data distribution checks stats = {} for col in VALIDATION_RULES['numeric_columns']: if col in df.columns: stats[col] = { 'mean': float(df[col].mean()), 'std': float(df[col].std()), 'min': float(df[col].min()), 'max': float(df[col].max()), 'null_count': int(df[col].isnull().sum()) } return { 'valid': len(issues) == 0, 'issues': issues, 'stats': { 'row_count': len(df), 'column_count': len(df.columns), 'duplicate_count': duplicate_count, 'column_stats': stats } } def send_notification(topic_arn: str, subject: str, message: str): """Send SNS notification""" try: sns_client.publish( TopicArn=topic_arn, Subject=subject, Message=message ) logger.info(f"Sent notification: {subject}") except Exception as e: logger.error(f"Failed to send notification: {e}") def lambda_handler(event, context): """ Lambda handler triggered by S3 upload """ try: # Parse S3 event record = event['Records'][0] bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] logger.info(f"Processing: s3://{bucket}/{key}") # Download file response = s3_client.get_object(Bucket=bucket, Key=key) file_content = response['Body'].read() # Calculate checksum checksum = calculate_checksum(file_content) # Load data df = pd.read_csv(BytesIO(file_content)) logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns") # Run validations schema_result = validate_schema(df) quality_result = validate_data_quality(df) # Compile results validation_result = { 'file': f"s3://{bucket}/{key}", 'timestamp': datetime.utcnow().isoformat(), 'checksum': checksum, 'schema_validation': schema_result, 'quality_validation': quality_result, 'status': ( 'PASSED' if schema_result['valid'] and quality_result['valid'] else 'FAILED' ) } logger.info(f"Validation status: {validation_result['status']}") # Save validation report report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json') s3_client.put_object( Bucket=bucket, Key=report_key, Body=json.dumps(validation_result, indent=2), ServerSideEncryption='aws:kms' ) # If passed, copy to validated bucket if validation_result['status'] == 'PASSED': validated_bucket = bucket.replace('raw-data', 'validated-data') validated_key = key.replace('raw/', 'validated/') s3_client.copy_object( CopySource={'Bucket': bucket, 'Key': key}, Bucket=validated_bucket, Key=validated_key, ServerSideEncryption='aws:kms' ) logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}") # Send success notification send_notification( os.environ['SNS_TOPIC_ARN'], f' Data Validation Passed: {key}', f"File validated successfully\n\n" f"Stats:\n" f"- Rows: {quality_result['stats']['row_count']}\n" f"- Columns: {quality_result['stats']['column_count']}\n" f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n" f"Validation report: s3://{bucket}/{report_key}" ) else: # Send failure notification all_issues = schema_result['issues'] + quality_result['issues'] send_notification( os.environ['SNS_TOPIC_ARN'], f' Data Validation Failed: {key}', f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues) ) return { 'statusCode': 200, 'body': json.dumps(validation_result) } except Exception as e: logger.error(f"Error: {e}", exc_info=True) send_notification( os.environ.get('SNS_TOPIC_ARN', ''), f' Data Validation Error', f"Error processing {key}:\n{str(e)}" ) return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } COMMAND_BLOCK: import json import os import boto3 import logging import pandas as pd from io import BytesIO import hashlib from datetime import datetime # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS clients s3_client = boto3.client('s3') sns_client = boto3.client('sns') # Validation rules VALIDATION_RULES = { 'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'], 'numeric_columns': ['feature_1', 'feature_2', 'target'], 'max_null_percentage': 0.05, # 5% max 'min_rows': 5, # Adjusted for testing - use 100+ for production 'max_rows': 1000000, 'date_columns': ['timestamp'] } def calculate_checksum(data: bytes) -> str: """Calculate SHA256 checksum""" return hashlib.sha256(data).hexdigest() def validate_schema(df: pd.DataFrame) -> dict: """Validate DataFrame schema""" issues = [] # Check required columns missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns) if missing_cols: issues.append(f"Missing columns: {missing_cols}") # Check numeric columns for col in VALIDATION_RULES['numeric_columns']: if col in df.columns: if not pd.api.types.is_numeric_dtype(df[col]): issues.append(f"Column '{col}' should be numeric") # Check date columns for col in VALIDATION_RULES['date_columns']: if col in df.columns: try: pd.to_datetime(df[col]) except: issues.append(f"Column '{col}' should be valid datetime") return { 'valid': len(issues) == 0, 'issues': issues } def validate_data_quality(df: pd.DataFrame) -> dict: """Validate data quality""" issues = [] # Check row count if len(df) < VALIDATION_RULES['min_rows']: issues.append( f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}" ) elif len(df) > VALIDATION_RULES['max_rows']: issues.append( f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}" ) # Check null values for col in df.columns: null_pct = df[col].isnull().sum() / len(df) if null_pct > VALIDATION_RULES['max_null_percentage']: issues.append( f"Column '{col}' has {null_pct:.2%} nulls " f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})" ) # Check duplicates duplicate_count = df.duplicated().sum() if duplicate_count > 0: issues.append(f"Found {duplicate_count} duplicate rows") # Data distribution checks stats = {} for col in VALIDATION_RULES['numeric_columns']: if col in df.columns: stats[col] = { 'mean': float(df[col].mean()), 'std': float(df[col].std()), 'min': float(df[col].min()), 'max': float(df[col].max()), 'null_count': int(df[col].isnull().sum()) } return { 'valid': len(issues) == 0, 'issues': issues, 'stats': { 'row_count': len(df), 'column_count': len(df.columns), 'duplicate_count': duplicate_count, 'column_stats': stats } } def send_notification(topic_arn: str, subject: str, message: str): """Send SNS notification""" try: sns_client.publish( TopicArn=topic_arn, Subject=subject, Message=message ) logger.info(f"Sent notification: {subject}") except Exception as e: logger.error(f"Failed to send notification: {e}") def lambda_handler(event, context): """ Lambda handler triggered by S3 upload """ try: # Parse S3 event record = event['Records'][0] bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] logger.info(f"Processing: s3://{bucket}/{key}") # Download file response = s3_client.get_object(Bucket=bucket, Key=key) file_content = response['Body'].read() # Calculate checksum checksum = calculate_checksum(file_content) # Load data df = pd.read_csv(BytesIO(file_content)) logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns") # Run validations schema_result = validate_schema(df) quality_result = validate_data_quality(df) # Compile results validation_result = { 'file': f"s3://{bucket}/{key}", 'timestamp': datetime.utcnow().isoformat(), 'checksum': checksum, 'schema_validation': schema_result, 'quality_validation': quality_result, 'status': ( 'PASSED' if schema_result['valid'] and quality_result['valid'] else 'FAILED' ) } logger.info(f"Validation status: {validation_result['status']}") # Save validation report report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json') s3_client.put_object( Bucket=bucket, Key=report_key, Body=json.dumps(validation_result, indent=2), ServerSideEncryption='aws:kms' ) # If passed, copy to validated bucket if validation_result['status'] == 'PASSED': validated_bucket = bucket.replace('raw-data', 'validated-data') validated_key = key.replace('raw/', 'validated/') s3_client.copy_object( CopySource={'Bucket': bucket, 'Key': key}, Bucket=validated_bucket, Key=validated_key, ServerSideEncryption='aws:kms' ) logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}") # Send success notification send_notification( os.environ['SNS_TOPIC_ARN'], f' Data Validation Passed: {key}', f"File validated successfully\n\n" f"Stats:\n" f"- Rows: {quality_result['stats']['row_count']}\n" f"- Columns: {quality_result['stats']['column_count']}\n" f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n" f"Validation report: s3://{bucket}/{report_key}" ) else: # Send failure notification all_issues = schema_result['issues'] + quality_result['issues'] send_notification( os.environ['SNS_TOPIC_ARN'], f' Data Validation Failed: {key}', f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues) ) return { 'statusCode': 200, 'body': json.dumps(validation_result) } except Exception as e: logger.error(f"Error: {e}", exc_info=True) send_notification( os.environ.get('SNS_TOPIC_ARN', ''), f' Data Validation Error', f"Error processing {key}:\n{str(e)}" ) return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } CODE_BLOCK: pandas==2.1.0 boto3==1.28.85 Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: pandas==2.1.0 boto3==1.28.85 CODE_BLOCK: pandas==2.1.0 boto3==1.28.85 COMMAND_BLOCK: cd lambda/data-validation # Create package directory mkdir package # Install dependencies pip install -r requirements.txt -t package/ # Copy handler cp handler.py package/ # Create deployment package cd package zip -r ../function.zip . cd .. Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: cd lambda/data-validation # Create package directory mkdir package # Install dependencies pip install -r requirements.txt -t package/ # Copy handler cp handler.py package/ # Create deployment package cd package zip -r ../function.zip . cd .. COMMAND_BLOCK: cd lambda/data-validation # Create package directory mkdir package # Install dependencies pip install -r requirements.txt -t package/ # Copy handler cp handler.py package/ # Create deployment package cd package zip -r ../function.zip . cd .. COMMAND_BLOCK: # IAM role for Lambda resource "aws_iam_role" "data_validation" { name = "${var.project_name}-data-validation-lambda" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "lambda.amazonaws.com" } }] }) } # IAM policy for Lambda resource "aws_iam_role_policy" "data_validation" { name = "${var.project_name}-data-validation-policy" role = aws_iam_role.data_validation.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = [ "s3:GetObject", "s3:PutObject" ] Resource = [ "${aws_s3_bucket.raw_data.arn}/*", "${aws_s3_bucket.validated_data.arn}/*" ] }, { Effect = "Allow" Action = [ "kms:Decrypt", "kms:GenerateDataKey" ] Resource = aws_kms_key.data_encryption.arn }, { Effect = "Allow" Action = "sns:Publish" Resource = aws_sns_topic.validation_notifications.arn }, { Effect = "Allow" Action = [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ] Resource = "arn:aws:logs:*:*:*" } ] }) } # SNS topic for notifications resource "aws_sns_topic" "validation_notifications" { name = "${var.project_name}-validation-notifications" } resource "aws_sns_topic_subscription" "email" { topic_arn = aws_sns_topic.validation_notifications.arn protocol = "email" endpoint = var.notification_email } # Lambda function resource "aws_lambda_function" "data_validation" { filename = "${path.module}/../lambda/data-validation/function.zip" function_name = "${var.project_name}-data-validation" role = aws_iam_role.data_validation.arn handler = "handler.lambda_handler" runtime = "python3.11" timeout = 300 memory_size = 1024 source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip") environment { variables = { SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn } } tags = { Name = "Data Validation Lambda" Environment = var.environment } } # S3 trigger resource "aws_s3_bucket_notification" "raw_data_trigger" { bucket = aws_s3_bucket.raw_data.id lambda_function { lambda_function_arn = aws_lambda_function.data_validation.arn events = ["s3:ObjectCreated:*"] filter_prefix = "raw/" filter_suffix = ".csv" } depends_on = [ aws_lambda_permission.allow_s3, aws_lambda_function.data_validation ] } resource "aws_lambda_permission" "allow_s3" { statement_id = "AllowS3Invoke" action = "lambda:InvokeFunction" function_name = aws_lambda_function.data_validation.function_name principal = "s3.amazonaws.com" source_arn = aws_s3_bucket.raw_data.arn } # CloudWatch Log Group resource "aws_cloudwatch_log_group" "data_validation" { name = "/aws/lambda/${aws_lambda_function.data_validation.function_name}" retention_in_days = 30 } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # IAM role for Lambda resource "aws_iam_role" "data_validation" { name = "${var.project_name}-data-validation-lambda" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "lambda.amazonaws.com" } }] }) } # IAM policy for Lambda resource "aws_iam_role_policy" "data_validation" { name = "${var.project_name}-data-validation-policy" role = aws_iam_role.data_validation.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = [ "s3:GetObject", "s3:PutObject" ] Resource = [ "${aws_s3_bucket.raw_data.arn}/*", "${aws_s3_bucket.validated_data.arn}/*" ] }, { Effect = "Allow" Action = [ "kms:Decrypt", "kms:GenerateDataKey" ] Resource = aws_kms_key.data_encryption.arn }, { Effect = "Allow" Action = "sns:Publish" Resource = aws_sns_topic.validation_notifications.arn }, { Effect = "Allow" Action = [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ] Resource = "arn:aws:logs:*:*:*" } ] }) } # SNS topic for notifications resource "aws_sns_topic" "validation_notifications" { name = "${var.project_name}-validation-notifications" } resource "aws_sns_topic_subscription" "email" { topic_arn = aws_sns_topic.validation_notifications.arn protocol = "email" endpoint = var.notification_email } # Lambda function resource "aws_lambda_function" "data_validation" { filename = "${path.module}/../lambda/data-validation/function.zip" function_name = "${var.project_name}-data-validation" role = aws_iam_role.data_validation.arn handler = "handler.lambda_handler" runtime = "python3.11" timeout = 300 memory_size = 1024 source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip") environment { variables = { SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn } } tags = { Name = "Data Validation Lambda" Environment = var.environment } } # S3 trigger resource "aws_s3_bucket_notification" "raw_data_trigger" { bucket = aws_s3_bucket.raw_data.id lambda_function { lambda_function_arn = aws_lambda_function.data_validation.arn events = ["s3:ObjectCreated:*"] filter_prefix = "raw/" filter_suffix = ".csv" } depends_on = [ aws_lambda_permission.allow_s3, aws_lambda_function.data_validation ] } resource "aws_lambda_permission" "allow_s3" { statement_id = "AllowS3Invoke" action = "lambda:InvokeFunction" function_name = aws_lambda_function.data_validation.function_name principal = "s3.amazonaws.com" source_arn = aws_s3_bucket.raw_data.arn } # CloudWatch Log Group resource "aws_cloudwatch_log_group" "data_validation" { name = "/aws/lambda/${aws_lambda_function.data_validation.function_name}" retention_in_days = 30 } COMMAND_BLOCK: # IAM role for Lambda resource "aws_iam_role" "data_validation" { name = "${var.project_name}-data-validation-lambda" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "lambda.amazonaws.com" } }] }) } # IAM policy for Lambda resource "aws_iam_role_policy" "data_validation" { name = "${var.project_name}-data-validation-policy" role = aws_iam_role.data_validation.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = [ "s3:GetObject", "s3:PutObject" ] Resource = [ "${aws_s3_bucket.raw_data.arn}/*", "${aws_s3_bucket.validated_data.arn}/*" ] }, { Effect = "Allow" Action = [ "kms:Decrypt", "kms:GenerateDataKey" ] Resource = aws_kms_key.data_encryption.arn }, { Effect = "Allow" Action = "sns:Publish" Resource = aws_sns_topic.validation_notifications.arn }, { Effect = "Allow" Action = [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ] Resource = "arn:aws:logs:*:*:*" } ] }) } # SNS topic for notifications resource "aws_sns_topic" "validation_notifications" { name = "${var.project_name}-validation-notifications" } resource "aws_sns_topic_subscription" "email" { topic_arn = aws_sns_topic.validation_notifications.arn protocol = "email" endpoint = var.notification_email } # Lambda function resource "aws_lambda_function" "data_validation" { filename = "${path.module}/../lambda/data-validation/function.zip" function_name = "${var.project_name}-data-validation" role = aws_iam_role.data_validation.arn handler = "handler.lambda_handler" runtime = "python3.11" timeout = 300 memory_size = 1024 source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip") environment { variables = { SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn } } tags = { Name = "Data Validation Lambda" Environment = var.environment } } # S3 trigger resource "aws_s3_bucket_notification" "raw_data_trigger" { bucket = aws_s3_bucket.raw_data.id lambda_function { lambda_function_arn = aws_lambda_function.data_validation.arn events = ["s3:ObjectCreated:*"] filter_prefix = "raw/" filter_suffix = ".csv" } depends_on = [ aws_lambda_permission.allow_s3, aws_lambda_function.data_validation ] } resource "aws_lambda_permission" "allow_s3" { statement_id = "AllowS3Invoke" action = "lambda:InvokeFunction" function_name = aws_lambda_function.data_validation.function_name principal = "s3.amazonaws.com" source_arn = aws_s3_bucket.raw_data.arn } # CloudWatch Log Group resource "aws_cloudwatch_log_group" "data_validation" { name = "/aws/lambda/${aws_lambda_function.data_validation.function_name}" retention_in_days = 30 } COMMAND_BLOCK: # CloudTrail logs bucket resource "aws_s3_bucket" "cloudtrail_logs" { bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}" } resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" { bucket = aws_s3_bucket.cloudtrail_logs.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_policy" "cloudtrail_logs" { bucket = aws_s3_bucket.cloudtrail_logs.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Sid = "AWSCloudTrailAclCheck" Effect = "Allow" Principal = { Service = "cloudtrail.amazonaws.com" } Action = "s3:GetBucketAcl" Resource = aws_s3_bucket.cloudtrail_logs.arn }, { Sid = "AWSCloudTrailWrite" Effect = "Allow" Principal = { Service = "cloudtrail.amazonaws.com" } Action = "s3:PutObject" Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*" Condition = { StringEquals = { "s3:x-amz-acl" = "bucket-owner-full-control" } } } ] }) } # CloudTrail resource "aws_cloudtrail" "data_events" { name = "${var.project_name}-data-trail" s3_bucket_name = aws_s3_bucket.cloudtrail_logs.id include_global_service_events = true is_multi_region_trail = true enable_logging = true enable_log_file_validation = true depends_on = [aws_s3_bucket_policy.cloudtrail_logs] event_selector { read_write_type = "All" include_management_events = true data_resource { type = "AWS::S3::Object" values = [ "${aws_s3_bucket.raw_data.arn}/*", "${aws_s3_bucket.validated_data.arn}/*", "${aws_s3_bucket.model_artifacts.arn}/*" ] } } tags = { Name = "ML Data Trail" Environment = var.environment } } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # CloudTrail logs bucket resource "aws_s3_bucket" "cloudtrail_logs" { bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}" } resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" { bucket = aws_s3_bucket.cloudtrail_logs.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_policy" "cloudtrail_logs" { bucket = aws_s3_bucket.cloudtrail_logs.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Sid = "AWSCloudTrailAclCheck" Effect = "Allow" Principal = { Service = "cloudtrail.amazonaws.com" } Action = "s3:GetBucketAcl" Resource = aws_s3_bucket.cloudtrail_logs.arn }, { Sid = "AWSCloudTrailWrite" Effect = "Allow" Principal = { Service = "cloudtrail.amazonaws.com" } Action = "s3:PutObject" Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*" Condition = { StringEquals = { "s3:x-amz-acl" = "bucket-owner-full-control" } } } ] }) } # CloudTrail resource "aws_cloudtrail" "data_events" { name = "${var.project_name}-data-trail" s3_bucket_name = aws_s3_bucket.cloudtrail_logs.id include_global_service_events = true is_multi_region_trail = true enable_logging = true enable_log_file_validation = true depends_on = [aws_s3_bucket_policy.cloudtrail_logs] event_selector { read_write_type = "All" include_management_events = true data_resource { type = "AWS::S3::Object" values = [ "${aws_s3_bucket.raw_data.arn}/*", "${aws_s3_bucket.validated_data.arn}/*", "${aws_s3_bucket.model_artifacts.arn}/*" ] } } tags = { Name = "ML Data Trail" Environment = var.environment } } COMMAND_BLOCK: # CloudTrail logs bucket resource "aws_s3_bucket" "cloudtrail_logs" { bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}" } resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" { bucket = aws_s3_bucket.cloudtrail_logs.id block_public_acls = true block_public_policy = true ignore_public_acls = true restrict_public_buckets = true } resource "aws_s3_bucket_policy" "cloudtrail_logs" { bucket = aws_s3_bucket.cloudtrail_logs.id policy = jsonencode({ Version = "2012-10-17" Statement = [ { Sid = "AWSCloudTrailAclCheck" Effect = "Allow" Principal = { Service = "cloudtrail.amazonaws.com" } Action = "s3:GetBucketAcl" Resource = aws_s3_bucket.cloudtrail_logs.arn }, { Sid = "AWSCloudTrailWrite" Effect = "Allow" Principal = { Service = "cloudtrail.amazonaws.com" } Action = "s3:PutObject" Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*" Condition = { StringEquals = { "s3:x-amz-acl" = "bucket-owner-full-control" } } } ] }) } # CloudTrail resource "aws_cloudtrail" "data_events" { name = "${var.project_name}-data-trail" s3_bucket_name = aws_s3_bucket.cloudtrail_logs.id include_global_service_events = true is_multi_region_trail = true enable_logging = true enable_log_file_validation = true depends_on = [aws_s3_bucket_policy.cloudtrail_logs] event_selector { read_write_type = "All" include_management_events = true data_resource { type = "AWS::S3::Object" values = [ "${aws_s3_bucket.raw_data.arn}/*", "${aws_s3_bucket.validated_data.arn}/*", "${aws_s3_bucket.model_artifacts.arn}/*" ] } } tags = { Name = "ML Data Trail" Environment = var.environment } } COMMAND_BLOCK: # CloudWatch dashboard resource "aws_cloudwatch_dashboard" "data_pipeline" { dashboard_name = "${var.project_name}-data-pipeline" dashboard_body = jsonencode({ widgets = [ { type = "metric" x = 0 y = 0 width = 12 height = 6 properties = { metrics = [ ["AWS/Lambda", "Invocations", { stat = "Sum" label = "Lambda Invocations" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }], [".", "Errors", { stat = "Sum" label = "Lambda Errors" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }], [".", "Duration", { stat = "Average" label = "Avg Duration (ms)" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }] ] period = 300 stat = "Average" region = var.aws_region title = "Lambda Metrics" view = "timeSeries" stacked = false } } ] }) } # Alarms resource "aws_cloudwatch_metric_alarm" "lambda_errors" { alarm_name = "${var.project_name}-lambda-errors" comparison_operator = "GreaterThanThreshold" evaluation_periods = "1" metric_name = "Errors" namespace = "AWS/Lambda" period = "300" statistic = "Sum" threshold = "5" alarm_description = "Lambda function errors" alarm_actions = [aws_sns_topic.validation_notifications.arn] dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # CloudWatch dashboard resource "aws_cloudwatch_dashboard" "data_pipeline" { dashboard_name = "${var.project_name}-data-pipeline" dashboard_body = jsonencode({ widgets = [ { type = "metric" x = 0 y = 0 width = 12 height = 6 properties = { metrics = [ ["AWS/Lambda", "Invocations", { stat = "Sum" label = "Lambda Invocations" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }], [".", "Errors", { stat = "Sum" label = "Lambda Errors" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }], [".", "Duration", { stat = "Average" label = "Avg Duration (ms)" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }] ] period = 300 stat = "Average" region = var.aws_region title = "Lambda Metrics" view = "timeSeries" stacked = false } } ] }) } # Alarms resource "aws_cloudwatch_metric_alarm" "lambda_errors" { alarm_name = "${var.project_name}-lambda-errors" comparison_operator = "GreaterThanThreshold" evaluation_periods = "1" metric_name = "Errors" namespace = "AWS/Lambda" period = "300" statistic = "Sum" threshold = "5" alarm_description = "Lambda function errors" alarm_actions = [aws_sns_topic.validation_notifications.arn] dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } } COMMAND_BLOCK: # CloudWatch dashboard resource "aws_cloudwatch_dashboard" "data_pipeline" { dashboard_name = "${var.project_name}-data-pipeline" dashboard_body = jsonencode({ widgets = [ { type = "metric" x = 0 y = 0 width = 12 height = 6 properties = { metrics = [ ["AWS/Lambda", "Invocations", { stat = "Sum" label = "Lambda Invocations" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }], [".", "Errors", { stat = "Sum" label = "Lambda Errors" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }], [".", "Duration", { stat = "Average" label = "Avg Duration (ms)" dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } }] ] period = 300 stat = "Average" region = var.aws_region title = "Lambda Metrics" view = "timeSeries" stacked = false } } ] }) } # Alarms resource "aws_cloudwatch_metric_alarm" "lambda_errors" { alarm_name = "${var.project_name}-lambda-errors" comparison_operator = "GreaterThanThreshold" evaluation_periods = "1" metric_name = "Errors" namespace = "AWS/Lambda" period = "300" statistic = "Sum" threshold = "5" alarm_description = "Lambda function errors" alarm_actions = [aws_sns_topic.validation_notifications.arn] dimensions = { FunctionName = aws_lambda_function.data_validation.function_name } } CODE_BLOCK: timestamp,feature_1,feature_2,target 2024-01-01T00:00:00,1.5,2.3,0 2024-01-01T01:00:00,1.8,2.1,1 2024-01-01T02:00:00,1.2,2.5,0 2024-01-01T03:00:00,1.9,2.0,1 2024-01-01T04:00:00,1.4,2.4,0 Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: timestamp,feature_1,feature_2,target 2024-01-01T00:00:00,1.5,2.3,0 2024-01-01T01:00:00,1.8,2.1,1 2024-01-01T02:00:00,1.2,2.5,0 2024-01-01T03:00:00,1.9,2.0,1 2024-01-01T04:00:00,1.4,2.4,0 CODE_BLOCK: timestamp,feature_1,feature_2,target 2024-01-01T00:00:00,1.5,2.3,0 2024-01-01T01:00:00,1.8,2.1,1 2024-01-01T02:00:00,1.2,2.5,0 2024-01-01T03:00:00,1.9,2.0,1 2024-01-01T04:00:00,1.4,2.4,0 COMMAND_BLOCK: # Upload test file aws s3 cp test-data/sample.csv \ s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/raw/sample.csv \ --server-side-encryption aws:kms Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Upload test file aws s3 cp test-data/sample.csv \ s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/raw/sample.csv \ --server-side-encryption aws:kms COMMAND_BLOCK: # Upload test file aws s3 cp test-data/sample.csv \ s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/raw/sample.csv \ --server-side-encryption aws:kms COMMAND_BLOCK: # View Lambda logs aws logs tail /aws/lambda/ml-pipeline-data-validation --follow Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # View Lambda logs aws logs tail /aws/lambda/ml-pipeline-data-validation --follow COMMAND_BLOCK: # View Lambda logs aws logs tail /aws/lambda/ml-pipeline-data-validation --follow COMMAND_BLOCK: # Check validated bucket aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/ # Download validation report aws s3 cp \ s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/reports/sample_report.json \ ./ Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Check validated bucket aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/ # Download validation report aws s3 cp \ s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/reports/sample_report.json \ ./ COMMAND_BLOCK: # Check validated bucket aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/ # Download validation report aws s3 cp \ s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/reports/sample_report.json \ ./ CODE_BLOCK: import boto3 cloudwatch = boto3.client('cloudwatch') def publish_metrics(validation_result): """Publish custom metrics to CloudWatch""" metrics = [ { 'MetricName': 'DataQualityScore', 'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0, 'Unit': 'Percent' }, { 'MetricName': 'RowCount', 'Value': validation_result['quality_validation']['stats']['row_count'], 'Unit': 'Count' }, { 'MetricName': 'DuplicateRows', 'Value': validation_result['quality_validation']['stats']['duplicate_count'], 'Unit': 'Count' } ] cloudwatch.put_metric_data( Namespace='MLPipeline/DataQuality', MetricData=metrics ) Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: import boto3 cloudwatch = boto3.client('cloudwatch') def publish_metrics(validation_result): """Publish custom metrics to CloudWatch""" metrics = [ { 'MetricName': 'DataQualityScore', 'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0, 'Unit': 'Percent' }, { 'MetricName': 'RowCount', 'Value': validation_result['quality_validation']['stats']['row_count'], 'Unit': 'Count' }, { 'MetricName': 'DuplicateRows', 'Value': validation_result['quality_validation']['stats']['duplicate_count'], 'Unit': 'Count' } ] cloudwatch.put_metric_data( Namespace='MLPipeline/DataQuality', MetricData=metrics ) CODE_BLOCK: import boto3 cloudwatch = boto3.client('cloudwatch') def publish_metrics(validation_result): """Publish custom metrics to CloudWatch""" metrics = [ { 'MetricName': 'DataQualityScore', 'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0, 'Unit': 'Percent' }, { 'MetricName': 'RowCount', 'Value': validation_result['quality_validation']['stats']['row_count'], 'Unit': 'Count' }, { 'MetricName': 'DuplicateRows', 'Value': validation_result['quality_validation']['stats']['duplicate_count'], 'Unit': 'Count' } ] cloudwatch.put_metric_data( Namespace='MLPipeline/DataQuality', MetricData=metrics ) COMMAND_BLOCK: def create_data_lineage(bucket, key, checksum): """Create data lineage metadata""" metadata = { 'source_file': key, 'checksum': checksum, 'ingestion_time': datetime.utcnow().isoformat(), 'validation_version': '1.0', 'pipeline_version': os.environ.get('PIPELINE_VERSION', 'unknown') } # Store in DynamoDB or S3 s3_client.put_object( Bucket=bucket, Key=f"lineage/{checksum}.json", Body=json.dumps(metadata, indent=2) ) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: def create_data_lineage(bucket, key, checksum): """Create data lineage metadata""" metadata = { 'source_file': key, 'checksum': checksum, 'ingestion_time': datetime.utcnow().isoformat(), 'validation_version': '1.0', 'pipeline_version': os.environ.get('PIPELINE_VERSION', 'unknown') } # Store in DynamoDB or S3 s3_client.put_object( Bucket=bucket, Key=f"lineage/{checksum}.json", Body=json.dumps(metadata, indent=2) ) COMMAND_BLOCK: def create_data_lineage(bucket, key, checksum): """Create data lineage metadata""" metadata = { 'source_file': key, 'checksum': checksum, 'ingestion_time': datetime.utcnow().isoformat(), 'validation_version': '1.0', 'pipeline_version': os.environ.get('PIPELINE_VERSION', 'unknown') } # Store in DynamoDB or S3 s3_client.put_object( Bucket=bucket, Key=f"lineage/{checksum}.json", Body=json.dumps(metadata, indent=2) ) COMMAND_BLOCK: # In terraform/lambda.tf, add to aws_lambda_function resource: layers = [ "arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:1" ] Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # In terraform/lambda.tf, add to aws_lambda_function resource: layers = [ "arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:1" ] COMMAND_BLOCK: # In terraform/lambda.tf, add to aws_lambda_function resource: layers = [ "arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:1" ] COMMAND_BLOCK: # Add to terraform configuration terraform { backend "s3" { bucket = "your-terraform-state-bucket" key = "ml-pipeline/terraform.tfstate" region = "ap-south-1" dynamodb_table = "terraform-state-lock" } } Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Add to terraform configuration terraform { backend "s3" { bucket = "your-terraform-state-bucket" key = "ml-pipeline/terraform.tfstate" region = "ap-south-1" dynamodb_table = "terraform-state-lock" } } COMMAND_BLOCK: # Add to terraform configuration terraform { backend "s3" { bucket = "your-terraform-state-bucket" key = "ml-pipeline/terraform.tfstate" region = "ap-south-1" dynamodb_table = "terraform-state-lock" } } COMMAND_BLOCK: # Increase timeout in terraform/lambda.tf timeout = 900 # 15 minutes # Or reduce file size before processing Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Increase timeout in terraform/lambda.tf timeout = 900 # 15 minutes # Or reduce file size before processing COMMAND_BLOCK: # Increase timeout in terraform/lambda.tf timeout = 900 # 15 minutes # Or reduce file size before processing COMMAND_BLOCK: # Check IAM role has KMS permissions aws iam get-role-policy \ --role-name ml-pipeline-data-validation-lambda \ --policy-name ml-pipeline-data-validation-policy Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Check IAM role has KMS permissions aws iam get-role-policy \ --role-name ml-pipeline-data-validation-lambda \ --policy-name ml-pipeline-data-validation-policy COMMAND_BLOCK: # Check IAM role has KMS permissions aws iam get-role-policy \ --role-name ml-pipeline-data-validation-lambda \ --policy-name ml-pipeline-data-validation-policy COMMAND_BLOCK: # Check validation rules match your data # View Lambda logs for detailed errors aws logs tail /aws/lambda/ml-pipeline-data-validation --follow Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Check validation rules match your data # View Lambda logs for detailed errors aws logs tail /aws/lambda/ml-pipeline-data-validation --follow COMMAND_BLOCK: # Check validation rules match your data # View Lambda logs for detailed errors aws logs tail /aws/lambda/ml-pipeline-data-validation --follow - Encrypted S3 data lake with proper access controls - Automated data validation pipeline with Lambda - Data quality monitoring and alerting - Complete audit trail with CloudTrail - Infrastructure as Code with Terraform - S3: Encrypted data storage with versioning - Lambda: Serverless data validation - KMS: Encryption key management - CloudTrail: Audit logging - CloudWatch: Monitoring and alerting - SNS: Notifications - Terraform: Infrastructure as Code - Raw Data Bucket - Unvalidated data from sources - Validated Data Bucket - Quality-checked, ready for training - Model Artifacts Bucket - Trained models and metadata - Catch schema changes early - Detect data quality issues - Ensure consistency - Create audit trail - Increase min_rows to 100 or higher based on your use case - Adjust max_null_percentage based on data quality requirements - Customize column validation rules to match your schema - Set up AWS Budgets and billing alerts - Monitor CloudWatch costs (logs can grow quickly) - Use S3 lifecycle policies to manage data retention - Review CloudTrail data event costs - KMS encryption for all S3 buckets - Enforce encryption in bucket policies - TLS in transit (enforced by bucket policy) - Least privilege IAM roles - No public bucket access - VPC endpoints for Lambda (optional enhancement) - CloudTrail logging all data access - CloudWatch logs retention - SNS notifications for failures - Automated validation - Schema enforcement - Quality metrics tracking - Secure, encrypted data storage - Automated data validation - Complete audit trail - Monitoring and alerting - Infrastructure as Code - Distributed training infrastructure - Experiment tracking with MLflow - Model versioning and registry - Hyperparameter optimization with Spot instances - CI/CD pipelines for automated deployment - SageMaker endpoints with auto-scaling - Model monitoring and drift detection - Production observability and incident response - Automate validation - Don't trust manual checks - Encrypt everything - KMS at rest, TLS in transit - Audit all access - CloudTrail is your friend - Monitor data quality - Track metrics over time - Use IaC - Terraform makes it reproducible - S3 Encryption Best Practices - Lambda Best Practices - CloudTrail Data Events - Terraform AWS Provider - Part 1: AIDLC Framework Overview - Questions about the data pipeline? Drop a comment below - Follow me for Part 3 - SageMaker Training - Like if this helped you build your pipeline - Share with your team