ml_inference/
├── handler.py # Lambda entry point
├── model_loader.py # S3 model caching logic
├── feature_extractor.py
├── Dockerfile
└── requirements.txt
ml_inference/
├── handler.py # Lambda entry point
├── model_loader.py # S3 model caching logic
├── feature_extractor.py
├── Dockerfile
└── requirements.txt
ml_inference/
├── handler.py # Lambda entry point
├── model_loader.py # S3 model caching logic
├── feature_extractor.py
├── Dockerfile
└── requirements.txt
# AWS Lambda Python 3.11 base image (container-image deployment)
FROM public.ecr.aws/lambda/python:3.11

# Copy requirements first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy function code
COPY handler.py model_loader.py feature_extractor.py ./

# Lambda handler entrypoint
CMD ["handler.lambda_handler"]
# AWS Lambda Python 3.11 base image (container-image deployment)
FROM public.ecr.aws/lambda/python:3.11

# Copy requirements first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy function code
COPY handler.py model_loader.py feature_extractor.py ./

# Lambda handler entrypoint
CMD ["handler.lambda_handler"]
# AWS Lambda Python 3.11 base image (container-image deployment)
FROM public.ecr.aws/lambda/python:3.11

# Copy requirements first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy function code
COPY handler.py model_loader.py feature_extractor.py ./

# Lambda handler entrypoint
CMD ["handler.lambda_handler"]
scikit-learn==1.4.0
xgboost==2.0.3
numpy==1.26.3
pandas==2.1.4
boto3==1.34.34
joblib==1.3.2
scikit-learn==1.4.0
xgboost==2.0.3
numpy==1.26.3
pandas==2.1.4
boto3==1.34.34
joblib==1.3.2
scikit-learn==1.4.0
xgboost==2.0.3
numpy==1.26.3
pandas==2.1.4
boto3==1.34.34
joblib==1.3.2
"""Lambda entry point: validate readings, run WQI inference, return an API response."""
import json
import logging

from model_loader import get_model
from feature_extractor import extract_features

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Run WQI inference on sensor readings from the invocation event.

    Expects ``event['readings']`` to be a dict containing at least
    pH, turbidity, tds, and temperature keys.

    Returns an API-Gateway-style dict: statusCode 200 with the prediction,
    400 when required fields are missing, 500 on any inference failure.
    """
    try:
        readings = event.get('readings', {})
        device_id = event.get('deviceId', 'unknown')

        # Validate inputs before touching the model
        required = ['pH', 'turbidity', 'tds', 'temperature']
        missing = [f for f in required if f not in readings]
        if missing:
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': f"Missing fields: {missing}",
                    'code': 'VALIDATION_ERROR'
                })
            }

        # Extract features (includes trend calculations)
        features = extract_features(readings)

        # Get model — cached in /tmp after first load
        model = get_model()

        # Run inference
        wqi = float(model.predict([features])[0])
        confidence = float(model.predict_proba([features]).max())
        quality = classify_wqi(wqi)

        # Plain message (was an f-string with no placeholders);
        # structured fields ride along in `extra` for the log record.
        logger.info("Inference complete", extra={
            'deviceId': device_id,
            'wqi': wqi,
            'quality': quality,
            'confidence': confidence
        })

        return {
            'statusCode': 200,
            'body': json.dumps({
                'wqi': round(wqi, 2),
                'quality': quality,
                'confidence': round(confidence, 4),
                'deviceId': device_id
            })
        }
    except Exception as e:
        # Broad catch is deliberate: this is the Lambda top-level boundary.
        logger.error("Inference error: %s", e, exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'Inference failed',
                'code': 'INFERENCE_ERROR'
            })
        }


def classify_wqi(wqi: float) -> str:
    """Map a numeric Water Quality Index onto a human-readable quality band."""
    if wqi >= 90:
        return 'Excellent'
    if wqi >= 70:
        return 'Good'
    if wqi >= 50:
        return 'Fair'
    if wqi >= 25:
        return 'Poor'
    return 'Very Poor'
"""Lambda entry point: validate readings, run WQI inference, return an API response."""
import json
import logging

from model_loader import get_model
from feature_extractor import extract_features

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Run WQI inference on sensor readings from the invocation event.

    Expects ``event['readings']`` to be a dict containing at least
    pH, turbidity, tds, and temperature keys.

    Returns an API-Gateway-style dict: statusCode 200 with the prediction,
    400 when required fields are missing, 500 on any inference failure.
    """
    try:
        readings = event.get('readings', {})
        device_id = event.get('deviceId', 'unknown')

        # Validate inputs before touching the model
        required = ['pH', 'turbidity', 'tds', 'temperature']
        missing = [f for f in required if f not in readings]
        if missing:
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': f"Missing fields: {missing}",
                    'code': 'VALIDATION_ERROR'
                })
            }

        # Extract features (includes trend calculations)
        features = extract_features(readings)

        # Get model — cached in /tmp after first load
        model = get_model()

        # Run inference
        wqi = float(model.predict([features])[0])
        confidence = float(model.predict_proba([features]).max())
        quality = classify_wqi(wqi)

        # Plain message (was an f-string with no placeholders);
        # structured fields ride along in `extra` for the log record.
        logger.info("Inference complete", extra={
            'deviceId': device_id,
            'wqi': wqi,
            'quality': quality,
            'confidence': confidence
        })

        return {
            'statusCode': 200,
            'body': json.dumps({
                'wqi': round(wqi, 2),
                'quality': quality,
                'confidence': round(confidence, 4),
                'deviceId': device_id
            })
        }
    except Exception as e:
        # Broad catch is deliberate: this is the Lambda top-level boundary.
        logger.error("Inference error: %s", e, exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'Inference failed',
                'code': 'INFERENCE_ERROR'
            })
        }


