Secure ML on AWS: Building Production Data Pipelines with S3 and Lambda
2025-12-26
By Shoaibali Mir | Reading time: ~15-20 minutes
Level: Intermediate
Prerequisites: Basic AWS knowledge, familiarity with S3 and Lambda
Series: Part 2 of 4 - Read Part 1
Objective: This blog focuses on teaching core concepts and best practices for building data pipelines on AWS. While the code is functional, it's designed for learning and experimentation. For production deployments, additional hardening and testing would be needed.

Welcome back! In Part 1, we explored the AIDLC framework and AWS architecture for secure ML pipelines. Now, let's get hands-on with Phase 1: Data Collection & Preparation. By the end of today's build, you'll have a production-ready data pipeline that ensures data quality and security from day one.

## The Data Pipeline Problem

You can't build reliable ML models on unreliable data. Common issues include:

- Inconsistent data formats - Training fails due to schema changes
- Missing security controls - Sensitive data exposed
- No data validation - Bad data silently corrupts models
- Manual processes - Doesn't scale, error-prone
- No audit trail - Can't track data lineage

The solution: an automated, secure, validated data pipeline.

## Architecture Overview

Here's what we're building: raw files land in an encrypted S3 bucket, an S3 event triggers a validation Lambda, files that pass are copied to a validated-data bucket (with a JSON validation report and an SNS notification either way), and CloudTrail plus CloudWatch provide the audit trail and monitoring around it.

## Step 1: Setting Up Encrypted S3 Buckets

## Why Three Buckets?

Separation of concerns improves security and organization: raw data (untrusted uploads), validated data (only files that passed validation), and model artifacts (outputs of the later training phases) each get their own bucket, IAM scope, and lifecycle rules.

## Infrastructure Code

Note: The complete S3 configuration is shown below. For production, you might split this into separate files for better organization.

Create terraform/s3-buckets.tf:
```hcl
# KMS key for encryption
resource "aws_kms_key" "data_encryption" {
  description             = "Encryption key for ML data pipeline"
  deletion_window_in_days = 10
  enable_key_rotation     = true

  tags = {
    Name        = "ml-data-encryption-key"
    Environment = var.environment
    Purpose     = "ML-DataPipeline"
  }
}

resource "aws_kms_alias" "data_encryption" {
  name          = "alias/ml-data-encryption"
  target_key_id = aws_kms_key.data_encryption.key_id
}

# Raw data bucket
resource "aws_s3_bucket" "raw_data" {
  bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Raw Data"
    Environment = var.environment
    DataStage   = "Raw"
  }
}

# Enable versioning
resource "aws_s3_bucket_versioning" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  versioning_configuration {
    status = "Enabled"
  }
}

# Enable encryption
resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

# Block all public access
resource "aws_s3_bucket_public_access_block" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Enforce encryption and secure transport
resource "aws_s3_bucket_policy" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "DenyUnencryptedObjectUploads"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:PutObject"
        Resource  = "${aws_s3_bucket.raw_data.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid       = "DenyInsecureTransport"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.raw_data.arn,
          "${aws_s3_bucket.raw_data.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# Lifecycle policy to manage old versions
resource "aws_s3_bucket_lifecycle_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    id     = "delete-old-versions"
    status = "Enabled"

    noncurrent_version_expiration {
      noncurrent_days = 90
    }
  }

  rule {
    id     = "transition-to-glacier"
    status = "Enabled"

    transition {
      days          = 30
      storage_class = "GLACIER"
    }
  }
}

# Validated data bucket
resource "aws_s3_bucket" "validated_data" {
  bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Validated Data"
    Environment = var.environment
    DataStage   = "Validated"
  }
}

resource "aws_s3_bucket_versioning" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Model artifacts bucket
resource "aws_s3_bucket" "model_artifacts" {
  bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Model Artifacts"
    Environment = var.environment
    DataStage   = "Models"
  }
}

resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}
```
## Variables File

Create terraform/variables.tf:
variable "project_name" { description = "Project name prefix" type = string default = "ml-pipeline"
} variable "environment" { description = "Environment (dev, staging, prod)" type = string default = "dev"
} variable "aws_region" { description = "AWS region" type = string default = "ap-south-1"
} variable "notification_email" { description = "Email for SNS notifications" type = string
} data "aws_caller_identity" "current" {} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
variable "project_name" { description = "Project name prefix" type = string default = "ml-pipeline"
} variable "environment" { description = "Environment (dev, staging, prod)" type = string default = "dev"
} variable "aws_region" { description = "AWS region" type = string default = "ap-south-1"
} variable "notification_email" { description = "Email for SNS notifications" type = string
} data "aws_caller_identity" "current" {} CODE_BLOCK:
variable "project_name" { description = "Project name prefix" type = string default = "ml-pipeline"
} variable "environment" { description = "Environment (dev, staging, prod)" type = string default = "dev"
} variable "aws_region" { description = "AWS region" type = string default = "ap-south-1"
} variable "notification_email" { description = "Email for SNS notifications" type = string
} data "aws_caller_identity" "current" {} COMMAND_BLOCK:
```bash
cd terraform

# Initialize Terraform
terraform init

# Review the plan
terraform plan -var="notification_email=your-email@example.com"

# Apply the configuration
terraform apply -var="notification_email=your-email@example.com"
```
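The testing steps later need the exact bucket names, which embed your account ID. One optional convenience (my addition, not required by the pipeline) is an outputs.tf so Terraform prints them for you:

```hcl
# terraform/outputs.tf (optional helper)
output "raw_data_bucket" {
  value = aws_s3_bucket.raw_data.id
}

output "validated_data_bucket" {
  value = aws_s3_bucket.validated_data.id
}

output "model_artifacts_bucket" {
  value = aws_s3_bucket.model_artifacts.id
}
```

Run `terraform output` after apply to list the generated bucket names.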
## Step 2: Data Validation Lambda Function

## Why Validate Data?

Prevent garbage in, garbage out: anything that reaches training should have the right schema, sane null rates, and no silent duplicates.

Note: After deploying, check your email and confirm the SNS subscription to receive notifications.

## Validation Logic

Create lambda/data-validation/handler.py:
```python
import json
import os
import boto3
import logging
import pandas as pd
from io import BytesIO
import hashlib
from datetime import datetime

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS clients
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')

# Validation rules
VALIDATION_RULES = {
    'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'],
    'numeric_columns': ['feature_1', 'feature_2', 'target'],
    'max_null_percentage': 0.05,  # 5% max
    'min_rows': 5,  # Adjusted for testing - use 100+ for production
    'max_rows': 1000000,
    'date_columns': ['timestamp']
}


def calculate_checksum(data: bytes) -> str:
    """Calculate SHA256 checksum"""
    return hashlib.sha256(data).hexdigest()


def validate_schema(df: pd.DataFrame) -> dict:
    """Validate DataFrame schema"""
    issues = []

    # Check required columns
    missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns)
    if missing_cols:
        issues.append(f"Missing columns: {missing_cols}")

    # Check numeric columns
    for col in VALIDATION_RULES['numeric_columns']:
        if col in df.columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                issues.append(f"Column '{col}' should be numeric")

    # Check date columns
    for col in VALIDATION_RULES['date_columns']:
        if col in df.columns:
            try:
                pd.to_datetime(df[col])
            except Exception:
                issues.append(f"Column '{col}' should be valid datetime")

    return {
        'valid': len(issues) == 0,
        'issues': issues
    }


def validate_data_quality(df: pd.DataFrame) -> dict:
    """Validate data quality"""
    issues = []

    # Check row count
    if len(df) < VALIDATION_RULES['min_rows']:
        issues.append(
            f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}"
        )
    elif len(df) > VALIDATION_RULES['max_rows']:
        issues.append(
            f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}"
        )

    # Check null values
    for col in df.columns:
        null_pct = df[col].isnull().sum() / len(df)
        if null_pct > VALIDATION_RULES['max_null_percentage']:
            issues.append(
                f"Column '{col}' has {null_pct:.2%} nulls "
                f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})"
            )

    # Check duplicates (cast to int so the result stays JSON serializable)
    duplicate_count = int(df.duplicated().sum())
    if duplicate_count > 0:
        issues.append(f"Found {duplicate_count} duplicate rows")

    # Data distribution checks
    stats = {}
    for col in VALIDATION_RULES['numeric_columns']:
        if col in df.columns:
            stats[col] = {
                'mean': float(df[col].mean()),
                'std': float(df[col].std()),
                'min': float(df[col].min()),
                'max': float(df[col].max()),
                'null_count': int(df[col].isnull().sum())
            }

    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'stats': {
            'row_count': len(df),
            'column_count': len(df.columns),
            'duplicate_count': duplicate_count,
            'column_stats': stats
        }
    }


def send_notification(topic_arn: str, subject: str, message: str):
    """Send SNS notification"""
    try:
        sns_client.publish(
            TopicArn=topic_arn,
            Subject=subject,
            Message=message
        )
        logger.info(f"Sent notification: {subject}")
    except Exception as e:
        logger.error(f"Failed to send notification: {e}")


def lambda_handler(event, context):
    """
    Lambda handler triggered by S3 upload
    """
    key = 'unknown'  # so the error path can reference it even if event parsing fails
    try:
        # Parse S3 event
        record = event['Records'][0]
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        logger.info(f"Processing: s3://{bucket}/{key}")

        # Download file
        response = s3_client.get_object(Bucket=bucket, Key=key)
        file_content = response['Body'].read()

        # Calculate checksum
        checksum = calculate_checksum(file_content)

        # Load data
        df = pd.read_csv(BytesIO(file_content))
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")

        # Run validations
        schema_result = validate_schema(df)
        quality_result = validate_data_quality(df)

        # Compile results
        validation_result = {
            'file': f"s3://{bucket}/{key}",
            'timestamp': datetime.utcnow().isoformat(),
            'checksum': checksum,
            'schema_validation': schema_result,
            'quality_validation': quality_result,
            'status': (
                'PASSED'
                if schema_result['valid'] and quality_result['valid']
                else 'FAILED'
            )
        }
        logger.info(f"Validation status: {validation_result['status']}")

        # Save validation report
        report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json')
        s3_client.put_object(
            Bucket=bucket,
            Key=report_key,
            Body=json.dumps(validation_result, indent=2),
            ServerSideEncryption='aws:kms'
        )

        # If passed, copy to validated bucket
        if validation_result['status'] == 'PASSED':
            validated_bucket = bucket.replace('raw-data', 'validated-data')
            validated_key = key.replace('raw/', 'validated/')
            s3_client.copy_object(
                CopySource={'Bucket': bucket, 'Key': key},
                Bucket=validated_bucket,
                Key=validated_key,
                ServerSideEncryption='aws:kms'
            )
            logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}")

            # Send success notification
            send_notification(
                os.environ['SNS_TOPIC_ARN'],
                f'Data Validation Passed: {key}',
                f"File validated successfully\n\n"
                f"Stats:\n"
                f"- Rows: {quality_result['stats']['row_count']}\n"
                f"- Columns: {quality_result['stats']['column_count']}\n"
                f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n"
                f"Validation report: s3://{bucket}/{report_key}"
            )
        else:
            # Send failure notification
            all_issues = schema_result['issues'] + quality_result['issues']
            send_notification(
                os.environ['SNS_TOPIC_ARN'],
                f'Data Validation Failed: {key}',
                f"Validation issues found:\n\n"
                + "\n".join(f"- {issue}" for issue in all_issues)
            )

        return {
            'statusCode': 200,
            'body': json.dumps(validation_result)
        }

    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        send_notification(
            os.environ.get('SNS_TOPIC_ARN', ''),
            'Data Validation Error',
            f"Error processing {key}:\n{str(e)}"
        )
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
```
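Before packaging, you can sanity-check the validation functions locally without any AWS calls. This is a rough sketch of my own (the file name and paths are assumptions, not from the original post); note that importing handler.py creates boto3 clients, so a default region must be set first:

```python
# test_validation_local.py - quick local check of the validation logic (illustrative)
import os

# boto3 clients are created at import time in handler.py, so set a region first
os.environ.setdefault("AWS_DEFAULT_REGION", "ap-south-1")

import pandas as pd
from handler import validate_schema, validate_data_quality

# Adjust this path to wherever sample.csv lives relative to your working directory
df = pd.read_csv("../../test-data/sample.csv")

schema = validate_schema(df)
quality = validate_data_quality(df)

print("Schema issues:", schema["issues"])
print("Quality issues:", quality["issues"])
print("Stats:", quality["stats"])
```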
## Lambda Dependencies

Create lambda/data-validation/requirements.txt:
```
pandas==2.1.0
boto3==1.28.85
```
## Package Lambda
```bash
cd lambda/data-validation

# Create package directory
mkdir package

# Install dependencies
pip install -r requirements.txt -t package/

# Copy handler
cp handler.py package/

# Create deployment package
cd package
zip -r ../function.zip .
cd ..
```
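One caveat: if you build the package on macOS or Windows, pip may pull wheels that won't run on Lambda's Linux runtime. A common workaround is to force Linux-compatible wheels when installing; the flags below are one way to do it (not part of the original steps):

```bash
# Force Linux-compatible wheels when packaging on macOS/Windows
pip install \
  --platform manylinux2014_x86_64 \
  --implementation cp \
  --python-version 3.11 \
  --only-binary=:all: \
  -r requirements.txt -t package/
```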
## Step 3: Lambda Infrastructure

Create terraform/lambda.tf:
```hcl
# IAM role for Lambda
resource "aws_iam_role" "data_validation" {
  name = "${var.project_name}-data-validation-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# IAM policy for Lambda
resource "aws_iam_role_policy" "data_validation" {
  name = "${var.project_name}-data-validation-policy"
  role = aws_iam_role.data_validation.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject"
        ]
        Resource = [
          "${aws_s3_bucket.raw_data.arn}/*",
          "${aws_s3_bucket.validated_data.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = aws_kms_key.data_encryption.arn
      },
      {
        Effect   = "Allow"
        Action   = "sns:Publish"
        Resource = aws_sns_topic.validation_notifications.arn
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# SNS topic for notifications
resource "aws_sns_topic" "validation_notifications" {
  name = "${var.project_name}-validation-notifications"
}

resource "aws_sns_topic_subscription" "email" {
  topic_arn = aws_sns_topic.validation_notifications.arn
  protocol  = "email"
  endpoint  = var.notification_email
}

# Lambda function
resource "aws_lambda_function" "data_validation" {
  filename         = "${path.module}/../lambda/data-validation/function.zip"
  function_name    = "${var.project_name}-data-validation"
  role             = aws_iam_role.data_validation.arn
  handler          = "handler.lambda_handler"
  runtime          = "python3.11"
  timeout          = 300
  memory_size      = 1024
  source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip")

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn
    }
  }

  tags = {
    Name        = "Data Validation Lambda"
    Environment = var.environment
  }
}

# S3 trigger
resource "aws_s3_bucket_notification" "raw_data_trigger" {
  bucket = aws_s3_bucket.raw_data.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.data_validation.arn
    events              = ["s3:ObjectCreated:*"]
    filter_prefix       = "raw/"
    filter_suffix       = ".csv"
  }

  depends_on = [
    aws_lambda_permission.allow_s3,
    aws_lambda_function.data_validation
  ]
}

resource "aws_lambda_permission" "allow_s3" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.data_validation.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.raw_data.arn
}

# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "data_validation" {
  name              = "/aws/lambda/${aws_lambda_function.data_validation.function_name}"
  retention_in_days = 30
}
```
## Step 4: CloudTrail for Audit Logging

Create terraform/cloudtrail.tf:
```hcl
# CloudTrail logs bucket
resource "aws_s3_bucket" "cloudtrail_logs" {
  bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}"
}

resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_policy" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AWSCloudTrailAclCheck"
        Effect = "Allow"
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        }
        Action   = "s3:GetBucketAcl"
        Resource = aws_s3_bucket.cloudtrail_logs.arn
      },
      {
        Sid    = "AWSCloudTrailWrite"
        Effect = "Allow"
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        }
        Action   = "s3:PutObject"
        Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*"
        Condition = {
          StringEquals = {
            "s3:x-amz-acl" = "bucket-owner-full-control"
          }
        }
      }
    ]
  })
}

# CloudTrail
resource "aws_cloudtrail" "data_events" {
  name                          = "${var.project_name}-data-trail"
  s3_bucket_name                = aws_s3_bucket.cloudtrail_logs.id
  include_global_service_events = true
  is_multi_region_trail         = true
  enable_logging                = true
  enable_log_file_validation    = true

  depends_on = [aws_s3_bucket_policy.cloudtrail_logs]

  event_selector {
    read_write_type           = "All"
    include_management_events = true

    data_resource {
      type = "AWS::S3::Object"
      values = [
        "${aws_s3_bucket.raw_data.arn}/*",
        "${aws_s3_bucket.validated_data.arn}/*",
        "${aws_s3_bucket.model_artifacts.arn}/*"
      ]
    }
  }

  tags = {
    Name        = "ML Data Trail"
    Environment = var.environment
  }
}
```
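CloudTrail typically takes several minutes to deliver its first log files. A quick optional check (my addition; substitute your account ID in the bucket name) is to list the logs bucket:

```bash
# Confirm CloudTrail is delivering log files (may take ~5-15 minutes after apply)
aws s3 ls s3://ml-pipeline-cloudtrail-logs-YOUR_ACCOUNT_ID/AWSLogs/ --recursive | head
```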
## Step 5: CloudWatch Monitoring

Create terraform/monitoring.tf:
```hcl
# CloudWatch dashboard
resource "aws_cloudwatch_dashboard" "data_pipeline" {
  dashboard_name = "${var.project_name}-data-pipeline"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6
        properties = {
          metrics = [
            ["AWS/Lambda", "Invocations", {
              stat  = "Sum"
              label = "Lambda Invocations"
              dimensions = {
                FunctionName = aws_lambda_function.data_validation.function_name
              }
            }],
            [".", "Errors", {
              stat  = "Sum"
              label = "Lambda Errors"
              dimensions = {
                FunctionName = aws_lambda_function.data_validation.function_name
              }
            }],
            [".", "Duration", {
              stat  = "Average"
              label = "Avg Duration (ms)"
              dimensions = {
                FunctionName = aws_lambda_function.data_validation.function_name
              }
            }]
          ]
          period  = 300
          stat    = "Average"
          region  = var.aws_region
          title   = "Lambda Metrics"
          view    = "timeSeries"
          stacked = false
        }
      }
    ]
  })
}

# Alarms
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${var.project_name}-lambda-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "5"
  alarm_description   = "Lambda function errors"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]

  dimensions = {
    FunctionName = aws_lambda_function.data_validation.function_name
  }
}
```
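Once applied, a quick way to confirm the alarm exists and see its current state is the CLI; this assumes the default ml-pipeline prefix:

```bash
# List pipeline alarms and their current state
aws cloudwatch describe-alarms --alarm-name-prefix "ml-pipeline" \
  --query "MetricAlarms[].{Name:AlarmName,State:StateValue}"
```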
## Step 6: Testing the Pipeline

## 1. Create Test Data

Create test-data/sample.csv:
```csv
timestamp,feature_1,feature_2,target
2024-01-01T00:00:00,1.5,2.3,0
2024-01-01T01:00:00,1.8,2.1,1
2024-01-01T02:00:00,1.2,2.5,0
2024-01-01T03:00:00,1.9,2.0,1
2024-01-01T04:00:00,1.4,2.4,0
```
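The five-row sample only satisfies the relaxed min_rows: 5 rule. If you want to exercise more production-like thresholds, a small generator script such as this (my addition; the file names are arbitrary) produces a larger file in the same schema:

```python
# generate_test_data.py - synthetic data in the sample.csv schema (illustrative)
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 500  # comfortably above a production min_rows of 100+

start = datetime(2024, 1, 1)
df = pd.DataFrame({
    "timestamp": [(start + timedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%S") for i in range(n)],
    "feature_1": rng.normal(1.5, 0.3, n).round(2),
    "feature_2": rng.normal(2.2, 0.2, n).round(2),
    "target": rng.integers(0, 2, n),
})

# Assumes you run this from the repo root
df.to_csv("test-data/large_sample.csv", index=False)
print(f"Wrote {len(df)} rows")
```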
## 2. Upload to S3
```bash
# Upload test file
aws s3 cp test-data/sample.csv \
  s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/raw/sample.csv \
  --server-side-encryption aws:kms
```
## 3. Check Lambda Logs
```bash
# View Lambda logs
aws logs tail /aws/lambda/ml-pipeline-data-validation --follow
```
## 4. Verify Validated Data
```bash
# Check validated bucket
aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/

# Download validation report
aws s3 cp \
  s3://ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID/reports/sample_report.json \
  ./
```
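The report is plain JSON, so any JSON tool will do for inspection; for example:

```bash
# Pretty-print the validation report
python -m json.tool sample_report.json
```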
## Step 7: Advanced Features

## Data Quality Metrics

Add custom CloudWatch metrics in Lambda:
```python
import boto3

cloudwatch = boto3.client('cloudwatch')


def publish_metrics(validation_result):
    """Publish custom metrics to CloudWatch"""
    metrics = [
        {
            'MetricName': 'DataQualityScore',
            'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0,
            'Unit': 'Percent'
        },
        {
            'MetricName': 'RowCount',
            'Value': validation_result['quality_validation']['stats']['row_count'],
            'Unit': 'Count'
        },
        {
            'MetricName': 'DuplicateRows',
            'Value': validation_result['quality_validation']['stats']['duplicate_count'],
            'Unit': 'Count'
        }
    ]

    cloudwatch.put_metric_data(
        Namespace='MLPipeline/DataQuality',
        MetricData=metrics
    )
```
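To actually emit these metrics, call publish_metrics from lambda_handler once validation_result has been built - roughly like the excerpt below. This wiring is my suggestion rather than part of the original handler, and the Lambda role would also need cloudwatch:PutMetricData, which the IAM policy above doesn't grant yet.

```python
# Excerpt from lambda_handler, right after validation_result is assembled:
logger.info(f"Validation status: {validation_result['status']}")

try:
    publish_metrics(validation_result)  # custom CloudWatch metrics (see above)
except Exception as e:
    # Don't fail the whole validation run just because metrics publishing failed
    logger.warning(f"Could not publish metrics: {e}")
```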
## Data Versioning

With versioning already enabled on the buckets, you can also record lineage metadata (checksum, source file, ingestion time) alongside each dataset:
import json
import os
from datetime import datetime

import boto3

s3_client = boto3.client('s3')


def create_data_lineage(bucket, key, checksum):
    """Create data lineage metadata"""
    metadata = {
        'source_file': key,
        'checksum': checksum,
        'ingestion_time': datetime.utcnow().isoformat(),
        'validation_version': '1.0',
        'pipeline_version': os.environ.get('PIPELINE_VERSION', 'unknown')
    }

    # Store in DynamoDB or S3
    s3_client.put_object(
        Bucket=bucket,
        Key=f"lineage/{checksum}.json",
        Body=json.dumps(metadata, indent=2)
    )
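The comment above mentions DynamoDB as an alternative store for lineage records. If you go that route, a table keyed on the checksum is enough to look up any file's lineage. A minimal sketch follows; this table is not part of the article's Terraform, and the name and key are assumptions.

CODE_BLOCK:
resource "aws_dynamodb_table" "data_lineage" {
  name         = "${var.project_name}-data-lineage" # hypothetical table name
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "checksum"

  attribute {
    name = "checksum"
    type = "S"
  }

  tags = {
    Project = var.project_name
  }
}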
COMMAND_BLOCK:
# In terraform/lambda.tf, add to aws_lambda_function resource:
layers = [
  "arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:1"
]
COMMAND_BLOCK:
# Add to terraform configuration
terraform {
  backend "s3" {
    bucket         = "your-terraform-state-bucket"
    key            = "ml-pipeline/terraform.tfstate"
    region         = "ap-south-1"
    dynamodb_table = "terraform-state-lock"
  }
}
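The backend block expects the state bucket and lock table to already exist; they are usually created once, outside the main configuration. A minimal sketch of the lock table, assuming the same table name used in the backend block (LockID is the key Terraform expects for state locking):

CODE_BLOCK:
resource "aws_dynamodb_table" "terraform_state_lock" {
  name         = "terraform-state-lock"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "LockID"

  attribute {
    name = "LockID"
    type = "S"
  }
}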
COMMAND_BLOCK:
# Increase timeout in terraform/lambda.tf
timeout = 900 # 15 minutes

# Or reduce file size before processing
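Raising memory often resolves timeouts more effectively than maxing out the timeout, because Lambda allocates CPU in proportion to memory, which speeds up pandas processing. A minimal sketch of the two arguments on the existing function resource; the 1024 MB figure is an assumption to tune for your data volume.

CODE_BLOCK:
resource "aws_lambda_function" "data_validation" {
  # ...existing configuration in terraform/lambda.tf...

  timeout     = 900  # 15 minutes
  memory_size = 1024 # more memory also means proportionally more CPU
}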
COMMAND_BLOCK:
# Check IAM role has KMS permissions
aws iam get-role-policy \
  --role-name ml-pipeline-data-validation-lambda \
  --policy-name ml-pipeline-data-validation-policy
COMMAND_BLOCK:
# Check validation rules match your data
# View Lambda logs for detailed errors
aws logs tail /aws/lambda/ml-pipeline-data-validation --follow
- Encrypted S3 data lake with proper access controls
- Automated data validation pipeline with Lambda
- Data quality monitoring and alerting
- Complete audit trail with CloudTrail
- Infrastructure as Code with Terraform

- S3: Encrypted data storage with versioning
- Lambda: Serverless data validation
- KMS: Encryption key management
- CloudTrail: Audit logging
- CloudWatch: Monitoring and alerting
- SNS: Notifications
- Terraform: Infrastructure as Code

- Raw Data Bucket - Unvalidated data from sources
- Validated Data Bucket - Quality-checked, ready for training
- Model Artifacts Bucket - Trained models and metadata

- Catch schema changes early
- Detect data quality issues
- Ensure consistency
- Create audit trail

- Increase min_rows to 100 or higher based on your use case
- Adjust max_null_percentage based on data quality requirements
- Customize column validation rules to match your schema (see the sketch after this list)
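One way to make these thresholds adjustable without repackaging the Lambda is to inject them through an environment variable in terraform/lambda.tf. This is a minimal sketch under a stated assumption: the handler would need to read a VALIDATION_RULES JSON environment variable, and the variable name and rule keys below are illustrative, not part of the original handler.

CODE_BLOCK:
resource "aws_lambda_function" "data_validation" {
  # ...existing configuration from terraform/lambda.tf...

  environment {
    variables = {
      # Hypothetical rule set; key names must match whatever the handler expects
      VALIDATION_RULES = jsonencode({
        min_rows            = 100
        max_null_percentage = 5
        required_columns    = ["timestamp", "feature_1", "feature_2", "target"]
      })
    }
  }
}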
- Set up AWS Budgets and billing alerts
- Monitor CloudWatch costs (logs can grow quickly)
- Use S3 lifecycle policies to manage data retention (see the lifecycle sketch after this list)
- Review CloudTrail data event costs
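A lifecycle policy keeps raw data from accumulating at STANDARD storage pricing. This is a minimal sketch, assuming the raw bucket is defined as aws_s3_bucket.raw_data in terraform/s3-buckets.tf; the resource name, prefix, and retention periods are assumptions to adapt to your own requirements.

CODE_BLOCK:
resource "aws_s3_bucket_lifecycle_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    id     = "raw-data-retention"
    status = "Enabled"

    filter {
      prefix = "raw/"
    }

    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    expiration {
      days = 365
    }
  }
}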
- KMS encryption for all S3 buckets
- Enforce encryption in bucket policies
- TLS in transit (enforced by bucket policy; see the policy sketch after this list)
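If a bucket policy does not already deny plaintext traffic, a statement like the one below enforces TLS. A minimal sketch, again assuming the aws_s3_bucket.raw_data resource name; since S3 allows only one bucket policy per bucket, merge this statement into the existing policy rather than adding a second aws_s3_bucket_policy for the same bucket.

CODE_BLOCK:
data "aws_iam_policy_document" "deny_insecure_transport" {
  statement {
    sid     = "DenyInsecureTransport"
    effect  = "Deny"
    actions = ["s3:*"]

    principals {
      type        = "*"
      identifiers = ["*"]
    }

    resources = [
      aws_s3_bucket.raw_data.arn,
      "${aws_s3_bucket.raw_data.arn}/*",
    ]

    condition {
      test     = "Bool"
      variable = "aws:SecureTransport"
      values   = ["false"]
    }
  }
}

resource "aws_s3_bucket_policy" "raw_data_tls" {
  bucket = aws_s3_bucket.raw_data.id
  policy = data.aws_iam_policy_document.deny_insecure_transport.json
}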
- Least privilege IAM roles
- No public bucket access
- VPC endpoints for Lambda (optional enhancement; see the sketch after this list)
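If the Lambda function is later attached to a VPC, a gateway endpoint keeps its S3 traffic off the public internet at no additional charge. A minimal sketch, assuming hypothetical aws_vpc.main and aws_route_table.private resources that are not part of the article's Terraform.

CODE_BLOCK:
resource "aws_vpc_endpoint" "s3" {
  vpc_id            = aws_vpc.main.id               # hypothetical VPC resource
  service_name      = "com.amazonaws.${var.aws_region}.s3"
  vpc_endpoint_type = "Gateway"
  route_table_ids   = [aws_route_table.private.id]  # hypothetical route table

  tags = {
    Name = "${var.project_name}-s3-endpoint"
  }
}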
- CloudTrail logging all data access
- CloudWatch logs retention
- SNS notifications for failures

- Automated validation
- Schema enforcement
- Quality metrics tracking

- Secure, encrypted data storage
- Automated data validation
- Complete audit trail
- Monitoring and alerting
- Infrastructure as Code

- Distributed training infrastructure
- Experiment tracking with MLflow
- Model versioning and registry
- Hyperparameter optimization with Spot instances

- CI/CD pipelines for automated deployment
- SageMaker endpoints with auto-scaling
- Model monitoring and drift detection
- Production observability and incident response

- Automate validation - Don't trust manual checks
- Encrypt everything - KMS at rest, TLS in transit
- Audit all access - CloudTrail is your friend
- Monitor data quality - Track metrics over time
- Use IaC - Terraform makes it reproducible

- S3 Encryption Best Practices
- Lambda Best Practices
- CloudTrail Data Events
- Terraform AWS Provider
- Part 1: AIDLC Framework Overview

- Questions about the data pipeline? Drop a comment below
- Follow me for Part 3 - SageMaker Training
- Like if this helped you build your pipeline
- Share with your team