Scalable ML Training on AWS: SageMaker, Spot Instances and Experiment Tracking

Source: Dev.to

Welcome to Part 3!

Contents:

- The ML Training Problem
- Architecture Overview
- AIDLC Phase 2 Reference Architecture
- Step 1: Custom Training Container
- Why Custom Containers?
- Training Script
- Dockerfile
- Requirements
- Build and Push Container
- Step 2: Verify Data Preparation
- Check Train/Val Split Exists
- If Data Split Missing
- Step 3: SageMaker Training Infrastructure
- Step 4: Training Job Configuration
- Step 5: Local Testing (CRITICAL STEP)
- Why Test Locally?
- 1. Prepare Test Environment
- 2. Run Local Training
- 3. Verify Outputs
- 4. Test Different Hyperparameters
- Common Local Testing Issues
- Step 6: Hyperparameter Tuning
- Step 7: Model Registry Integration
- Step 8: End-to-End Training Pipeline
- Step 9: Cost Optimization
- Spot Instances Strategy
- Instance Type Selection
- Realistic Cost Summary
- Cost Saving Tips
- Step 10: Monitoring and Alerts
- Testing the Pipeline
- 1. Verify Prerequisites
- 2. Test Locally (REQUIRED)
- 3. Push Container to ECR
- 4. Run Simple Training Job
- 5. Monitor Training
- 6. Run Hyperparameter Tuning (Optional)
- Troubleshooting Guide
- Issue: "No CSV files found in /opt/ml/input/data/training"
- Issue: Training job fails immediately
- Issue: "No module named 'sklearn'"
- Issue: SageMaker can't pull ECR image
- Issue: Spot instance interrupted repeatedly
- Issue: Metric regex not matching
- Issue: Model registration fails
- Issue: High costs unexpectedly
- Security Best Practices
- What's Next?
- Key Takeaways
- Resources
- Let's Connect!
- About the Author (Shoaibali Mir)

Reading time: ~20-25 minutes
Level: Intermediate to Advanced
Prerequisites: Docker installed locally, completed Parts 1 & 2 (especially the train/val data split from Part 2, Step 6.5)
Series: Part 3 of 4 (follows Part 1 and Part 2)

Important Production Considerations

This article demonstrates core SageMaker training concepts with functional code.
For production deployments: The code focuses on teaching SageMaker fundamentals; production readiness requires additional hardening, which is covered in the recommendations part.

In Part 1, we covered the complete AIDLC framework. In Part 2, we built a secure data pipeline with automated validation and prepared train/validation splits.

AIDLC Framework Progress:

Now it's time for the exciting part: training ML models at scale with SageMaker.

What you'll build today:

By the end: You'll have a functional training pipeline demonstrating AWS SageMaker best practices within the AIDLC framework.

## The ML Training Problem

Training models manually doesn't scale. Common issues:

- Inconsistent environments - "Works on my machine"
- Lost experiments - Can't reproduce the winning model
- Expensive compute - Burning money on idle GPUs
- No versioning - Which model is in production?
- Manual tuning - Hyperparameter search takes forever

The goal: an automated, tracked, cost-optimized training pipeline that implements AIDLC Phase 2 (Model Development & Training).

## Architecture Overview

## AIDLC Phase 2 Reference Architecture

Here's what we're building for Phase 2:

Architecture Note: This implements AIDLC Phase 2 (Model Development & Training), building on the secure data foundation from Phase 1 (Part 2). Model deployment (Phase 4) and monitoring (Phase 5) will be covered in Part 4.

## Step 1: Custom Training Container

## Why Custom Containers?

SageMaker built-in algorithms are great, but custom containers give you:

## Training Script

Create training/train.py:

## Dockerfile

Create training/Dockerfile:

## Requirements

Create training/requirements.txt:

## Build and Push Container

## Step 2: Verify Data Preparation

Prerequisites: You should have completed Part 2, Step 6.5 (data splitting). This step verifies that your data is ready for training.

## Check Train/Val Split Exists

## If Data Split Missing

If you see "An error occurred (NoSuchKey)", go back to Part 2, Step 6.5 and run either:

- Option 1: Update your Lambda (automated, recommended)
- Option 2: Run the manual split script:

Important: SageMaker training requires separate train and validation data paths. Without this split, training jobs will fail.
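To make the train/validation requirement concrete, here is a minimal sketch of a deterministic split using only the Python standard library. It is not the Part 2 split script: the function names `split_rows` and `split_csv_text`, the 80/20 ratio, and the fixed seed are all assumptions chosen for illustration. Uploading the two results to the `validated/train/` and `validated/val/` prefixes is left out.

```python
import csv
import io
import random


def split_rows(rows, val_fraction=0.2, seed=42):
    """Shuffle rows deterministically and split them into (train, val) lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    # Assumption: keep at least one validation row, even for tiny inputs.
    n_val = max(1, int(len(shuffled) * val_fraction))
    return shuffled[n_val:], shuffled[:n_val]


def split_csv_text(csv_text, val_fraction=0.2, seed=42):
    """Split CSV text into train and val CSV texts, repeating the header in both."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    train_rows, val_rows = split_rows(list(reader), val_fraction, seed)

    def to_csv(rows):
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(header)
        writer.writerows(rows)
        return buffer.getvalue()

    return to_csv(train_rows), to_csv(val_rows)


# Tiny in-memory example: 10 rows, two features, target in the last column.
sample = "f1,f2,target\n" + "".join(f"{i},{i * 2},{i % 2}\n" for i in range(10))
train_csv, val_csv = split_csv_text(sample)
print(len(train_csv.splitlines()) - 1, "train rows")
print(len(val_csv.splitlines()) - 1, "val rows")
```

Both outputs keep the header row and place the target in the last column, which matches how the training script reads its channels. A fixed seed keeps the split reproducible between runs.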
## Step 3: SageMaker Training Infrastructure

Create terraform/sagemaker.tf:

Deploy the infrastructure:

## Step 4: Training Job Configuration

Create training/training_config.py:

## Step 5: Local Testing (CRITICAL STEP)

ALWAYS TEST LOCALLY FIRST

Local testing catches 90% of issues before you spend money on SageMaker. This is the most important step to avoid wasting time and money.

Pro Tip: Keep local testing fast by using small datasets (10-100 rows). Once local tests pass, you can confidently run SageMaker with full datasets.

## Step 6: Hyperparameter Tuning

Create training/hyperparameter_tuning.py:

## Step 7: Model Registry Integration

Create training/model_registry.py:

## Step 8: End-to-End Training Pipeline

Create pipeline/train_pipeline.py:

## Step 9: Cost Optimization

Monthly Costs (Development - 10 training runs):

With Hyperparameter Tuning (4 tuning runs/month):

Production Scale (100 training jobs/month):

## Step 10: Monitoring and Alerts

Create terraform/sagemaker-monitoring.tf:

## Security Best Practices

Training Security Checklist:

Network Security (Recommended for Production)

Audit & Compliance (AIDLC Phase 6)

## What's Next?

In Part 4 (Series Finale), we'll complete the AIDLC framework:

- Phase 4: Model Deployment
- Phase 5: Monitoring & Maintenance
- Phase 6: Compliance (Final)

This final part brings all AIDLC phases together into a production-demo-ready ML system on AWS.

## Key Takeaways

Remember: Good training pipelines are automated, tested, tracked, cost-efficient, and follow the AIDLC framework for production readiness.

## Let's Connect!

What training challenges are you facing? Let me know in the comments!

Tags: #aws #machinelearning #mlops #aidlc #sagemaker #devops #python #terraform #docker
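The training script in this article prints its validation metrics as a single JSON line so that SageMaker can extract them with regular expressions. The sketch below shows how such a pattern behaves against that log line. The metric names and regex patterns are assumptions for illustration, not the article's actual training configuration, and `extract_metrics` is a hypothetical helper for trying patterns locally.

```python
import json
import re

# Hypothetical metric definitions in the Name/Regex shape used for SageMaker
# metric extraction; the names and patterns here are assumptions.
METRIC_DEFINITIONS = [
    {"Name": "validation:accuracy", "Regex": r'"accuracy": ([0-9.]+)'},
    {"Name": "validation:f1_score", "Regex": r'"f1_score": ([0-9.]+)'},
]


def extract_metrics(log_line, definitions=METRIC_DEFINITIONS):
    """Apply each regex to a log line and return the captured values as floats."""
    found = {}
    for definition in definitions:
        match = re.search(definition["Regex"], log_line)
        if match:
            found[definition["Name"]] = float(match.group(1))
    return found


# Reproduce the log line format printed by evaluate_model() in train.py.
sample_metrics = {"accuracy": 0.91, "precision": 0.9, "recall": 0.91, "f1_score": 0.9}
sample_line = f"Validation Metrics: {json.dumps(sample_metrics)}"
print(extract_metrics(sample_line))
```

Checking patterns this way before launching a job is one way to catch a "metric regex not matching" problem early. Note that `[0-9.]+` would not match values printed in scientific notation, so a pattern may need widening for metrics outside the 0 to 1 range.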
COMMAND_BLOCK:
# training/train.py
import os
import json
import argparse
import numpy as np
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import boto3

# SageMaker environment variables
SM_MODEL_DIR = os.environ.get('SM_MODEL_DIR', '/opt/ml/model')
SM_CHANNEL_TRAINING = os.environ.get('SM_CHANNEL_TRAINING', '/opt/ml/input/data/training')
SM_CHANNEL_VALIDATION = os.environ.get('SM_CHANNEL_VALIDATION', '/opt/ml/input/data/validation')
SM_OUTPUT_DATA_DIR = os.environ.get('SM_OUTPUT_DATA_DIR', '/opt/ml/output/data')


def setup_mlflow():
    """
    Configure MLflow tracking - optional for SageMaker

    Note: MLflow integration from SageMaker requires:
    - MLflow deployed in same VPC as SageMaker
    - Proper DNS/ALB endpoint for MLFLOW_TRACKING_URI
    - For simplicity, this example makes MLflow optional

    For local development, MLflow works great.
    For SageMaker production, use SageMaker Experiments instead
    or deploy MLflow with proper VPC setup.
    """
    mlflow_uri = os.environ.get('MLFLOW_TRACKING_URI')
    if not mlflow_uri:
        print("MLflow tracking disabled (MLFLOW_TRACKING_URI not set)")
        print("Using SageMaker's built-in experiment tracking instead")
        return False

    try:
        import mlflow
        import mlflow.sklearn
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment('sagemaker-training')
        print(f"MLflow tracking enabled: {mlflow_uri}")
        return True
    except Exception as e:
        print(f"MLflow unavailable: {e}. Continuing without MLflow tracking.")
        return False


def load_data(data_path):
    """Load training data from S3"""
    print(f"Loading data from {data_path}")

    # List all CSV files
    files = [f for f in os.listdir(data_path) if f.endswith('.csv')]
    if not files:
        raise ValueError(f"No CSV files found in {data_path}")

    # Combine all files
    dfs = []
    for file in files:
        df = pd.read_csv(os.path.join(data_path, file))
        dfs.append(df)

    data = pd.concat(dfs, ignore_index=True)
    print(f"Loaded {len(data)} samples with {len(data.columns)} features")
    return data


def prepare_features(data):
    """Prepare features and target"""
    # Drop timestamp column if present
    if 'timestamp' in data.columns:
        data = data.drop('timestamp', axis=1)

    # Assuming last column is target
    X = data.iloc[:, :-1].values
    y = data.iloc[:, -1].values

    print(f"Features shape: {X.shape}, Target shape: {y.shape}")
    return X, y


def train_model(X_train, y_train, hyperparameters):
    """Train Random Forest model"""
    print("Training Random Forest model...")
    model = RandomForestClassifier(
        n_estimators=hyperparameters['n_estimators'],
        max_depth=hyperparameters['max_depth'],
        min_samples_split=hyperparameters['min_samples_split'],
        min_samples_leaf=hyperparameters['min_samples_leaf'],
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)
    print("Training completed")
    return model


def evaluate_model(model, X_val, y_val):
    """Evaluate model on validation set"""
    print("Evaluating model...")
    y_pred = model.predict(X_val)

    metrics = {
        'accuracy': float(accuracy_score(y_val, y_pred)),
        'precision': float(precision_score(y_val, y_pred, average='weighted', zero_division=0)),
        'recall': float(recall_score(y_val, y_pred, average='weighted', zero_division=0)),
        'f1_score': float(f1_score(y_val, y_pred, average='weighted', zero_division=0))
    }

    # Print as JSON for SageMaker regex metric extraction
    print(f"Validation Metrics: {json.dumps(metrics)}")
    return metrics


def save_model(model, model_dir):
    """Save model artifact"""
    print(f"Saving model to {model_dir}")

    # Ensure directory exists
    os.makedirs(model_dir, exist_ok=True)

    model_path = os.path.join(model_dir, 'model.joblib')
    joblib.dump(model, model_path)
    print(f"Model saved to {model_path}")
    return model_path


def save_metrics(metrics, output_dir):
    """Save metrics for SageMaker"""
    os.makedirs(output_dir, exist_ok=True)
    metrics_path = os.path.join(output_dir, 'metrics.json')
    with open(metrics_path, 'w') as f:
        json.dump(metrics, f, indent=2)
    print(f"Metrics saved to {metrics_path}")


def main():
    """Main training loop"""
    parser = argparse.ArgumentParser()

    # Hyperparameters
    parser.add_argument('--n_estimators', type=int, default=100)
    parser.add_argument('--max_depth', type=int, default=10)
    parser.add_argument('--min_samples_split', type=int, default=2)
    parser.add_argument('--min_samples_leaf', type=int, default=1)

    args, _ = parser.parse_known_args()

    hyperparameters = {
        'n_estimators': args.n_estimators,
        'max_depth': args.max_depth,
        'min_samples_split': args.min_samples_split,
        'min_samples_leaf': args.min_samples_leaf
    }

    print(f"Hyperparameters: {json.dumps(hyperparameters)}")

    # Setup MLflow (optional - works for local dev, not SageMaker without VPC)
    mlflow_enabled = setup_mlflow()

    # Training logic
    try:
        # Load data
        train_data = load_data(SM_CHANNEL_TRAINING)
        val_data = load_data(SM_CHANNEL_VALIDATION)

        # Prepare features
        X_train, y_train = prepare_features(train_data)
        X_val, y_val = prepare_features(val_data)

        # Train model
        model = train_model(X_train, y_train, hyperparameters)

        # Evaluate model (AIDLC Phase 3: Model Evaluation)
        metrics = evaluate_model(model, X_val, y_val)

        # Log to MLflow if enabled (local development only)
        if mlflow_enabled:
            import mlflow
            import mlflow.sklearn
            with mlflow.start_run():
                mlflow.log_params(hyperparameters)
                mlflow.log_param('train_samples', len(X_train))
                mlflow.log_param('val_samples', len(X_val))
                mlflow.log_param('n_features', X_train.shape[1])
                mlflow.log_metrics(metrics)
                mlflow.sklearn.log_model(model, "model")

        # Save model
        model_path = save_model(model, SM_MODEL_DIR)

        # Save metrics for SageMaker
        save_metrics(metrics, SM_OUTPUT_DATA_DIR)

        print("Training completed successfully!")

    except Exception as e:
        print(f"Training failed: {e}")
        raise


if __name__ == '__main__':
    main()

COMMAND_BLOCK:
# training/Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /opt/ml/code

# Copy requirements
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy training script
COPY train.py .

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV SAGEMAKER_PROGRAM=train.py

# Entry point
ENTRYPOINT ["python", "train.py"]

CODE_BLOCK:
# training/requirements.txt
scikit-learn==1.3.0
pandas==2.1.0
numpy==1.24.3
joblib==1.3.2
boto3==1.28.85
mlflow==2.7.1

COMMAND_BLOCK:
# Build and push container
# Set variables
export AWS_REGION="ap-south-1"  # Change to your region
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export ECR_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/ml-training"

# Build Docker image
cd training
docker build -t ml-training:latest .

# Tag for ECR
docker tag ml-training:latest ${ECR_REPO}:latest

# Login to ECR
aws ecr get-login-password --region ${AWS_REGION} | \
  docker login --username AWS --password-stdin ${ECR_REPO}

# Create ECR repository if not exists
aws ecr create-repository \
  --repository-name ml-training \
  --region ${AWS_REGION} \
  --image-scanning-configuration scanOnPush=true \
  --encryption-configuration encryptionType=KMS || true

# Push to ECR
docker push ${ECR_REPO}:latest

echo "Container pushed to: ${ECR_REPO}:latest"

COMMAND_BLOCK:
# Check that the train/val split exists
# Set your bucket name (from Part 2)
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export VALIDATED_BUCKET="ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}"

# Verify training data
echo "Checking training data..."
aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/

# Verify validation data
echo "Checking validation data..."
aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/

CODE_BLOCK:
Checking training data...
2024-12-27 10:30:45       1234 sample.csv
Checking validation data...
2024-12-27 10:30:45        308 sample.csv

COMMAND_BLOCK:
# Quick fix: Manual split
python scripts/split_data.py \
  ${VALIDATED_BUCKET} \
  validated/sample.csv \
  validated

# Verify again
aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/
aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/

COMMAND_BLOCK:
# terraform/sagemaker.tf
# ECR Repository for training images
resource "aws_ecr_repository" "ml_training" {
  name                 = "${var.project_name}-training"
  image_tag_mutability = "MUTABLE"

  image_scanning_configuration {
    scan_on_push = true
  }

  encryption_configuration {
    encryption_type = "KMS"
    kms_key         = aws_kms_key.data_encryption.arn
  }

  tags = {
    Name        = "ML Training Repository"
    Environment = var.environment
  }
}

# ECR repository policy for SageMaker
resource "aws_ecr_repository_policy" "ml_training" {
  repository = aws_ecr_repository.ml_training.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowSageMakerPull"
        Effect = "Allow"
        Principal = {
          Service = "sagemaker.amazonaws.com"
        }
        Action = [
          "ecr:BatchGetImage",
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchCheckLayerAvailability"
        ]
      }
    ]
  })
}

# SageMaker Execution Role
resource "aws_iam_role" "sagemaker_execution" {
  name = "${var.project_name}-sagemaker-execution"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "sagemaker.amazonaws.com"
      }
    }]
  })
}

# SageMaker Execution Policy
resource "aws_iam_role_policy" "sagemaker_execution" {
  name = "${var.project_name}-sagemaker-policy"
  role = aws_iam_role.sagemaker_execution.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.validated_data.arn,
          "${aws_s3_bucket.validated_data.arn}/*",
          aws_s3_bucket.model_artifacts.arn,
          "${aws_s3_bucket.model_artifacts.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "ecr:GetAuthorizationToken",
          "ecr:BatchCheckLayerAvailability",
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchGetImage"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = aws_kms_key.data_encryption.arn
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "cloudwatch:PutMetricData"
        ]
        Resource = "*"
      }
    ]
  })
}

# SageMaker Model Registry (AIDLC Phase 6: Governance)
resource "aws_sagemaker_model_package_group" "ml_models" {
  model_package_group_name        = "${var.project_name}-models"
  model_package_group_description = "ML model registry for ${var.project_name}"

  tags = {
    Name        = "ML Model Registry"
Environment = var.environment } } CODE_BLOCK: cd terraform terraform apply -var="[email protected]" Enter fullscreen mode Exit fullscreen mode CODE_BLOCK: cd terraform terraform apply -var="[email protected]" CODE_BLOCK: cd terraform terraform apply -var="[email protected]" COMMAND_BLOCK: import boto3 import sagemaker from sagemaker.estimator import Estimator from datetime import datetime import os # Configuration PROJECT_NAME = os.environ.get('PROJECT_NAME', 'ml-pipeline') ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev') AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1') # Initialize SageMaker session sagemaker_session = sagemaker.Session() account_id = boto3.client('sts').get_caller_identity()['Account'] role = f"arn:aws:iam::{account_id}:role/{PROJECT_NAME}-sagemaker-execution" # ECR image URI image_uri = f"{account_id}.dkr.ecr.{AWS_REGION}.amazonaws.com/{PROJECT_NAME}-training:latest" # S3 paths - properly split data from Part 2 s3_bucket = f"{PROJECT_NAME}-validated-data-{ENVIRONMENT}-{account_id}" s3_output = f"{PROJECT_NAME}-model-artifacts-{ENVIRONMENT}-{account_id}" training_data = f"s3://{s3_bucket}/validated/train/" validation_data = f"s3://{s3_bucket}/validated/val/" output_path = f"s3://{s3_output}/models/" def create_training_job( instance_type='ml.m5.xlarge', instance_count=1, use_spot_instances=True, hyperparameters=None ): """ Create and run a SageMaker training job """ if hyperparameters is None: hyperparameters = { 'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 2, 'min_samples_leaf': 1 } # Create estimator estimator = Estimator( image_uri=image_uri, role=role, instance_count=instance_count, instance_type=instance_type, output_path=output_path, sagemaker_session=sagemaker_session, hyperparameters=hyperparameters, use_spot_instances=use_spot_instances, max_wait=7200 if use_spot_instances else None, # 2 hours max_run=3600, # 1 hour volume_size=30, # GB encrypt_inter_container_traffic=True, enable_network_isolation=False, # Set 
True for max security tags=[ {'Key': 'Project', 'Value': PROJECT_NAME}, {'Key': 'Environment', 'Value': ENVIRONMENT}, {'Key': 'ManagedBy', 'Value': 'Terraform'} ] ) # Start training job_name = f"{PROJECT_NAME}-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" print(f"Starting training job: {job_name}") print(f"Training data: {training_data}") print(f"Validation data: {validation_data}") estimator.fit( inputs={ 'training': training_data, 'validation': validation_data }, job_name=job_name, wait=True, logs='All' ) return estimator, job_name if __name__ == '__main__': print("Starting SageMaker training job...") # Run training estimator, job_name = create_training_job( instance_type='ml.m5.xlarge', use_spot_instances=True ) print(f"\nTraining job completed: {job_name}") print(f"Model artifacts: {estimator.model_data}") Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: import boto3 import sagemaker from sagemaker.estimator import Estimator from datetime import datetime import os # Configuration PROJECT_NAME = os.environ.get('PROJECT_NAME', 'ml-pipeline') ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev') AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1') # Initialize SageMaker session sagemaker_session = sagemaker.Session() account_id = boto3.client('sts').get_caller_identity()['Account'] role = f"arn:aws:iam::{account_id}:role/{PROJECT_NAME}-sagemaker-execution" # ECR image URI image_uri = f"{account_id}.dkr.ecr.{AWS_REGION}.amazonaws.com/{PROJECT_NAME}-training:latest" # S3 paths - properly split data from Part 2 s3_bucket = f"{PROJECT_NAME}-validated-data-{ENVIRONMENT}-{account_id}" s3_output = f"{PROJECT_NAME}-model-artifacts-{ENVIRONMENT}-{account_id}" training_data = f"s3://{s3_bucket}/validated/train/" validation_data = f"s3://{s3_bucket}/validated/val/" output_path = f"s3://{s3_output}/models/" def create_training_job( instance_type='ml.m5.xlarge', instance_count=1, use_spot_instances=True, hyperparameters=None ): """ Create and run a SageMaker 
training job """ if hyperparameters is None: hyperparameters = { 'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 2, 'min_samples_leaf': 1 } # Create estimator estimator = Estimator( image_uri=image_uri, role=role, instance_count=instance_count, instance_type=instance_type, output_path=output_path, sagemaker_session=sagemaker_session, hyperparameters=hyperparameters, use_spot_instances=use_spot_instances, max_wait=7200 if use_spot_instances else None, # 2 hours max_run=3600, # 1 hour volume_size=30, # GB encrypt_inter_container_traffic=True, enable_network_isolation=False, # Set True for max security tags=[ {'Key': 'Project', 'Value': PROJECT_NAME}, {'Key': 'Environment', 'Value': ENVIRONMENT}, {'Key': 'ManagedBy', 'Value': 'Terraform'} ] ) # Start training job_name = f"{PROJECT_NAME}-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" print(f"Starting training job: {job_name}") print(f"Training data: {training_data}") print(f"Validation data: {validation_data}") estimator.fit( inputs={ 'training': training_data, 'validation': validation_data }, job_name=job_name, wait=True, logs='All' ) return estimator, job_name if __name__ == '__main__': print("Starting SageMaker training job...") # Run training estimator, job_name = create_training_job( instance_type='ml.m5.xlarge', use_spot_instances=True ) print(f"\nTraining job completed: {job_name}") print(f"Model artifacts: {estimator.model_data}") COMMAND_BLOCK: import boto3 import sagemaker from sagemaker.estimator import Estimator from datetime import datetime import os # Configuration PROJECT_NAME = os.environ.get('PROJECT_NAME', 'ml-pipeline') ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev') AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1') # Initialize SageMaker session sagemaker_session = sagemaker.Session() account_id = boto3.client('sts').get_caller_identity()['Account'] role = f"arn:aws:iam::{account_id}:role/{PROJECT_NAME}-sagemaker-execution" # ECR image URI image_uri = 
f"{account_id}.dkr.ecr.{AWS_REGION}.amazonaws.com/{PROJECT_NAME}-training:latest" # S3 paths - properly split data from Part 2 s3_bucket = f"{PROJECT_NAME}-validated-data-{ENVIRONMENT}-{account_id}" s3_output = f"{PROJECT_NAME}-model-artifacts-{ENVIRONMENT}-{account_id}" training_data = f"s3://{s3_bucket}/validated/train/" validation_data = f"s3://{s3_bucket}/validated/val/" output_path = f"s3://{s3_output}/models/" def create_training_job( instance_type='ml.m5.xlarge', instance_count=1, use_spot_instances=True, hyperparameters=None ): """ Create and run a SageMaker training job """ if hyperparameters is None: hyperparameters = { 'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 2, 'min_samples_leaf': 1 } # Create estimator estimator = Estimator( image_uri=image_uri, role=role, instance_count=instance_count, instance_type=instance_type, output_path=output_path, sagemaker_session=sagemaker_session, hyperparameters=hyperparameters, use_spot_instances=use_spot_instances, max_wait=7200 if use_spot_instances else None, # 2 hours max_run=3600, # 1 hour volume_size=30, # GB encrypt_inter_container_traffic=True, enable_network_isolation=False, # Set True for max security tags=[ {'Key': 'Project', 'Value': PROJECT_NAME}, {'Key': 'Environment', 'Value': ENVIRONMENT}, {'Key': 'ManagedBy', 'Value': 'Terraform'} ] ) # Start training job_name = f"{PROJECT_NAME}-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" print(f"Starting training job: {job_name}") print(f"Training data: {training_data}") print(f"Validation data: {validation_data}") estimator.fit( inputs={ 'training': training_data, 'validation': validation_data }, job_name=job_name, wait=True, logs='All' ) return estimator, job_name if __name__ == '__main__': print("Starting SageMaker training job...") # Run training estimator, job_name = create_training_job( instance_type='ml.m5.xlarge', use_spot_instances=True ) print(f"\nTraining job completed: {job_name}") print(f"Model artifacts: {estimator.model_data}") 
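Two constraints on the training configuration above are easy to trip over: managed spot training requires `max_wait` to be at least `max_run`, and training job names are limited to 63 characters of letters, digits and hyphens. The sketch below checks both locally before a job is submitted. The helper names `validate_spot_timing` and `build_job_name` are hypothetical (they are not part of the SageMaker SDK or of this series' code), so treat this as one possible pre-flight check rather than a required step.

```python
import re
from datetime import datetime
from typing import Optional

# Character pattern and length limit documented for SageMaker training job names.
_JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")
MAX_JOB_NAME_LENGTH = 63


def validate_spot_timing(use_spot_instances: bool, max_run: int,
                         max_wait: Optional[int]) -> None:
    """Raise ValueError if the spot timing settings are inconsistent."""
    if not use_spot_instances:
        return  # max_wait only matters for managed spot training
    if max_wait is None:
        raise ValueError("max_wait is required when use_spot_instances=True")
    if max_wait < max_run:
        raise ValueError(
            f"max_wait ({max_wait}s) must be >= max_run ({max_run}s)"
        )


def build_job_name(project_name: str, now: Optional[datetime] = None) -> str:
    """Build a timestamped job name that fits the length and character rules."""
    timestamp = (now or datetime.now()).strftime("%Y-%m-%d-%H-%M-%S")
    # Replace characters such as underscores that job names do not allow.
    prefix = re.sub(r"[^a-zA-Z0-9-]", "-", project_name).strip("-")
    # Leave room for the joining hyphen and the 19-character timestamp.
    prefix = prefix[: MAX_JOB_NAME_LENGTH - len(timestamp) - 1].rstrip("-")
    name = f"{prefix}-{timestamp}" if prefix else timestamp
    if not _JOB_NAME_PATTERN.fullmatch(name):
        raise ValueError(f"invalid training job name: {name!r}")
    return name
```

As a usage idea, `validate_spot_timing(use_spot_instances, 3600, 7200)` could run at the top of `create_training_job`, and `build_job_name(PROJECT_NAME)` could stand in for the inline f-string when the project name may contain underscores or be long.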
COMMAND_BLOCK: # Build container cd training docker build -t ml-training:test . # Create SageMaker directory structure mkdir -p test-sagemaker/{input/data/training,input/data/validation,model,output/data} # Copy your split test data (from Part 2) # If you don't have test data, create minimal samples cat > test-sagemaker/input/data/training/sample.csv << 'EOF' timestamp,feature_1,feature_2,target 2024-01-01T00:00:00,1.5,2.3,0 2024-01-01T01:00:00,1.8,2.1,1 2024-01-01T02:00:00,1.2,2.5,0 2024-01-01T03:00:00,1.9,2.0,1 2024-01-01T04:00:00,1.4,2.4,0 2024-01-01T05:00:00,1.6,2.2,1 EOF cat > test-sagemaker/input/data/validation/sample.csv << 'EOF' timestamp,feature_1,feature_2,target 2024-01-01T06:00:00,1.7,2.1,0 2024-01-01T07:00:00,1.3,2.6,1 EOF Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Build container cd training docker build -t ml-training:test . # Create SageMaker directory structure mkdir -p test-sagemaker/{input/data/training,input/data/validation,model,output/data} # Copy your split test data (from Part 2) # If you don't have test data, create minimal samples cat > test-sagemaker/input/data/training/sample.csv << 'EOF' timestamp,feature_1,feature_2,target 2024-01-01T00:00:00,1.5,2.3,0 2024-01-01T01:00:00,1.8,2.1,1 2024-01-01T02:00:00,1.2,2.5,0 2024-01-01T03:00:00,1.9,2.0,1 2024-01-01T04:00:00,1.4,2.4,0 2024-01-01T05:00:00,1.6,2.2,1 EOF cat > test-sagemaker/input/data/validation/sample.csv << 'EOF' timestamp,feature_1,feature_2,target 2024-01-01T06:00:00,1.7,2.1,0 2024-01-01T07:00:00,1.3,2.6,1 EOF COMMAND_BLOCK: # Build container cd training docker build -t ml-training:test . 
# Create SageMaker directory structure mkdir -p test-sagemaker/{input/data/training,input/data/validation,model,output/data} # Copy your split test data (from Part 2) # If you don't have test data, create minimal samples cat > test-sagemaker/input/data/training/sample.csv << 'EOF' timestamp,feature_1,feature_2,target 2024-01-01T00:00:00,1.5,2.3,0 2024-01-01T01:00:00,1.8,2.1,1 2024-01-01T02:00:00,1.2,2.5,0 2024-01-01T03:00:00,1.9,2.0,1 2024-01-01T04:00:00,1.4,2.4,0 2024-01-01T05:00:00,1.6,2.2,1 EOF cat > test-sagemaker/input/data/validation/sample.csv << 'EOF' timestamp,feature_1,feature_2,target 2024-01-01T06:00:00,1.7,2.1,0 2024-01-01T07:00:00,1.3,2.6,1 EOF COMMAND_BLOCK: # Run container with test data docker run --rm \ -v $(pwd)/test-sagemaker:/opt/ml \ ml-training:test \ --n_estimators 50 \ --max_depth 5 # Expected output: # Hyperparameters: {"n_estimators": 50, "max_depth": 5, ...} # Loading data from /opt/ml/input/data/training # Loaded 6 samples with 4 features # Training Random Forest model... # Training completed # Evaluating model... # Validation Metrics: {"accuracy": 0.5, "precision": 0.5, ...} # Model saved to /opt/ml/model/model.joblib # Training completed successfully! Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Run container with test data docker run --rm \ -v $(pwd)/test-sagemaker:/opt/ml \ ml-training:test \ --n_estimators 50 \ --max_depth 5 # Expected output: # Hyperparameters: {"n_estimators": 50, "max_depth": 5, ...} # Loading data from /opt/ml/input/data/training # Loaded 6 samples with 4 features # Training Random Forest model... # Training completed # Evaluating model... # Validation Metrics: {"accuracy": 0.5, "precision": 0.5, ...} # Model saved to /opt/ml/model/model.joblib # Training completed successfully! 
COMMAND_BLOCK: # Run container with test data docker run --rm \ -v $(pwd)/test-sagemaker:/opt/ml \ ml-training:test \ --n_estimators 50 \ --max_depth 5 # Expected output: # Hyperparameters: {"n_estimators": 50, "max_depth": 5, ...} # Loading data from /opt/ml/input/data/training # Loaded 6 samples with 4 features # Training Random Forest model... # Training completed # Evaluating model... # Validation Metrics: {"accuracy": 0.5, "precision": 0.5, ...} # Model saved to /opt/ml/model/model.joblib # Training completed successfully! COMMAND_BLOCK: # Check if model was created ls -lh test-sagemaker/model/ # Should see: model.joblib # Check metrics cat test-sagemaker/output/data/metrics.json # Should see JSON with metrics # Load model to verify it works python3 << 'EOF' import joblib model = joblib.load('test-sagemaker/model/model.joblib') print(f"Model loaded: {type(model)}") print(f"Features: {model.n_features_in_}") EOF Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Check if model was created ls -lh test-sagemaker/model/ # Should see: model.joblib # Check metrics cat test-sagemaker/output/data/metrics.json # Should see JSON with metrics # Load model to verify it works python3 << 'EOF' import joblib model = joblib.load('test-sagemaker/model/model.joblib') print(f"Model loaded: {type(model)}") print(f"Features: {model.n_features_in_}") EOF COMMAND_BLOCK: # Check if model was created ls -lh test-sagemaker/model/ # Should see: model.joblib # Check metrics cat test-sagemaker/output/data/metrics.json # Should see JSON with metrics # Load model to verify it works python3 << 'EOF' import joblib model = joblib.load('test-sagemaker/model/model.joblib') print(f"Model loaded: {type(model)}") print(f"Features: {model.n_features_in_}") EOF COMMAND_BLOCK: # Clear previous outputs rm -rf test-sagemaker/model/* test-sagemaker/output/* # Test with different hyperparameters docker run --rm \ -v $(pwd)/test-sagemaker:/opt/ml \ ml-training:test \ --n_estimators 100 \ 
--max_depth 15 \ --min_samples_split 5 # Verify outputs again ls -lh test-sagemaker/model/ cat test-sagemaker/output/data/metrics.json Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: # Clear previous outputs rm -rf test-sagemaker/model/* test-sagemaker/output/* # Test with different hyperparameters docker run --rm \ -v $(pwd)/test-sagemaker:/opt/ml \ ml-training:test \ --n_estimators 100 \ --max_depth 15 \ --min_samples_split 5 # Verify outputs again ls -lh test-sagemaker/model/ cat test-sagemaker/output/data/metrics.json COMMAND_BLOCK: # Clear previous outputs rm -rf test-sagemaker/model/* test-sagemaker/output/* # Test with different hyperparameters docker run --rm \ -v $(pwd)/test-sagemaker:/opt/ml \ ml-training:test \ --n_estimators 100 \ --max_depth 15 \ --min_samples_split 5 # Verify outputs again ls -lh test-sagemaker/model/ cat test-sagemaker/output/data/metrics.json COMMAND_BLOCK: import boto3 from sagemaker.tuner import ( HyperparameterTuner, IntegerParameter, ) from sagemaker.estimator import Estimator from training_config import * def create_hyperparameter_tuning_job( max_jobs=20, max_parallel_jobs=2, objective_metric_name='validation:f1_score' ): """ Run hyperparameter tuning with SageMaker """ # Define hyperparameter ranges hyperparameter_ranges = { 'n_estimators': IntegerParameter(50, 200), 'max_depth': IntegerParameter(5, 20), 'min_samples_split': IntegerParameter(2, 10), 'min_samples_leaf': IntegerParameter(1, 5) } # Create base estimator estimator = Estimator( image_uri=image_uri, role=role, instance_count=1, instance_type='ml.m5.xlarge', output_path=output_path, sagemaker_session=sagemaker_session, use_spot_instances=True, max_wait=7200, max_run=3600 ) # Create tuner with regex matching JSON output from train.py tuner = HyperparameterTuner( estimator=estimator, objective_metric_name=objective_metric_name, hyperparameter_ranges=hyperparameter_ranges, metric_definitions=[ {'Name': 'validation:accuracy', 'Regex': 
r'"accuracy":\s*([0-9\.]+)'}, {'Name': 'validation:precision', 'Regex': r'"precision":\s*([0-9\.]+)'}, {'Name': 'validation:recall', 'Regex': r'"recall":\s*([0-9\.]+)'}, {'Name': 'validation:f1_score', 'Regex': r'"f1_score":\s*([0-9\.]+)'} ], max_jobs=max_jobs, max_parallel_jobs=max_parallel_jobs, objective_type='Maximize', strategy='Bayesian' ) # Start tuning tuning_job_name = f"{PROJECT_NAME}-tuning-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" print(f"Starting hyperparameter tuning: {tuning_job_name}") print(f"Max jobs: {max_jobs}, Max parallel: {max_parallel_jobs}") tuner.fit( inputs={ 'training': training_data, 'validation': validation_data }, job_name=tuning_job_name, wait=True ) return tuner, tuning_job_name if __name__ == '__main__': print("Starting hyperparameter tuning job...") tuner, job_name = create_hyperparameter_tuning_job( max_jobs=20, max_parallel_jobs=2 ) print(f"\nTuning completed: {job_name}") # Get best training job best_job = tuner.best_training_job() print(f" Best training job: {best_job}") # Get best hyperparameters best_params = tuner.best_estimator().hyperparameters() print(f" Best hyperparameters: {best_params}") Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: import boto3 from sagemaker.tuner import ( HyperparameterTuner, IntegerParameter, ) from sagemaker.estimator import Estimator from training_config import * def create_hyperparameter_tuning_job( max_jobs=20, max_parallel_jobs=2, objective_metric_name='validation:f1_score' ): """ Run hyperparameter tuning with SageMaker """ # Define hyperparameter ranges hyperparameter_ranges = { 'n_estimators': IntegerParameter(50, 200), 'max_depth': IntegerParameter(5, 20), 'min_samples_split': IntegerParameter(2, 10), 'min_samples_leaf': IntegerParameter(1, 5) } # Create base estimator estimator = Estimator( image_uri=image_uri, role=role, instance_count=1, instance_type='ml.m5.xlarge', output_path=output_path, sagemaker_session=sagemaker_session, use_spot_instances=True, max_wait=7200, 
max_run=3600 ) # Create tuner with regex matching JSON output from train.py tuner = HyperparameterTuner( estimator=estimator, objective_metric_name=objective_metric_name, hyperparameter_ranges=hyperparameter_ranges, metric_definitions=[ {'Name': 'validation:accuracy', 'Regex': r'"accuracy":\s*([0-9\.]+)'}, {'Name': 'validation:precision', 'Regex': r'"precision":\s*([0-9\.]+)'}, {'Name': 'validation:recall', 'Regex': r'"recall":\s*([0-9\.]+)'}, {'Name': 'validation:f1_score', 'Regex': r'"f1_score":\s*([0-9\.]+)'} ], max_jobs=max_jobs, max_parallel_jobs=max_parallel_jobs, objective_type='Maximize', strategy='Bayesian' ) # Start tuning tuning_job_name = f"{PROJECT_NAME}-tuning-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" print(f"Starting hyperparameter tuning: {tuning_job_name}") print(f"Max jobs: {max_jobs}, Max parallel: {max_parallel_jobs}") tuner.fit( inputs={ 'training': training_data, 'validation': validation_data }, job_name=tuning_job_name, wait=True ) return tuner, tuning_job_name if __name__ == '__main__': print("Starting hyperparameter tuning job...") tuner, job_name = create_hyperparameter_tuning_job( max_jobs=20, max_parallel_jobs=2 ) print(f"\nTuning completed: {job_name}") # Get best training job best_job = tuner.best_training_job() print(f" Best training job: {best_job}") # Get best hyperparameters best_params = tuner.best_estimator().hyperparameters() print(f" Best hyperparameters: {best_params}") COMMAND_BLOCK: import boto3 from sagemaker.tuner import ( HyperparameterTuner, IntegerParameter, ) from sagemaker.estimator import Estimator from training_config import * def create_hyperparameter_tuning_job( max_jobs=20, max_parallel_jobs=2, objective_metric_name='validation:f1_score' ): """ Run hyperparameter tuning with SageMaker """ # Define hyperparameter ranges hyperparameter_ranges = { 'n_estimators': IntegerParameter(50, 200), 'max_depth': IntegerParameter(5, 20), 'min_samples_split': IntegerParameter(2, 10), 'min_samples_leaf': 
IntegerParameter(1, 5) } # Create base estimator estimator = Estimator( image_uri=image_uri, role=role, instance_count=1, instance_type='ml.m5.xlarge', output_path=output_path, sagemaker_session=sagemaker_session, use_spot_instances=True, max_wait=7200, max_run=3600 ) # Create tuner with regex matching JSON output from train.py tuner = HyperparameterTuner( estimator=estimator, objective_metric_name=objective_metric_name, hyperparameter_ranges=hyperparameter_ranges, metric_definitions=[ {'Name': 'validation:accuracy', 'Regex': r'"accuracy":\s*([0-9\.]+)'}, {'Name': 'validation:precision', 'Regex': r'"precision":\s*([0-9\.]+)'}, {'Name': 'validation:recall', 'Regex': r'"recall":\s*([0-9\.]+)'}, {'Name': 'validation:f1_score', 'Regex': r'"f1_score":\s*([0-9\.]+)'} ], max_jobs=max_jobs, max_parallel_jobs=max_parallel_jobs, objective_type='Maximize', strategy='Bayesian' ) # Start tuning tuning_job_name = f"{PROJECT_NAME}-tuning-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" print(f"Starting hyperparameter tuning: {tuning_job_name}") print(f"Max jobs: {max_jobs}, Max parallel: {max_parallel_jobs}") tuner.fit( inputs={ 'training': training_data, 'validation': validation_data }, job_name=tuning_job_name, wait=True ) return tuner, tuning_job_name if __name__ == '__main__': print("Starting hyperparameter tuning job...") tuner, job_name = create_hyperparameter_tuning_job( max_jobs=20, max_parallel_jobs=2 ) print(f"\nTuning completed: {job_name}") # Get best training job best_job = tuner.best_training_job() print(f" Best training job: {best_job}") # Get best hyperparameters best_params = tuner.best_estimator().hyperparameters() print(f" Best hyperparameters: {best_params}") COMMAND_BLOCK: import boto3 import json from datetime import datetime import os AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1') sagemaker_client = boto3.client('sagemaker', region_name=AWS_REGION) def get_metrics_from_training_job(training_job_name): """ Extract metrics from completed training job 
""" try: response = sagemaker_client.describe_training_job( TrainingJobName=training_job_name ) # Get metrics from training job final_metrics = response.get('FinalMetricDataList', []) metrics = {} for metric in final_metrics: metric_name = metric['MetricName'].replace('validation:', '') metrics[metric_name] = float(metric['Value']) return metrics if metrics else {'accuracy': 0.0, 'f1_score': 0.0} except Exception as e: print(f"Could not fetch metrics: {e}") return {'accuracy': 0.0, 'f1_score': 0.0} def register_model( model_package_group_name, model_data_url, image_uri, metrics, approval_status='PendingManualApproval' ): """ Register model in SageMaker Model Registry (AIDLC Phase 6: Governance) """ model_package_description = f"Model trained on {datetime.now().isoformat()}" # Create model package response = sagemaker_client.create_model_package( ModelPackageGroupName=model_package_group_name, ModelPackageDescription=model_package_description, InferenceSpecification={ 'Containers': [{ 'Image': image_uri, 'ModelDataUrl': model_data_url }], 'SupportedContentTypes': ['text/csv', 'application/json'], 'SupportedResponseMIMETypes': ['application/json'] }, ModelApprovalStatus=approval_status, MetadataProperties={ 'GeneratedBy': 'sagemaker-training-pipeline' }, CustomerMetadataProperties={ 'accuracy': str(metrics.get('accuracy', 0)), 'f1_score': str(metrics.get('f1_score', 0)), 'training_date': datetime.now().isoformat() } ) model_package_arn = response['ModelPackageArn'] print(f"Model registered: {model_package_arn}") return model_package_arn def approve_model(model_package_arn): """ Approve model for production deployment """ sagemaker_client.update_model_package( ModelPackageArn=model_package_arn, ModelApprovalStatus='Approved' ) print(f"Model approved: {model_package_arn}") def list_model_versions(model_package_group_name): """ List all versions of a model """ response = sagemaker_client.list_model_packages( ModelPackageGroupName=model_package_group_name, 
SortBy='CreationTime', SortOrder='Descending' ) return response['ModelPackageSummaryList'] Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: import boto3 import json from datetime import datetime import os AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1') sagemaker_client = boto3.client('sagemaker', region_name=AWS_REGION) def get_metrics_from_training_job(training_job_name): """ Extract metrics from completed training job """ try: response = sagemaker_client.describe_training_job( TrainingJobName=training_job_name ) # Get metrics from training job final_metrics = response.get('FinalMetricDataList', []) metrics = {} for metric in final_metrics: metric_name = metric['MetricName'].replace('validation:', '') metrics[metric_name] = float(metric['Value']) return metrics if metrics else {'accuracy': 0.0, 'f1_score': 0.0} except Exception as e: print(f"Could not fetch metrics: {e}") return {'accuracy': 0.0, 'f1_score': 0.0} def register_model( model_package_group_name, model_data_url, image_uri, metrics, approval_status='PendingManualApproval' ): """ Register model in SageMaker Model Registry (AIDLC Phase 6: Governance) """ model_package_description = f"Model trained on {datetime.now().isoformat()}" # Create model package response = sagemaker_client.create_model_package( ModelPackageGroupName=model_package_group_name, ModelPackageDescription=model_package_description, InferenceSpecification={ 'Containers': [{ 'Image': image_uri, 'ModelDataUrl': model_data_url }], 'SupportedContentTypes': ['text/csv', 'application/json'], 'SupportedResponseMIMETypes': ['application/json'] }, ModelApprovalStatus=approval_status, MetadataProperties={ 'GeneratedBy': 'sagemaker-training-pipeline' }, CustomerMetadataProperties={ 'accuracy': str(metrics.get('accuracy', 0)), 'f1_score': str(metrics.get('f1_score', 0)), 'training_date': datetime.now().isoformat() } ) model_package_arn = response['ModelPackageArn'] print(f"Model registered: {model_package_arn}") return 
model_package_arn def approve_model(model_package_arn): """ Approve model for production deployment """ sagemaker_client.update_model_package( ModelPackageArn=model_package_arn, ModelApprovalStatus='Approved' ) print(f"Model approved: {model_package_arn}") def list_model_versions(model_package_group_name): """ List all versions of a model """ response = sagemaker_client.list_model_packages( ModelPackageGroupName=model_package_group_name, SortBy='CreationTime', SortOrder='Descending' ) return response['ModelPackageSummaryList'] COMMAND_BLOCK: import boto3 import json from datetime import datetime import os AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1') sagemaker_client = boto3.client('sagemaker', region_name=AWS_REGION) def get_metrics_from_training_job(training_job_name): """ Extract metrics from completed training job """ try: response = sagemaker_client.describe_training_job( TrainingJobName=training_job_name ) # Get metrics from training job final_metrics = response.get('FinalMetricDataList', []) metrics = {} for metric in final_metrics: metric_name = metric['MetricName'].replace('validation:', '') metrics[metric_name] = float(metric['Value']) return metrics if metrics else {'accuracy': 0.0, 'f1_score': 0.0} except Exception as e: print(f"Could not fetch metrics: {e}") return {'accuracy': 0.0, 'f1_score': 0.0} def register_model( model_package_group_name, model_data_url, image_uri, metrics, approval_status='PendingManualApproval' ): """ Register model in SageMaker Model Registry (AIDLC Phase 6: Governance) """ model_package_description = f"Model trained on {datetime.now().isoformat()}" # Create model package response = sagemaker_client.create_model_package( ModelPackageGroupName=model_package_group_name, ModelPackageDescription=model_package_description, InferenceSpecification={ 'Containers': [{ 'Image': image_uri, 'ModelDataUrl': model_data_url }], 'SupportedContentTypes': ['text/csv', 'application/json'], 'SupportedResponseMIMETypes': 
['application/json']
        },
        ModelApprovalStatus=approval_status,
        MetadataProperties={
            'GeneratedBy': 'sagemaker-training-pipeline'
        },
        CustomerMetadataProperties={
            'accuracy': str(metrics.get('accuracy', 0)),
            'f1_score': str(metrics.get('f1_score', 0)),
            'training_date': datetime.now().isoformat()
        }
    )

    model_package_arn = response['ModelPackageArn']
    print(f"Model registered: {model_package_arn}")
    return model_package_arn


def approve_model(model_package_arn):
    """
    Approve model for production deployment
    """
    sagemaker_client.update_model_package(
        ModelPackageArn=model_package_arn,
        ModelApprovalStatus='Approved'
    )
    print(f"Model approved: {model_package_arn}")


def list_model_versions(model_package_group_name):
    """
    List all versions of a model
    """
    response = sagemaker_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        SortBy='CreationTime',
        SortOrder='Descending'
    )
    return response['ModelPackageSummaryList']

COMMAND_BLOCK:
#!/usr/bin/env python3
"""
Complete training pipeline orchestration with model registry
"""
import argparse
import sys
import os

# Make the sibling training/ directory importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'training'))

from training_config import create_training_job, PROJECT_NAME, image_uri
from model_registry import register_model, get_metrics_from_training_job
from hyperparameter_tuning import create_hyperparameter_tuning_job


def run_simple_training(register=True):
    """Run a single training job"""
    print("=" * 60)
    print("Running Simple Training Job")
    print("=" * 60)

    estimator, job_name = create_training_job(
        instance_type='ml.m5.xlarge',
        use_spot_instances=True
    )

    print(f"\nTraining completed: {job_name}")
    print(f"Model artifacts: {estimator.model_data}")

    # Register model in Model Registry (AIDLC Phase 6: Governance)
    if register:
        print("\nRegistering model in SageMaker Model Registry...")
        try:
            # Get metrics from training job
            metrics = get_metrics_from_training_job(job_name)
            print(f"Model metrics: {metrics}")

            model_arn = register_model(
                model_package_group_name=f"{PROJECT_NAME}-models",
                model_data_url=estimator.model_data,
                image_uri=image_uri,
                metrics=metrics,
                approval_status='PendingManualApproval'
            )
            print("\nModel registered successfully!")
            print(f"Model ARN: {model_arn}")
            print("\nModel requires manual approval before deployment.")
            print("Approve via SageMaker console or using approve_model() function.")
        except Exception as e:
            # Registration is best-effort: the trained artifacts are still in S3
            print(f"Model registration failed: {e}")
            print("Training completed successfully, but model was not registered.")

    return estimator


def run_hyperparameter_tuning():
    """Run hyperparameter tuning"""
    print("=" * 60)
    print("Running Hyperparameter Tuning")
    print("=" * 60)

    tuner, job_name = create_hyperparameter_tuning_job(
        max_jobs=20,
        max_parallel_jobs=2
    )

    print(f"\nTuning completed: {job_name}")

    # Get best training job
    best_job = tuner.best_training_job()
    print(f"Best training job: {best_job}")

    # Get best estimator
    best_estimator = tuner.best_estimator()
    print(f"Best model artifacts: {best_estimator.model_data}")

    return tuner


def main():
    parser = argparse.ArgumentParser(description='Run ML training pipeline')
    parser.add_argument(
        '--mode',
        choices=['simple', 'tuning'],
        default='simple',
        help='Training mode: simple or tuning'
    )
    parser.add_argument(
        '--no-register',
        action='store_true',
        help='Skip model registration'
    )
    args = parser.parse_args()

    try:
        if args.mode == 'simple':
            estimator = run_simple_training(register=not args.no_register)
        else:
            tuner = run_hyperparameter_tuning()

        print("\n" + "=" * 60)
        print("Pipeline completed successfully!")
        print("=" * 60)
    except Exception as e:
        print(f"\nPipeline failed: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
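Because the pipeline registers models as `PendingManualApproval`, someone still has to choose which version to approve. The sketch below shows one way to pick the newest pending version. It assumes summaries shaped like the `ModelPackageSummaryList` entries returned by `list_model_versions()`. `latest_pending` and the sample ARNs are hypothetical and are not part of the article's code.

```python
from datetime import datetime, timezone


def latest_pending(summaries):
    """Return the ARN of the newest package awaiting approval, or None."""
    pending = [
        s for s in summaries
        if s.get("ModelApprovalStatus") == "PendingManualApproval"
    ]
    if not pending:
        return None
    newest = max(pending, key=lambda s: s["CreationTime"])
    return newest["ModelPackageArn"]


# Illustrative data only; real summaries come from list_model_versions().
example_summaries = [
    {
        "ModelPackageArn": "arn:example:model-package/1",
        "ModelApprovalStatus": "Approved",
        "CreationTime": datetime(2024, 1, 1, tzinfo=timezone.utc),
    },
    {
        "ModelPackageArn": "arn:example:model-package/2",
        "ModelApprovalStatus": "PendingManualApproval",
        "CreationTime": datetime(2024, 1, 2, tzinfo=timezone.utc),
    },
    {
        "ModelPackageArn": "arn:example:model-package/3",
        "ModelApprovalStatus": "PendingManualApproval",
        "CreationTime": datetime(2024, 1, 3, tzinfo=timezone.utc),
    },
]

print(latest_pending(example_summaries))
```

The returned ARN could then be passed to `approve_model()` once a reviewer has checked the metrics.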
COMMAND_BLOCK:
# Already configured in training_config.py:
estimator = Estimator(
    # ... other params ...
    use_spot_instances=True,
    max_wait=7200,  # Maximum total time, including waiting for Spot capacity (2 hours); must be >= max_run
    max_run=3600,   # Maximum training time (1 hour)
)

# Spot instances save ~70% on compute costs
# Trade-off: Training can be interrupted
# Best for: Non-urgent, resumable training jobs
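The savings figure can be checked per job rather than taken on faith. For managed Spot training, the `describe_training_job` response reports both `TrainingTimeInSeconds` and `BillableTimeInSeconds`. The helper below is a minimal sketch that turns those two fields into a percentage. `spot_savings_percent` is a hypothetical name, and the input is assumed to be that response dictionary.

```python
def spot_savings_percent(job_description):
    """Compute Spot savings from a describe_training_job-style dict.

    Returns None when the timing fields are missing, for example while
    the job is still running.
    """
    training = job_description.get("TrainingTimeInSeconds")
    billable = job_description.get("BillableTimeInSeconds")
    if not training or billable is None:
        return None
    return round((1 - billable / training) * 100, 1)


# Illustrative values, not measurements from the article.
sample = {"TrainingTimeInSeconds": 1000, "BillableTimeInSeconds": 300}
print(spot_savings_percent(sample))
```

In practice the dictionary would come from `sagemaker_client.describe_training_job(TrainingJobName=job_name)` after the job finishes.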
COMMAND_BLOCK:
# Development/Testing
instance_type = 'ml.m5.large'    # $0.115/hr on-demand

# Production Training
instance_type = 'ml.m5.xlarge'   # $0.23/hr on-demand

# GPU Training (deep learning)
instance_type = 'ml.p3.2xlarge'  # $3.06/hr (1 GPU)

# With Spot instances (70% savings):
# ml.m5.xlarge: $0.23/hr → ~$0.07/hr with Spot

COMMAND_BLOCK:
# Query one month of costs by service (Cost Explorer must be enabled).
# The End date is exclusive, so End=2024-02-01 covers all of January.
aws ce get-cost-and-usage \
  --time-period Start=2024-01-01,End=2024-02-01 \
  --granularity MONTHLY \
  --metrics BlendedCost \
  --group-by Type=DIMENSION,Key=SERVICE

# Set up budget alerts (do this in console or via Terraform)
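The hourly rates above translate into a rough per-job estimate with simple arithmetic: rate × hours × jobs × (1 − Spot discount). The sketch below hard-codes the on-demand prices quoted in this article, which vary by region and over time. The flat discount is an assumption, because real Spot savings differ from job to job, and `estimate_cost` is a hypothetical helper.

```python
# On-demand prices as quoted in this article (USD per hour); check current
# pricing for your region before relying on them.
ON_DEMAND_RATES = {
    "ml.m5.large": 0.115,
    "ml.m5.xlarge": 0.23,
    "ml.p3.2xlarge": 3.06,
}


def estimate_cost(instance_type, hours, jobs=1, spot_discount=0.0):
    """Estimate training cost in USD, rounded to cents.

    spot_discount is a fraction between 0 and 1 (0.7 means 70% cheaper).
    """
    if not 0.0 <= spot_discount <= 1.0:
        raise ValueError("spot_discount must be between 0 and 1")
    rate = ON_DEMAND_RATES[instance_type]
    return round(rate * hours * jobs * (1 - spot_discount), 2)


print(estimate_cost("ml.m5.xlarge", hours=1))
print(estimate_cost("ml.m5.xlarge", hours=1, spot_discount=0.7))
```

This covers instance time only; storage, data transfer and other charges are not included.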
COMMAND_BLOCK:
# CloudWatch Log Group for SageMaker
resource "aws_cloudwatch_log_group" "sagemaker_training" {
  name              = "/aws/sagemaker/TrainingJobs"
  retention_in_days = 30
  kms_key_id        = aws_kms_key.data_encryption.arn

  tags = {
    Name        = "SageMaker Training Logs"
    Environment = var.environment
  }
}

# Training Job Failure Alarm
resource "aws_cloudwatch_metric_alarm" "training_failures" {
  alarm_name          = "${var.project_name}-training-failures"
  comparison_operator = "GreaterThanOrEqualToThreshold" # fire on the first failure, not the second
  evaluation_periods  = 1
  metric_name         = "TrainingJobsFailed"
  namespace           = "AWS/SageMaker"
  period              = 300
  statistic           = "Sum"
  threshold           = 1
  alarm_description   = "Alert when training job fails"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]
  treat_missing_data  = "notBreaching"
}

# Training Cost Alert
# EstimatedCharges is a month-to-date running total, not a daily figure, and
# billing metrics are published only in us-east-1, so this alarm has to be
# created in that region.
resource "aws_cloudwatch_metric_alarm" "training_cost" {
  alarm_name          = "${var.project_name}-training-cost"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "EstimatedCharges"
  namespace           = "AWS/Billing"
  period              = 86400
  statistic           = "Maximum"
  threshold           = 100 # $100 month-to-date
  alarm_description   = "Alert when month-to-date SageMaker charges exceed $100"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]

  dimensions = {
    Currency    = "USD"
    ServiceName = "AmazonSageMaker"
  }
}

# Spot Instance Interruption Alarm
resource "aws_cloudwatch_metric_alarm" "spot_interruptions" {
  alarm_name          = "${var.project_name}-spot-interruptions"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "TrainingJobsStoppedDueToSpotInterruption"
  namespace           = "AWS/SageMaker"
  period              = 300
  statistic           = "Sum"
  threshold           = 3
  alarm_description   = "Alert when multiple spot interruptions occur"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]
}
COMMAND_BLOCK:
# Set environment variables
export AWS_REGION="ap-south-1"  # Your region
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export VALIDATED_BUCKET="ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}"

# Verify data split exists (from Part 2, Step 6.5)
echo "Checking training data..."
aws s3 ls "s3://${VALIDATED_BUCKET}/validated/train/"
echo "Checking validation data..."
aws s3 ls "s3://${VALIDATED_BUCKET}/validated/val/"

# Both should show CSV files. If not, go back to Step 2.

COMMAND_BLOCK:
# ALWAYS test locally first!
cd training
docker build -t ml-training:test .

# Prepare test data (if not already done in Step 5)
mkdir -p test-sagemaker/{input/data/training,input/data/validation,model,output/data}
# Copy test data or create minimal samples (see Step 5)

# Run local test
docker run --rm \
  -v "$(pwd)/test-sagemaker:/opt/ml" \
  ml-training:test \
  --n_estimators 50 \
  --max_depth 5

# Verify outputs
ls -lh test-sagemaker/model/                 # Should see model.joblib
cat test-sagemaker/output/data/metrics.json  # Should see metrics

COMMAND_BLOCK:
# Build and push (see Step 1 for full commands)
export ECR_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/ml-training"
docker build -t ml-training:latest .
docker tag ml-training:latest "${ECR_REPO}:latest"
aws ecr get-login-password --region "${AWS_REGION}" | \
  docker login --username AWS --password-stdin "${ECR_REPO}"
docker push "${ECR_REPO}:latest"
echo "Container available at: ${ECR_REPO}:latest"

COMMAND_BLOCK:
cd ../pipeline
python train_pipeline.py --mode simple

# Expected output:
# ============================================================
# Running Simple Training Job
# ============================================================
# Starting training job: ml-pipeline-2024-12-27-10-30-45
# Training data: s3://ml-pipeline-validated-data-dev-123456789/validated/train/
# Validation data: s3://ml-pipeline-validated-data-dev-123456789/validated/val/
# ...
# Training completed: ml-pipeline-2024-12-27-10-30-45
# Model artifacts: s3://ml-pipeline-model-artifacts-dev-123456789/models/...
# Registering model in SageMaker Model Registry...
# Model registered successfully!
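The "verify outputs" step of the local test can also be scripted, so a missing artifact is caught before the image is pushed. The sketch below assumes the `test-sagemaker/` layout used in the local test, with `model/model.joblib` and `output/data/metrics.json`. `check_local_outputs` is a hypothetical helper, not part of the article's repository.

```python
import json
from pathlib import Path


def check_local_outputs(root="test-sagemaker"):
    """Return a list of problems found in the local training outputs.

    An empty list means the model artifact exists and metrics.json parses.
    """
    root = Path(root)
    problems = []

    model_path = root / "model" / "model.joblib"
    if not model_path.is_file():
        problems.append(f"missing model artifact: {model_path}")

    metrics_path = root / "output" / "data" / "metrics.json"
    if not metrics_path.is_file():
        problems.append(f"missing metrics file: {metrics_path}")
    else:
        try:
            json.loads(metrics_path.read_text())
        except json.JSONDecodeError as exc:
            problems.append(f"metrics.json is not valid JSON: {exc}")

    return problems


for problem in check_local_outputs():
    print(problem)
```

Run it from the `training/` directory after the `docker run` step; no output means both files are in place.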
COMMAND_BLOCK:
# Watch CloudWatch logs in real-time
aws logs tail /aws/sagemaker/TrainingJobs --follow

# In another terminal, check SageMaker console
# https://console.aws.amazon.com/sagemaker/home?region=ap-south-1#/jobs

# List model registry versions
aws sagemaker list-model-packages \
  --model-package-group-name ml-pipeline-models

COMMAND_BLOCK:
# Only run after successful simple training
python train_pipeline.py --mode tuning

# This will:
# - Start 20 training jobs (2 in parallel)
# - Take 2-4 hours with Spot instances
# - Find best hyperparameters
# - Cost: ~$14 with Spot instances

COMMAND_BLOCK:
# Cause: Train/val split missing or incorrect S3 path
# Solution: Verify data split from Part 2

# Check S3 paths
aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/train/
aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/val/

# If empty, go back to Part 2, Step 6.5 and run the split script

COMMAND_BLOCK:
# Check CloudWatch logs for error
aws logs tail /aws/sagemaker/TrainingJobs --follow

# Common causes:
# 1. Invalid S3 path (train/val not split) → See above
# 2. Missing IAM permissions → Check terraform/sagemaker.tf
# 3. Container image issues → Test locally first (Step 5)
# 4. Incorrect data format → Verify CSV schema matches expected columns
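The same S3 check can be run from Python before a job is launched, which avoids paying for a job that fails on its first read. This is a minimal sketch. `csv_keys` and `list_csvs` are hypothetical helpers, and the bucket and prefix are whatever your Part 2 setup produced. `boto3` is imported lazily so that the pure filtering function can be tried without AWS credentials.

```python
def csv_keys(objects):
    """Return the keys of CSV objects from an S3 listing's 'Contents' list."""
    return [o["Key"] for o in objects if o["Key"].lower().endswith(".csv")]


def list_csvs(bucket, prefix):
    """List CSV keys under s3://bucket/prefix (needs AWS credentials)."""
    import boto3  # lazy import: only needed when actually calling S3

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent when the prefix holds no objects
        keys.extend(csv_keys(page.get("Contents", [])))
    return keys


# Offline illustration with a made-up listing:
listing = [
    {"Key": "validated/train/part-0001.csv"},
    {"Key": "validated/train/_SUCCESS"},
]
print(csv_keys(listing))
```

An empty result from `list_csvs(bucket, "validated/train/")` points to the same cause as the "No CSV files found" error: the split from Part 2 is missing or was written to a different prefix.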
COMMAND_BLOCK:
# Cause: Dependencies not installed in container
# Solution: Rebuild without cache
cd training
docker build --no-cache -t ml-training:latest .

# Verify dependencies
docker run --rm ml-training:latest pip list | grep scikit
# Should see: scikit-learn 1.3.0

COMMAND_BLOCK:
# Check ECR repository policy
aws ecr get-repository-policy --repository-name ml-training

# Should see: "Service": "sagemaker.amazonaws.com"

# If not, re-apply Terraform
cd terraform
terraform apply -target=aws_ecr_repository_policy.ml_training

COMMAND_BLOCK:
# Solution 1: Increase max_wait
# In training_config.py, change:
max_wait=14400  # 4 hours instead of 2

# Solution 2: Use on-demand for critical jobs
use_spot_instances=False

# Solution 3: Try different instance type
instance_type='ml.m5.2xlarge'  # May have less Spot contention

COMMAND_BLOCK:
# Check actual log output format
aws logs tail /aws/sagemaker/TrainingJobs --follow | grep "Validation Metrics"

# Should see JSON format:
# Validation Metrics: {"accuracy": 0.95, "f1_score": 0.93, ...}

# Verify regex in hyperparameter_tuning.py matches:
# r'"accuracy":\s*([0-9\.]+)'

# If you see: Validation Metrics: accuracy=0.95 (non-JSON)
# Then update train.py to output JSON (already done in our code)
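A metric regex is easiest to debug offline, against a copied log line, before a tuning job is started. The sketch below applies the accuracy pattern quoted in this guide to the two log formats it mentions. `extract_accuracy` is a hypothetical helper used only for this check.

```python
import re

# Pattern quoted in this guide for hyperparameter_tuning.py
ACCURACY_REGEX = r'"accuracy":\s*([0-9\.]+)'


def extract_accuracy(log_line):
    """Return the accuracy captured by the regex, or None if no match."""
    match = re.search(ACCURACY_REGEX, log_line)
    return float(match.group(1)) if match else None


json_line = 'Validation Metrics: {"accuracy": 0.95, "f1_score": 0.93}'
plain_line = "Validation Metrics: accuracy=0.95"

print(extract_accuracy(json_line))   # JSON format: matches
print(extract_accuracy(plain_line))  # non-JSON format: no match
```

If a line copied from CloudWatch returns `None` here, SageMaker will not extract the metric either, so either the log format or the regex needs to change.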
COMMAND_BLOCK:
# Verify model package group exists
aws sagemaker describe-model-package-group \
  --model-package-group-name ml-pipeline-models

# If not exists, apply Terraform
cd terraform
terraform apply -target=aws_sagemaker_model_package_group.ml_models

COMMAND_BLOCK:
# Check current month's costs (End is exclusive, so today is not included)
aws ce get-cost-and-usage \
  --time-period Start=$(date +%Y-%m-01),End=$(date +%Y-%m-%d) \
  --granularity DAILY \
  --metrics BlendedCost \
  --group-by Type=DIMENSION,Key=SERVICE \
  --filter '{"Dimensions": {"Key": "SERVICE", "Values": ["Amazon SageMaker"]}}'

# Common causes:
# 1. Hyperparameter tuning with too many jobs
# 2. Forgot to enable Spot instances
# 3. Using expensive instance types (ml.p3.*)
# 4. Jobs not terminating (lower the max_run timeout)
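Two details of the Cost Explorer query are easy to get wrong: the `End` date is exclusive, and on the first day of a month a month-to-date range is empty. The sketch below builds the time period and sums a response in Python. It assumes a response shaped like the output of `get-cost-and-usage` with a group-by, and that an empty range should be skipped rather than sent. `month_to_date_period` and `total_cost` are hypothetical helpers.

```python
from datetime import date


def month_to_date_period(today):
    """Build a Start/End period for the current month (End is exclusive).

    Returns None on the 1st, when Start would equal End and the range
    would be empty.
    """
    start = today.replace(day=1)
    if start == today:
        return None
    return {"Start": start.isoformat(), "End": today.isoformat()}


def total_cost(response, metric="BlendedCost"):
    """Sum a metric across all periods and groups of a cost response."""
    total = 0.0
    for period in response.get("ResultsByTime", []):
        for group in period.get("Groups", []):
            # Amounts are returned as strings
            total += float(group["Metrics"][metric]["Amount"])
    return round(total, 2)


# Made-up response fragment for illustration only.
sample_response = {
    "ResultsByTime": [
        {"Groups": [{"Keys": ["Amazon SageMaker"],
                     "Metrics": {"BlendedCost": {"Amount": "1.25", "Unit": "USD"}}}]},
        {"Groups": [{"Keys": ["Amazon SageMaker"],
                     "Metrics": {"BlendedCost": {"Amount": "2.25", "Unit": "USD"}}}]},
    ]
}

print(month_to_date_period(date(2024, 1, 15)))
print(total_cost(sample_response))
```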
Jobs not terminating (increase max_run timeout) COMMAND_BLOCK: # Check current month's costs aws ce get-cost-and-usage \ --time-period Start=$(date -d "$(date +%Y-%m-01)" +%Y-%m-%d),End=$(date +%Y-%m-%d) \ --granularity DAILY \ --metrics BlendedCost \ --group-by Type=SERVICE \ --filter file://<(echo '{ "Dimensions": { "Key": "SERVICE", "Values": ["Amazon SageMaker"] } }') # Common causes: # 1. Hyperparameter tuning with too many jobs # 2. Forgot to enable Spot instances # 3. Using expensive instance types (ml.p3.*) # 4. Jobs not terminating (increase max_run timeout) - MLflow Integration: Requires VPC deployment with proper networking (simplified here for learning) - Data Preparation: Use train/val splits from Part 2 (verification steps included) - Security: Production requires VPC isolation, not shown here to keep focus on SageMaker fundamentals - Cost Management: Monitor usage closely, especially with hyperparameter tuning - Phase 1: Data Collection & Preparation - Phase 6: Governance (CloudTrail, KMS, IAM) - Phase 2: Model Development & Training - Phase 3: Model Evaluation (validation metrics) - Phase 6: Governance (experiment tracking, model versioning) - Phase 4: Model Deployment - Phase 5: Monitoring & Maintenance - Phase 6: Governance (CI/CD, compliance) - SageMaker training infrastructure with custom containers - Experiment tracking with MLflow (optional, local development only) - Model versioning and registry - Cost optimization with Spot instances - Automated hyperparameter tuning - SageMaker Training: Managed training infrastructure - ECR: Container registry for custom images - S3: Data storage and model artifacts (from Part 2) - Spot Instances: 70% cost savings - CloudWatch: Training metrics and logging - SageMaker Model Registry: Model versioning (AIDLC Phase 6) - Full control over dependencies - Any ML framework (PyTorch, TensorFlow, scikit-learn) - Custom preprocessing logic - Integration with your tools - Faster iteration: Seconds vs minutes - Zero 
AWS costs: No SageMaker charges - Easier debugging: Full Docker logs locally - Quick fixes: Edit code, rebuild, retest immediately - Use Spot instances - 70% savings over on-demand (biggest impact) - S3 lifecycle policies - Transition old models to Glacier after 90 days - Right-size instances - Start with ml.m5.large, upgrade if needed - CloudWatch log retention - Set to 30 days max (7 days for dev) - Delete failed training artifacts - Clean up S3 regularly - Hyperparameter tuning budget - Limit max_jobs to control costs - Stop unused resources - Clean up old model versions - Training data encrypted at rest (S3 KMS) - Inter-container traffic encrypted - Model artifacts encrypted - CloudWatch logs encrypted - IAM role with least privilege - No hard-coded credentials - ECR image scanning enabled - ECR repository policy restricts access - VPC configuration for SageMaker - Private subnets for sensitive workloads - VPC endpoints for S3/CloudWatch - (Not implemented here to keep focus on fundamentals) - CloudWatch logging for all jobs - CloudTrail tracking API calls - Model versioning and lineage - Cost monitoring and alerts - Regular base image updates - Vulnerability scanning on push - Minimal dependencies - Non-root user in container (recommended) - CI/CD Pipeline for automated deployment - SageMaker inference endpoints with auto-scaling - A/B Testing and canary deployments - Model drift detection - Data quality monitoring - Performance degradation alerts - Automated retraining triggers - Complete observability stack - Incident response procedures - Rollback automation - Audit trail completion - Test locally first - Catches 90% of issues before AWS charges (most important!) 
- Use custom containers - Full control over environment and dependencies
- Verify data split - Separate train/val datasets from Part 2 are essential
- Optimize costs - Spot instances save 70%, but monitor usage closely
- Version models - Model registry (AIDLC Phase 6) enables tracking and governance
- Automate tuning - Hyperparameter optimization finds better models faster
- Monitor everything - CloudWatch metrics and alarms catch issues early
- Follow AIDLC - Each phase builds on the previous for production ML

- SageMaker Training
- Managed Spot Training
- Model Registry
- Hyperparameter Tuning
- Custom Training Containers
- SageMaker Python SDK
- Docker Documentation
- Part 1: AIDLC Framework
- Part 2: Data Pipelines

- Questions about training at scale? Drop a comment
- Follow for Part 4 - Production Deployment & Monitoring (Final!)
- Like if this helped you build your training pipeline
- Share with your team and connections
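
As a quick companion to the "Metric regex not matching" troubleshooting step, the check can be done offline before any tuning job is launched. The sketch below is a minimal illustration, not the article's own tooling: the regex is the one quoted in the troubleshooting notes, while the helper name `extract_accuracy` and the sample log lines are assumptions made up for this example.

```python
import json
import re

# Pattern quoted in the troubleshooting notes for hyperparameter_tuning.py
ACCURACY_REGEX = r'"accuracy":\s*([0-9\.]+)'


def extract_accuracy(log_line: str):
    """Return the accuracy captured by the regex, or None if there is no match.

    Hypothetical helper for local checks; not part of the SageMaker SDK.
    """
    match = re.search(ACCURACY_REGEX, log_line)
    return float(match.group(1)) if match else None


# Sample log lines with made-up values, shaped like the two formats
# described in the troubleshooting step.
json_line = "Validation Metrics: " + json.dumps({"accuracy": 0.95, "f1_score": 0.93})
plain_line = "Validation Metrics: accuracy=0.95"

# The JSON-formatted line should yield a number; the key=value line should not.
print("JSON line  ->", extract_accuracy(json_line))
print("plain line ->", extract_accuracy(plain_line))
```

If the JSON-style line yields `None` with your real log output, the training script's log format and the tuner's regex have drifted apart, and one of them needs updating before a tuning job can read the objective metric.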