def classify_wqi(wqi: float) -> str:
    """Map a numeric Water Quality Index onto a human-readable quality band."""
    if wqi >= 90:
        return 'Excellent'
    if wqi >= 70:
        return 'Good'
    if wqi >= 50:
        return 'Fair'
    if wqi >= 25:
        return 'Poor'
    return 'Very Poor'
"""Lambda entry point: validate readings, run WQI inference, return an API response."""
import json
import logging

from model_loader import get_model
from feature_extractor import extract_features

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Run WQI inference on sensor readings from the invocation event.

    Expects ``event['readings']`` to be a dict containing at least
    pH, turbidity, tds, and temperature keys.

    Returns an API-Gateway-style dict: statusCode 200 with the prediction,
    400 when required fields are missing, 500 on any inference failure.
    """
    try:
        readings = event.get('readings', {})
        device_id = event.get('deviceId', 'unknown')

        # Validate inputs before touching the model
        required = ['pH', 'turbidity', 'tds', 'temperature']
        missing = [f for f in required if f not in readings]
        if missing:
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': f"Missing fields: {missing}",
                    'code': 'VALIDATION_ERROR'
                })
            }

        # Extract features (includes trend calculations)
        features = extract_features(readings)

        # Get model — cached in /tmp after first load
        model = get_model()

        # Run inference
        wqi = float(model.predict([features])[0])
        confidence = float(model.predict_proba([features]).max())
        quality = classify_wqi(wqi)

        # Plain message (was an f-string with no placeholders);
        # structured fields ride along in `extra` for the log record.
        logger.info("Inference complete", extra={
            'deviceId': device_id,
            'wqi': wqi,
            'quality': quality,
            'confidence': confidence
        })

        return {
            'statusCode': 200,
            'body': json.dumps({
                'wqi': round(wqi, 2),
                'quality': quality,
                'confidence': round(confidence, 4),
                'deviceId': device_id
            })
        }
    except Exception as e:
        # Broad catch is deliberate: this is the Lambda top-level boundary.
        logger.error("Inference error: %s", e, exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'Inference failed',
                'code': 'INFERENCE_ERROR'
            })
        }


def classify_wqi(wqi: float) -> str:
    """Map a numeric Water Quality Index onto a human-readable quality band."""
    if wqi >= 90:
        return 'Excellent'
    if wqi >= 70:
        return 'Good'
    if wqi >= 50:
        return 'Fair'
    if wqi >= 25:
        return 'Poor'
    return 'Very Poor'
"""S3 model caching: in-memory cache, then /tmp, then S3 download."""
import os
import logging

import boto3
import joblib

logger = logging.getLogger()

# Required configuration — fail fast at import time if unset.
MODEL_S3_BUCKET = os.environ['MODEL_BUCKET']
MODEL_S3_KEY = os.environ['MODEL_KEY']
LOCAL_MODEL_PATH = '/tmp/model.joblib'

_model_cache = None  # Module-level cache — survives across warm invocations


def get_model():
    """Return the model, loading it from the fastest available tier.

    Order of preference: module-level in-memory cache (warm process),
    /tmp file (warm container, restarted process), then S3 (cold start).
    The loaded model is cached at each tier for subsequent invocations.
    """
    global _model_cache
    if _model_cache is not None:
        logger.debug("Using in-memory model cache")
        return _model_cache

    # Check /tmp first (warm container, model already downloaded)
    if os.path.exists(LOCAL_MODEL_PATH):
        logger.info("Loading model from /tmp cache")
        _model_cache = joblib.load(LOCAL_MODEL_PATH)
        return _model_cache

    # Cold start — download from S3
    logger.info(f"Downloading model from s3://{MODEL_S3_BUCKET}/{MODEL_S3_KEY}")
    s3 = boto3.client('s3')
    s3.download_file(MODEL_S3_BUCKET, MODEL_S3_KEY, LOCAL_MODEL_PATH)
    _model_cache = joblib.load(LOCAL_MODEL_PATH)
    logger.info("Model loaded and cached")
    return _model_cache
"""S3 model caching: in-memory cache, then /tmp, then S3 download."""
import os
import logging

import boto3
import joblib

logger = logging.getLogger()

# Required configuration — fail fast at import time if unset.
MODEL_S3_BUCKET = os.environ['MODEL_BUCKET']
MODEL_S3_KEY = os.environ['MODEL_KEY']
LOCAL_MODEL_PATH = '/tmp/model.joblib'

_model_cache = None  # Module-level cache — survives across warm invocations


def get_model():
    """Return the model, loading it from the fastest available tier.

    Order of preference: module-level in-memory cache (warm process),
    /tmp file (warm container, restarted process), then S3 (cold start).
    The loaded model is cached at each tier for subsequent invocations.
    """
    global _model_cache
    if _model_cache is not None:
        logger.debug("Using in-memory model cache")
        return _model_cache

    # Check /tmp first (warm container, model already downloaded)
    if os.path.exists(LOCAL_MODEL_PATH):
        logger.info("Loading model from /tmp cache")
        _model_cache = joblib.load(LOCAL_MODEL_PATH)
        return _model_cache

    # Cold start — download from S3
    logger.info(f"Downloading model from s3://{MODEL_S3_BUCKET}/{MODEL_S3_KEY}")
    s3 = boto3.client('s3')
    s3.download_file(MODEL_S3_BUCKET, MODEL_S3_KEY, LOCAL_MODEL_PATH)
    _model_cache = joblib.load(LOCAL_MODEL_PATH)
    logger.info("Model loaded and cached")
    return _model_cache
"""S3 model caching: in-memory cache, then /tmp, then S3 download."""
import os
import logging

import boto3
import joblib

logger = logging.getLogger()

# Required configuration — fail fast at import time if unset.
MODEL_S3_BUCKET = os.environ['MODEL_BUCKET']
MODEL_S3_KEY = os.environ['MODEL_KEY']
LOCAL_MODEL_PATH = '/tmp/model.joblib'

_model_cache = None  # Module-level cache — survives across warm invocations


def get_model():
    """Return the model, loading it from the fastest available tier.

    Order of preference: module-level in-memory cache (warm process),
    /tmp file (warm container, restarted process), then S3 (cold start).
    The loaded model is cached at each tier for subsequent invocations.
    """
    global _model_cache
    if _model_cache is not None:
        logger.debug("Using in-memory model cache")
        return _model_cache

    # Check /tmp first (warm container, model already downloaded)
    if os.path.exists(LOCAL_MODEL_PATH):
        logger.info("Loading model from /tmp cache")
        _model_cache = joblib.load(LOCAL_MODEL_PATH)
        return _model_cache

    # Cold start — download from S3
    logger.info(f"Downloading model from s3://{MODEL_S3_BUCKET}/{MODEL_S3_KEY}")
    s3 = boto3.client('s3')
    s3.download_file(MODEL_S3_BUCKET, MODEL_S3_KEY, LOCAL_MODEL_PATH)
    _model_cache = joblib.load(LOCAL_MODEL_PATH)
    logger.info("Model loaded and cached")
    return _model_cache
# Authenticate Docker with ECR
aws ecr get-login-password --region ap-south-1 | \
  docker login --username AWS --password-stdin \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com

# Build the image
docker build -t aquachain-ml-inference .

# Tag for ECR
docker tag aquachain-ml-inference:latest \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest

# Push
docker push \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest
# Authenticate Docker with ECR
aws ecr get-login-password --region ap-south-1 | \
  docker login --username AWS --password-stdin \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com

# Build the image
docker build -t aquachain-ml-inference .

# Tag for ECR
docker tag aquachain-ml-inference:latest \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest

# Push
docker push \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest
# Authenticate Docker with ECR
aws ecr get-login-password --region ap-south-1 | \
  docker login --username AWS --password-stdin \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com

# Build the image
docker build -t aquachain-ml-inference .

# Tag for ECR
docker tag aquachain-ml-inference:latest \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest

# Push
docker push \
  758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest
# Point the Lambda function at the newly pushed image
aws lambda update-function-code \
  --function-name aquachain-function-ml-inference-dev \
  --image-uri 758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest \
  --region ap-south-1
# Point the Lambda function at the newly pushed image
aws lambda update-function-code \
  --function-name aquachain-function-ml-inference-dev \
  --image-uri 758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest \
  --region ap-south-1
# Point the Lambda function at the newly pushed image
aws lambda update-function-code \
  --function-name aquachain-function-ml-inference-dev \
  --image-uri 758346259059.dkr.ecr.ap-south-1.amazonaws.com/aquachain-ml-inference:latest \
  --region ap-south-1
from aws_cdk import (
    aws_lambda as lambda_,
    aws_ecr as ecr,
    aws_iam as iam,
    Duration,
)

# Reference existing ECR repo
repo = ecr.Repository.from_repository_name(
    self, "MLInferenceRepo", "aquachain-ml-inference"
)

# Container-image Lambda backed by the ECR repo above.
ml_inference_fn = lambda_.DockerImageFunction(
    self, "MLInferenceFunction",
    function_name="aquachain-function-ml-inference-dev",
    code=lambda_.DockerImageCode.from_ecr(
        repo, tag_or_digest="latest"
    ),
    memory_size=1024,  # ML models benefit from more memory
    timeout=Duration.seconds(30),
    environment={
        "MODEL_BUCKET": "aquachain-models-dev",
        "MODEL_KEY": "wqi/model_v2.joblib",
        "LOG_LEVEL": "INFO"
    }
)

# Grant S3 read access for model download
model_bucket.grant_read(ml_inference_fn)
from aws_cdk import (
    aws_lambda as lambda_,
    aws_ecr as ecr,
    aws_iam as iam,
    Duration,
)

# Reference existing ECR repo
repo = ecr.Repository.from_repository_name(
    self, "MLInferenceRepo", "aquachain-ml-inference"
)

# Container-image Lambda backed by the ECR repo above.
ml_inference_fn = lambda_.DockerImageFunction(
    self, "MLInferenceFunction",
    function_name="aquachain-function-ml-inference-dev",
    code=lambda_.DockerImageCode.from_ecr(
        repo, tag_or_digest="latest"
    ),
    memory_size=1024,  # ML models benefit from more memory
    timeout=Duration.seconds(30),
    environment={
        "MODEL_BUCKET": "aquachain-models-dev",
        "MODEL_KEY": "wqi/model_v2.joblib",
        "LOG_LEVEL": "INFO"
    }
)

# Grant S3 read access for model download
model_bucket.grant_read(ml_inference_fn)
from aws_cdk import (
    aws_lambda as lambda_,
    aws_ecr as ecr,
    aws_iam as iam,
    Duration,
)

# Reference existing ECR repo
repo = ecr.Repository.from_repository_name(
    self, "MLInferenceRepo", "aquachain-ml-inference"
)

# Container-image Lambda backed by the ECR repo above.
ml_inference_fn = lambda_.DockerImageFunction(
    self, "MLInferenceFunction",
    function_name="aquachain-function-ml-inference-dev",
    code=lambda_.DockerImageCode.from_ecr(
        repo, tag_or_digest="latest"
    ),
    memory_size=1024,  # ML models benefit from more memory
    timeout=Duration.seconds(30),
    environment={
        "MODEL_BUCKET": "aquachain-models-dev",
        "MODEL_KEY": "wqi/model_v2.joblib",
        "LOG_LEVEL": "INFO"
    }
)

# Grant S3 read access for model download
model_bucket.grant_read(ml_inference_fn)
# Upload new model version
aws s3 cp model_v3.joblib s3://aquachain-models-dev/wqi/model_v2.joblib

# If you need to roll back, just update the env var to point at the previous version
aws lambda update-function-configuration \
  --function-name aquachain-function-ml-inference-dev \
  --environment "Variables={MODEL_KEY=wqi/model_v1.joblib,MODEL_BUCKET=aquachain-models-dev}" \
  --region ap-south-1
# Upload new model version
aws s3 cp model_v3.joblib s3://aquachain-models-dev/wqi/model_v2.joblib

# If you need to roll back, just update the env var to point at the previous version
aws lambda update-function-configuration \
  --function-name aquachain-function-ml-inference-dev \
  --environment "Variables={MODEL_KEY=wqi/model_v1.joblib,MODEL_BUCKET=aquachain-models-dev}" \
  --region ap-south-1
# Upload new model version
aws s3 cp model_v3.joblib s3://aquachain-models-dev/wqi/model_v2.joblib

# If you need to roll back, just update the env var to point at the previous version
aws lambda update-function-configuration \
  --function-name aquachain-function-ml-inference-dev \
  --environment "Variables={MODEL_KEY=wqi/model_v1.joblib,MODEL_BUCKET=aquachain-models-dev}" \
  --region ap-south-1

- EC2 instance
- Load model at startup
- Serve predictions over HTTP

- Paying for compute 24/7
- Even at 3AM when traffic = 0

- Bursts of requests from devices
- Long idle periods

- You pay only when your model runs
- No idle infrastructure
- Fully event-driven execution

- scikit-learn 1.4.0
- XGBoost 2.0.3
- numpy 1.26.3 + pandas 2.1.4
- Python 3.11

- AWS Lambda (container image)
- Amazon ECR (container registry)
- S3 (model artifact storage)

- _model_cache — in-memory, fastest possible, survives as long as the container is warm
- /tmp/model.joblib — survives container reuse even if the Python process restarts

- You need ultra-low latency (<50ms)
- You have constant high traffic
- Your model is extremely large (>5GB and slow to load)

- Low cost, zero idle infrastructure, and production-grade performance.
- If your model doesn’t need to run 24/7, your infrastructure shouldn’t either.